forked from p2pool/p2pool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupnp.py
530 lines (438 loc) · 18.5 KB
/
upnp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
"""
This module is the heart of the upnp support. Device discover, ip discovery
and port mappings are implemented here.
@author: Raphael Slinckx
@author: Anthony Baxter
@copyright: Copyright 2005
@license: LGPL
@contact: U{[email protected]<mailto:[email protected]>}
@version: 0.1.0
"""
__revision__ = "$id"
import socket, random, urlparse, logging
from twisted.internet import reactor, defer
from twisted.web import client
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.error import CannotListenError
from twisted.python import failure
from nattraverso.pynupnp.soap import SoapProxy
from nattraverso.pynupnp.upnpxml import UPnPXml
from nattraverso import ipdiscover, portmapper
class UPnPError(Exception):
"""
A generic UPnP error, with a descriptive message as content.
"""
pass
class UPnPMapper(portmapper.NATMapper):
"""
This is the UPnP port mapper implementing the
L{NATMapper<portmapper.NATMapper>} interface.
@see: L{NATMapper<portmapper.NATMapper>}
"""
def __init__(self, upnp):
"""
Creates the mapper, with the given L{UPnPDevice} instance.
@param upnp: L{UPnPDevice} instance
"""
self._mapped = {}
self._upnp = upnp
def map(self, port):
"""
See interface
"""
self._check_valid_port(port)
#Port is already mapped
if port in self._mapped:
return defer.succeed(self._mapped[port])
#Trigger a new mapping creation, first fetch local ip.
result = ipdiscover.get_local_ip()
self._mapped[port] = result
return result.addCallback(self._map_got_local_ip, port)
def info(self, port):
"""
See interface
"""
# If the mapping exists, everything's ok
if port in self._mapped:
return self._mapped[port]
else:
raise ValueError('Port %r is not currently mapped'%(port))
def unmap(self, port):
"""
See interface
"""
if port in self._mapped:
existing = self._mapped[port]
#Pending mapping, queue an unmap,return existing deferred
if type(existing) is not tuple:
existing.addCallback(lambda x: self.unmap(port))
return existing
#Remove our local mapping
del self._mapped[port]
#Ask the UPnP to remove the mapping
extaddr, extport = existing
return self._upnp.remove_port_mapping(extport, port.getHost().type)
else:
raise ValueError('Port %r is not currently mapped'%(port))
def get_port_mappings(self):
"""
See interface
"""
return self._upnp.get_port_mappings()
def _map_got_local_ip(self, ip_result, port):
"""
We got the local ip address, retreive the existing port mappings
in the device.
@param ip_result: result of L{ipdiscover.get_local_ip}
@param port: a L{twisted.internet.interfaces.IListeningPort} we
want to map
"""
local, ip = ip_result
return self._upnp.get_port_mappings().addCallback(
self._map_got_port_mappings, ip, port)
def _map_got_port_mappings(self, mappings, ip, port):
"""
We got all the existing mappings in the device, find an unused one
and assign it for the requested port.
@param ip: The local ip of this host "x.x.x.x"
@param port: a L{twisted.internet.interfaces.IListeningPort} we
want to map
@param mappings: result of L{UPnPDevice.get_port_mappings}
"""
#Get the requested mapping's info
ptype = port.getHost().type
intport = port.getHost().port
for extport in [random.randrange(1025, 65536) for val in range(20)]:
# Check if there is an existing mapping, if it does not exist, bingo
if not (ptype, extport) in mappings:
break
if (ptype, extport) in mappings:
existing = mappings[ptype, extport]
local_ip, local_port = existing
if local_ip == ip and local_port == intport:
# Existing binding for this host/port/proto - replace it
break
# Triggers the creation of the mapping on the device
result = self._upnp.add_port_mapping(ip, intport, extport, 'pynupnp', ptype)
# We also need the external IP, so we queue first an
# External IP Discovery, then we add the mapping.
return result.addCallback(
lambda x: self._upnp.get_external_ip()).addCallback(
self._port_mapping_added, extport, port)
def _port_mapping_added(self, extaddr, extport, port):
"""
The port mapping was added in the device, this means::
Internet NAT LAN
|
> IP:extaddr |> IP:local ip
> Port:extport |> Port:port
|
@param extaddr: The exernal ip address
@param extport: The external port as number
@param port: The internal port as a
L{twisted.internet.interfaces.IListeningPort} object, that has been
mapped
"""
self._mapped[port] = (extaddr, extport)
return (extaddr, extport)
class UPnPDevice:
"""
Represents an UPnP device, with the associated infos, and remote methods.
"""
def __init__(self, soap_proxy, info):
"""
Build the device, with the given SOAP proxy, and the meta-infos.
@param soap_proxy: an initialized L{SoapProxy} to the device
@param info: a dictionnary of various infos concerning the
device extracted with L{UPnPXml}
"""
self._soap_proxy = soap_proxy
self._info = info
def get_external_ip(self):
"""
Triggers an external ip discovery on the upnp device. Returns
a deferred called with the external ip of this host.
@return: A deferred called with the ip address, as "x.x.x.x"
@rtype: L{twisted.internet.defer.Deferred}
"""
result = self._soap_proxy.call('GetExternalIPAddress')
result.addCallback(self._on_external_ip)
return result
def get_port_mappings(self):
"""
Retreive the existing port mappings
@see: L{portmapper.NATMapper.get_port_mappings}
@return: A deferred called with the dictionnary as defined
in the interface L{portmapper.NATMapper.get_port_mappings}
@rtype: L{twisted.internet.defer.Deferred}
"""
return self._get_port_mapping()
def add_port_mapping(self, local_ip, intport, extport, desc, proto, lease=0):
"""
Add a port mapping in the upnp device. Returns a deferred.
@param local_ip: the LAN ip of this host as "x.x.x.x"
@param intport: the internal port number
@param extport: the external port number
@param desc: the description of this mapping (string)
@param proto: "UDP" or "TCP"
@param lease: The duration of the lease in (mili)seconds(??)
@return: A deferred called with None when the mapping is done
@rtype: L{twisted.internet.defer.Deferred}
"""
result = self._soap_proxy.call('AddPortMapping', NewRemoteHost="",
NewExternalPort=extport,
NewProtocol=proto,
NewInternalPort=intport,
NewInternalClient=local_ip,
NewEnabled=1,
NewPortMappingDescription=desc,
NewLeaseDuration=lease)
return result.addCallbacks(self._on_port_mapping_added,
self._on_no_port_mapping_added)
def remove_port_mapping(self, extport, proto):
"""
Remove an existing port mapping on the device. Returns a deferred
@param extport: the external port number associated to the mapping
to be removed
@param proto: either "UDP" or "TCP"
@return: A deferred called with None when the mapping is done
@rtype: L{twisted.internet.defer.Deferred}
"""
result = self._soap_proxy.call('DeletePortMapping', NewRemoteHost="",
NewExternalPort=extport,
NewProtocol=proto)
return result.addCallbacks(self._on_port_mapping_removed,
self._on_no_port_mapping_removed)
# Private --------
def _on_external_ip(self, res):
"""
Called when we received the external ip address from the device.
@param res: the SOAPpy structure of the result
@return: the external ip string, as "x.x.x.x"
"""
logging.debug("Got external ip struct: %r", res)
return res['NewExternalIPAddress']
def _get_port_mapping(self, mapping_id=0, mappings=None):
"""
Fetch the existing mappings starting at index
"mapping_id" from the device.
To retreive all the mappings call this without parameters.
@param mapping_id: The index of the mapping to start fetching from
@param mappings: the dictionnary of already fetched mappings
@return: A deferred called with the existing mappings when all have been
retreived, see L{get_port_mappings}
@rtype: L{twisted.internet.defer.Deferred}
"""
if mappings == None:
mappings = {}
result = self._soap_proxy.call('GetGenericPortMappingEntry',
NewPortMappingIndex=mapping_id)
return result.addCallbacks(
lambda x: self._on_port_mapping_received(x, mapping_id+1, mappings),
lambda x: self._on_no_port_mapping_received( x, mappings))
def _on_port_mapping_received(self, response, mapping_id, mappings):
"""
Called we we receive a single mapping from the device.
@param response: a SOAPpy structure, representing the device's answer
@param mapping_id: The index of the next mapping in the device
@param mappings: the already fetched mappings, see L{get_port_mappings}
@return: A deferred called with the existing mappings when all have been
retreived, see L{get_port_mappings}
@rtype: L{twisted.internet.defer.Deferred}
"""
logging.debug("Got mapping struct: %r", response)
mappings[
response['NewProtocol'], response['NewExternalPort']
] = (response['NewInternalClient'], response['NewInternalPort'])
return self._get_port_mapping(mapping_id, mappings)
def _on_no_port_mapping_received(self, failure, mappings):
"""
Called when we have no more port mappings to retreive, or an
error occured while retreiving them.
Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
it just means we have finished. If it returns some other error, then we
fail with an UPnPError.
@param mappings: the already retreived mappings
@param failure: the failure
@return: The existing mappings as defined in L{get_port_mappings}
@raise UPnPError: When we got any other error
than "SpecifiedArrayIndexInvalid"
"""
logging.debug("_on_no_port_mapping_received: %s", failure)
err = failure.value
message = err.args[0]["UPnPError"]["errorDescription"]
if "SpecifiedArrayIndexInvalid" == message:
return mappings
else:
return failure
def _on_port_mapping_added(self, response):
"""
The port mapping was successfully added, return None to the deferred.
"""
return None
def _on_no_port_mapping_added(self, failure):
"""
Called when the port mapping could not be added. Immediately
raise an UPnPError, with the SOAPpy structure inside.
@raise UPnPError: When the port mapping could not be added
"""
return failure
def _on_port_mapping_removed(self, response):
"""
The port mapping was successfully removed, return None to the deferred.
"""
return None
def _on_no_port_mapping_removed(self, failure):
"""
Called when the port mapping could not be removed. Immediately
raise an UPnPError, with the SOAPpy structure inside.
@raise UPnPError: When the port mapping could not be deleted
"""
return failure
# UPNP multicast address, port and request string
_UPNP_MCAST = '239.255.255.250'
_UPNP_PORT = 1900
_UPNP_SEARCH_REQUEST = """M-SEARCH * HTTP/1.1\r
Host:%s:%s\r
ST:urn:schemas-upnp-org:device:InternetGatewayDevice:1\r
Man:"ssdp:discover"\r
MX:3\r
\r
""" % (_UPNP_MCAST, _UPNP_PORT)
class UPnPProtocol(DatagramProtocol, object):
"""
The UPnP Device discovery udp multicast twisted protocol.
"""
def __init__(self, *args, **kwargs):
"""
Init the protocol, no parameters needed.
"""
super(UPnPProtocol, self).__init__(*args, **kwargs)
#Device discovery deferred
self._discovery = None
self._discovery_timeout = None
self.mcast = None
self._done = False
# Public methods
def search_device(self):
"""
Triggers a UPnP device discovery.
The returned deferred will be called with the L{UPnPDevice} that has
been found in the LAN.
@return: A deferred called with the detected L{UPnPDevice} instance.
@rtype: L{twisted.internet.defer.Deferred}
"""
if self._discovery is not None:
raise ValueError('already used')
self._discovery = defer.Deferred()
self._discovery_timeout = reactor.callLater(6, self._on_discovery_timeout)
attempt = 0
mcast = None
while True:
try:
self.mcast = reactor.listenMulticast(1900+attempt, self)
break
except CannotListenError:
attempt = random.randint(0, 500)
# joined multicast group, starting upnp search
self.mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
return self._discovery
#Private methods
def datagramReceived(self, dgram, address):
if self._done:
return
"""
This is private, handle the multicast answer from the upnp device.
"""
logging.debug("Got UPNP multicast search answer:\n%s", dgram)
#This is an HTTP response
response, message = dgram.split('\r\n', 1)
# Prepare status line
version, status, textstatus = response.split(None, 2)
if not version.startswith('HTTP'):
return
if status != "200":
return
# Launch the info fetching
def parse_discovery_response(message):
"""Separate headers and body from the received http answer."""
hdict = {}
body = ''
remaining = message
while remaining:
line, remaining = remaining.split('\r\n', 1)
line = line.strip()
if not line:
body = remaining
break
key, val = line.split(':', 1)
key = key.lower()
hdict.setdefault(key, []).append(val.strip())
return hdict, body
headers, body = parse_discovery_response(message)
if not 'location' in headers:
self._on_discovery_failed(
UPnPError(
"No location header in response to M-SEARCH!: %r"%headers))
return
loc = headers['location'][0]
result = client.getPage(url=loc)
result.addCallback(self._on_gateway_response, loc).addErrback(self._on_discovery_failed)
def _on_gateway_response(self, body, loc):
if self._done:
return
"""
Called with the UPnP device XML description fetched via HTTP.
If the device has suitable services for ip discovery and port mappings,
the callback returned in L{search_device} is called with
the discovered L{UPnPDevice}.
@raise UPnPError: When no suitable service has been
found in the description, or another error occurs.
@param body: The xml description of the device.
@param loc: the url used to retreive the xml description
"""
# Parse answer
upnpinfo = UPnPXml(body)
# Check if we have a base url, if not consider location as base url
urlbase = upnpinfo.urlbase
if urlbase == None:
urlbase = loc
# Check the control url, if None, then the device cannot do what we want
controlurl = upnpinfo.controlurl
if controlurl == None:
self._on_discovery_failed(UPnPError("upnp response showed no WANConnections"))
return
control_url2 = urlparse.urljoin(urlbase, controlurl)
soap_proxy = SoapProxy(control_url2, upnpinfo.wanservice)
self._on_discovery_succeeded(UPnPDevice(soap_proxy, upnpinfo.deviceinfos))
def _on_discovery_succeeded(self, res):
if self._done:
return
self._done = True
self.mcast.stopListening()
self._discovery_timeout.cancel()
self._discovery.callback(res)
def _on_discovery_failed(self, err):
if self._done:
return
self._done = True
self.mcast.stopListening()
self._discovery_timeout.cancel()
self._discovery.errback(err)
def _on_discovery_timeout(self):
if self._done:
return
self._done = True
self.mcast.stopListening()
self._discovery.errback(failure.Failure(defer.TimeoutError('in _on_discovery_timeout')))
def search_upnp_device ():
"""
Check the network for an UPnP device. Returns a deferred
with the L{UPnPDevice} instance as result, if found.
@return: A deferred called with the L{UPnPDevice} instance
@rtype: L{twisted.internet.defer.Deferred}
"""
return defer.maybeDeferred(UPnPProtocol().search_device)