-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnetwork.py
95 lines (75 loc) · 2.87 KB
/
network.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from socket import socket, AF_INET, SOCK_DGRAM, inet_aton, inet_ntoa
import random, time
import threading, queue
from socketserver import ThreadingUDPServer
# Shared by every handler thread; guards the rate-limited section of
# Server.finish_request so datagrams pass through it one at a time.
lock = threading.Lock()

# Default link capacity handed to Server when run as a script. It is used as
# len(data) / rate seconds of sleep, i.e. bytes per second.
rate = 10_000

# Per-byte probability of flipping a payload byte; a falsy value disables
# corruption entirely.
corrupt_rate = None

# Probability that a datagram is dropped instead of being forwarded.
loss_rate = 0.1

# Admission threshold (in bytes) for the emulated buffer in
# Server.verify_request.
buffer_size = 500_000
def bytes_to_addr(bytes):
    """Decode an 8-byte address header into a ``(host, port)`` tuple.

    The first four bytes are a packed IPv4 address; the next four are the
    port as a big-endian unsigned integer.  Anything after byte 8 is ignored.
    """
    packed_ip = bytes[:4]
    packed_port = bytes[4:8]
    host = inet_ntoa(packed_ip)
    port = int.from_bytes(packed_port, 'big')
    return host, port
def addr_to_bytes(addr):
    """Encode a ``(host, port)`` pair as an 8-byte address header.

    The host (dotted-quad IPv4 string) is packed into four bytes, followed by
    the port as a four-byte big-endian integer.
    """
    packed_ip = inet_aton(addr[0])
    packed_port = addr[1].to_bytes(4, 'big')
    return packed_ip + packed_port
def corrupt(data: bytes) -> bytes:
    """Return a copy of *data* with up to three random bytes overwritten.

    Between 0 and 3 positions are picked independently (so they may
    coincide) and each is replaced by a random value in 0..255.  The new
    value can equal the old one, so the result is not guaranteed to differ
    from the input.  Empty input is returned as empty bytes, since there is
    no position to overwrite.
    """
    if not data:
        # With no bytes, random.randint(0, len - 1) would be randint(0, -1)
        # and raise ValueError.
        return bytes(data)

    raw = bytearray(data)
    for _ in range(random.randint(0, 3)):
        pos = random.randint(0, len(raw) - 1)
        raw[pos] = random.randint(0, 255)
    return bytes(raw)
class Server(ThreadingUDPServer):
    """UDP relay that emulates a rate-limited, lossy link with a finite buffer.

    Every datagram must begin with an 8-byte header naming its destination
    (4-byte IPv4 address followed by a 4-byte big-endian port).  The relay
    replaces that header with the sender's address and forwards the result
    to the destination, after applying the module-level ``loss_rate`` and
    ``corrupt_rate`` settings.
    """

    # Size of the address header in front of every datagram's payload.
    _HEADER_LEN = 8

    def __init__(self, addr, rate=None, delay=None):
        """Bind the relay to *addr*.

        rate  -- link capacity in bytes per second; a falsy value disables
                 rate limiting.
        delay -- stored on the instance; nothing in this class reads it.
        """
        # No request-handler class: finish_request is overridden and does
        # all of the per-datagram work itself.
        super().__init__(addr, None)
        self.rate = rate
        self.buffer = 0  # bytes admitted but not yet through the rate limiter
        self.delay = delay

    def verify_request(self, request, client_address):
        """Admit the datagram only while the emulated buffer has room.

        request is a tuple (data, socket):
        data is the received bytes object,
        socket is the server's UDP socket, used later to send the datagram on.
        If this function returns False, the request will not be processed,
        i.e. it is discarded.
        details: https://docs.python.org/3/library/socketserver.html
        """
        # NOTE(review): this counter is also decremented under `lock` in
        # finish_request, but is updated here without it -- confirm whether
        # the occasional lost update matters for the simulation.
        if self.buffer >= buffer_size:  # some finite buffer size (in bytes)
            return False
        self.buffer += len(request[0])
        return True

    def finish_request(self, request, client_address):
        """Rate-limit, possibly drop or corrupt, then forward one datagram."""
        data, sock = request

        # Blocking section: the shared lock lets one datagram through at a
        # time, so the sleep below models transmission time on a single link.
        with lock:
            if self.rate:
                time.sleep(len(data) / self.rate)
            self.buffer -= len(data)
            # Random loss.  A falsy loss_rate (None or 0) disables it, the
            # same way a falsy corrupt_rate disables corruption below.
            if loss_rate and random.random() < loss_rate:
                return

        # Non-blocking section: executed by multiple threads in parallel.
        # Adding a random delay here, e.g. time.sleep(random.random()),
        # would change the order of arrival at the receiver side.

        if len(data) < self._HEADER_LEN:
            # Too short to carry a destination header; decoding it would
            # raise inside the worker thread, so drop the datagram instead.
            print("Drop malformed datagram from", client_address)
            return

        to = bytes_to_addr(data[:self._HEADER_LEN])
        print(client_address, to)  # observe the traffic

        packet = bytearray(data)
        if corrupt_rate:
            # Flip every bit of each payload byte with probability
            # corrupt_rate; the header itself is never touched.
            for i in range(self._HEADER_LEN, len(packet)):
                if random.random() < corrupt_rate:
                    print("Corrupt")
                    packet[i] ^= 0xff

        # Swap the destination header for the sender's address so the
        # receiver can tell who the datagram came from.
        sock.sendto(addr_to_bytes(client_address) + packet[self._HEADER_LEN:], to)
# Address the relay binds to when this file is run as a script.
server_address = ('127.0.0.1', 12345)

if __name__ == '__main__':
    # The context manager closes the server socket once serving stops.
    server = Server(server_address, rate=rate)
    with server:
        server.serve_forever()