-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcoap_client.py
56 lines (36 loc) · 1.51 KB
/
coap_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import asyncio
from aiocoap import *
import time
import struct
async def fetch_image(uri, output_path):
context = await Context.create_client_context()
request = Message(code=GET, uri=uri)
client_start_time = time.time()
try:
response = await context.request(request).response
client_end_time = time.time()
# Unpack the server timestamp from the response payload
server_timestamp, = struct.unpack('d', response.payload[:8])
image_data = response.payload[8:]
with open(output_path, 'wb') as file:
file.write(image_data)
print(f"Image successfully saved to {output_path}")
# Calculate times
rtt = client_end_time - client_start_time
transmission_time = client_end_time - server_timestamp
print(f"Round-trip time (RTT): {rtt:.6f} seconds")
print(f"Time taken for image transmission: {transmission_time:.6f} seconds")
except Exception as e:
print(f"An error occurred: {e}")
async def main():
coap_server_uri = "coap://192.168.137.106:5683/image"
output_file_path = r"C:\Users\anant\OneDrive\Pictures\received_image_by_coap.png"
await fetch_image(coap_server_uri, output_file_path)
# Check if there's an existing event loop
loop = asyncio.get_event_loop()
if loop.is_running():
# If the event loop is already running, just create a task for the main function
asyncio.ensure_future(main())
else:
# Otherwise, run the main function in a new event loop
asyncio.run(main())