forked from HelloZeroNet/ZeroNet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConnectionServer.py
250 lines (212 loc) · 10.4 KB
/
ConnectionServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import logging
import time
import sys
import gevent
import msgpack
from gevent.server import StreamServer
from gevent.pool import Pool
from collections import defaultdict
from Debug import Debug
from Connection import Connection
from Config import config
from Crypt import CryptConnection
from Crypt import CryptHash
from Tor import TorManager
class ConnectionServer:
    """Accept incoming peer connections, create outgoing ones and clean them up.

    Every live connection is kept in ``self.connections`` and indexed in
    ``self.ips`` by ip, or by ip + site onion for site-locked Tor connections.
    """

    def __init__(self, ip=None, port=None, request_handler=None):
        """Initialise the registries, start the cleanup greenlet and prepare the listener.

        ip: Address to listen on; "*" is replaced with "0.0.0.0" when binding.
        port: Port to listen on; when falsy no StreamServer is created.
        request_handler: Optional callable installed as ``self.handleRequest``.
        """
        self.ip = ip
        self.port = port
        self.last_connection_id = 1  # Connection id incrementer
        self.log = logging.getLogger("ConnServer")
        self.port_opened = None

        if config.tor != "disabled":
            self.tor_manager = TorManager(self.ip, self.port)
        else:
            self.tor_manager = None

        self.connections = []  # Connections
        self.whitelist = config.ip_local  # No flood protection on this ips
        self.ip_incoming = {}  # Incoming connection count per ip since the last cleanup pass (flood protection)
        self.broken_ssl_peer_ids = {}  # Peerids of broken ssl connections
        self.ips = {}  # Connection by ip

        self.has_internet = True  # Internet outage detection

        self.stream_server = None
        self.running = True
        self.thread_checker = gevent.spawn(self.checkConnections)

        self.stat_recv = defaultdict(lambda: defaultdict(int))
        self.stat_sent = defaultdict(lambda: defaultdict(int))
        self.bytes_recv = 0
        self.bytes_sent = 0

        # Bittorrent style peerid
        self.peer_id = "-ZN0%s-%s" % (config.version.replace(".", ""), CryptHash.random(12, "base64"))

        # Check msgpack version
        if msgpack.version[0] == 0 and msgpack.version[1] < 4:
            self.log.error(
                "Error: Unsupported msgpack version: %s (<0.4.0), please run `sudo apt-get install python-pip; sudo pip install msgpack-python --upgrade`" %
                str(msgpack.version)
            )
            sys.exit(0)

        if port:  # Listen server on a port
            self.pool = Pool(500)  # do not accept more than 500 connections
            self.stream_server = StreamServer(
                (ip.replace("*", "0.0.0.0"), port), self.handleIncomingConnection, spawn=self.pool, backlog=100
            )
            # NOTE(review): assumed to be nested under `if port` (the handler is
            # only installed for listening servers) - confirm against the original layout
            if request_handler:
                self.handleRequest = request_handler

    def start(self):
        """Load the certificates and serve incoming connections until stopped.

        Blocks inside ``serve_forever()``. Any error raised while serving is
        logged and swallowed (best effort: another instance may own the port).
        """
        self.running = True
        CryptConnection.manager.loadCerts()
        self.log.debug("Binding to: %s:%s, (msgpack: %s), supported crypt: %s" % (
            self.ip, self.port,
            ".".join(map(str, msgpack.version)), CryptConnection.manager.crypt_supported)
        )
        if self.stream_server is None:
            # No port was given to __init__, so there is nothing to bind:
            # say so instead of reporting a bogus bind error
            self.log.debug("No listening server configured, not binding")
            return
        try:
            self.stream_server.serve_forever()  # Start normal connection server
        except Exception as err:
            self.log.info("StreamServer bind error, must be running already: %s" % err)

    def stop(self):
        """Stop the cleanup loop and, if present, the listening server."""
        self.running = False
        if self.stream_server:
            self.stream_server.stop()

    def handleIncomingConnection(self, sock, addr):
        """Register a freshly accepted socket as a Connection and start handling it.

        sock: The accepted socket.
        addr: Peer address; only its first two items (ip, port) are used.

        Returns False when the connection is dropped by the flood protection,
        otherwise None.
        """
        ip, port = addr[:2]  # IPv6 addresses carry extra items after (ip, port)

        # Connection flood protection
        if ip in self.ip_incoming and ip not in self.whitelist:
            self.ip_incoming[ip] += 1
            # Allow 6 from the same ip between two resets of the counter
            # (checkConnections clears self.ip_incoming on every pass)
            if self.ip_incoming[ip] > 6:
                self.log.debug("Connection flood detected from %s" % ip)
                # Hold the flooder for a while before dropping it
                # (assumes time.sleep is made cooperative by gevent - TODO confirm)
                time.sleep(30)
                sock.close()
                return False
        else:
            self.ip_incoming[ip] = 1

        connection = Connection(self, ip, port, sock)
        self.connections.append(connection)
        self.ips[ip] = connection
        connection.handleIncomingConnection(sock)

    def _waitConnected(self, connection):
        """Block until a still-connecting connection finishes its connect attempt.

        Raises Exception("Connection event return error") when the connect
        event reports failure. Does nothing for already connected connections.
        """
        if connection.connected:
            return
        succ = connection.event_connected.get()  # Wait for connection
        if not succ:
            raise Exception("Connection event return error")

    def getConnection(self, ip=None, port=None, peer_id=None, create=True, site=None):
        """Return a connection to ip, reusing an existing one when possible.

        ip: Peer address; an address ending in ".onion" gets a site-unique
            connection when Tor onions are started and site is given.
        port: Peer port; 0 marks a peer that can not be connected to.
        peer_id: When set, only a connection whose handshake peer_id matches is reused.
        create: Allow waiting for pending connections and opening a new one.
        site: Site the connection is locked to (Tor only).

        Returns the connection, or None when nothing was found and create is false.
        Raises Exception when the peer is not connectable or the connect fails.
        """
        # Site-unique connections only exist when Tor is enabled and onions are started
        # (tor_manager is None when Tor is disabled in the config)
        use_onions = ip.endswith(".onion") and self.tor_manager is not None and self.tor_manager.start_onions

        if use_onions and site:  # Site-unique connection for Tor
            site_onion = self.tor_manager.getOnion(site.address)
            key = ip + site_onion
        else:
            site_onion = None
            key = ip

        # Find connection by ip
        if key in self.ips:
            connection = self.ips[key]
            if not peer_id or connection.handshake.get("peer_id") == peer_id:  # Filter by peer_id
                if create:
                    self._waitConnected(connection)
                return connection

        # Recover from connection pool
        for connection in self.connections:
            if connection.ip != ip:
                continue
            if peer_id and connection.handshake.get("peer_id") != peer_id:  # Does not match
                continue
            if use_onions and ip.replace(".onion", "") != connection.target_onion:
                # For different site
                continue
            if create:
                self._waitConnected(connection)
            return connection

        # No connection found
        if not create:
            return None

        # Allow to create new connection if not found
        if port == 0:
            raise Exception("This peer is not connectable")

        # Tracked separately from the lookup variable above, so a failing
        # constructor can never make us close an unrelated, existing connection
        new_connection = None
        try:
            if use_onions and site:  # Lock connection to site
                new_connection = Connection(self, ip, port, target_onion=site_onion)
            else:
                new_connection = Connection(self, ip, port)
            self.ips[key] = new_connection
            self.connections.append(new_connection)
            succ = new_connection.connect()
            if not succ:
                new_connection.close("Connection event return error")
                raise Exception("Connection event return error")
        except Exception as err:
            if new_connection is not None:
                new_connection.close("%s Connect error: %s" % (ip, Debug.formatException(err)))
            raise err
        return new_connection

    def removeConnection(self, connection):
        """Drop connection from ``self.ips`` and ``self.connections``.

        Registry entries are only deleted when they still point to this very
        connection, so a newer connection stored under the same key is kept.
        """
        # Delete if same as in registry
        if self.ips.get(connection.ip) == connection:
            del self.ips[connection.ip]
        # Site locked connection
        if connection.target_onion:
            if self.ips.get(connection.ip + connection.target_onion) == connection:
                del self.ips[connection.ip + connection.target_onion]
        # Cert pinned connection
        if connection.cert_pin and self.ips.get(connection.ip + "#" + connection.cert_pin) == connection:
            del self.ips[connection.ip + "#" + connection.cert_pin]

        if connection in self.connections:
            self.connections.remove(connection)

    def checkConnections(self):
        """Periodic housekeeping loop, spawned as a greenlet by __init__.

        Every 15 seconds while ``self.running`` is true: resets the flood and
        broken-ssl counters, closes stalled, idle or misbehaving connections
        and updates the internet outage state.
        """
        run_i = 0
        while self.running:
            run_i += 1
            time.sleep(15)  # Check every 15 seconds
            self.ip_incoming = {}  # Reset connected ips counter
            self.broken_ssl_peer_ids = {}  # Reset broken ssl peerids count
            last_message_time = 0
            s = time.time()
            for connection in self.connections[:]:  # Make a copy
                idle = time.time() - max(connection.last_recv_time, connection.start_time, connection.last_message_time)
                last_message_time = max(last_message_time, connection.last_message_time)

                if connection.unpacker and idle > 30:
                    # Delete the unpacker if not needed
                    del connection.unpacker
                    connection.unpacker = None

                elif connection.last_cmd_sent == "announce" and idle > 20:  # Bootstrapper connection close after 20 sec
                    connection.close("[Cleanup] Tracker connection: %s" % idle)
                    # Already closed: do not ping or close it a second time below
                    continue

                if idle > 60 * 60:
                    # Wake up after 1h
                    connection.close("[Cleanup] After wakeup, idle: %s" % idle)

                elif idle > 20 * 60 and connection.last_send_time < time.time() - 10:
                    # Idle more than 20 min and we have not sent request in last 10 sec
                    if not connection.ping():
                        connection.close("[Cleanup] Ping timeout")

                elif idle > 10 and connection.incomplete_buff_recv > 0:
                    # Incomplete data with more than 10 sec idle
                    connection.close("[Cleanup] Connection buff stalled")

                elif idle > 10 and connection.protocol == "?":  # No connection after 10 sec
                    connection.close(
                        "[Cleanup] Connect timeout: %.3fs" % idle
                    )

                elif idle > 10 and connection.waiting_requests and time.time() - connection.last_send_time > 10:
                    # Sent command and no response in 10 sec
                    connection.close(
                        "[Cleanup] Command %s timeout: %.3fs" % (connection.last_cmd_sent, time.time() - connection.last_send_time)
                    )

                elif idle < 60 and connection.bad_actions > 40:
                    connection.close(
                        "[Cleanup] Too many bad actions: %s" % connection.bad_actions
                    )

                elif idle > 5*60 and connection.sites == 0:
                    connection.close(
                        "[Cleanup] No site for connection"
                    )

                elif run_i % 90 == 0:
                    # Reset bad action counter on every 90th pass (90 * 15 sec = 22.5 min)
                    connection.bad_actions = 0

            # Internet outage detection
            if time.time() - last_message_time > max(60, 60*10/max(1,float(len(self.connections))/50)):
                # Offline: Last message more than 60-600sec depending on connection number
                if self.has_internet:
                    self.has_internet = False
                    self.onInternetOffline()
            else:
                # Online
                if not self.has_internet:
                    self.has_internet = True
                    self.onInternetOnline()

            if time.time() - s > 0.01:
                self.log.debug("Connection cleanup in %.3fs" % (time.time() - s))

    def onInternetOnline(self):
        """Called by checkConnections when messages are being received again."""
        self.log.info("Internet online")

    def onInternetOffline(self):
        """Called by checkConnections when no message arrived within the outage threshold."""
        self.log.info("Internet offline")