Skip to content

Commit

Permalink
Refactor code in connection.py.
Browse files Browse the repository at this point in the history
* Drop unused attributes _peer_alive and responses_received.
* Make failure cases stand out in send, send_raw and start_handshake.
* Drop PeerListener.stop.
  * Do not destroy global context arbitrarily.
  * Turn TransportLayer.shutdown into abstract method.
* Straighten PeerListener.listen.
* Make default callbacks equal to None.
* Catch ValueErrors produced by json.loads.
* Assert the sender's guid has been initialized prior to sending.
* Remove unused import.
  • Loading branch information
Renelvon committed Oct 20, 2014
1 parent 94ce759 commit 588b31e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 113 deletions.
179 changes: 78 additions & 101 deletions node/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def __init__(self, transport, address, nickname=""):
self.transport = transport
self.address = address
self.nickname = nickname
self.responses_received = {}

# Establishing a ZeroMQ stream object
self.ctx = transport.ctx
Expand Down Expand Up @@ -58,35 +57,25 @@ def close_socket(self):
def send(self, data, callback):
self.send_raw(json.dumps(data), callback)

def send_raw(self, serialized, callback=lambda msg: None):

def send_raw(self, serialized, callback=None):
compressed_data = zlib.compress(serialized, 9)
self.stream.send(compressed_data)

try:

self.stream.send(compressed_data)

def cb(stream, msg):
def cb(stream, msg):
try:
response = json.loads(msg[0])
self.log.debug('[send_raw] %s', pformat(response))

# Update active peer info

if 'senderNick' in response and\
response['senderNick'] != self.nickname:
self.nickname = response['senderNick']

if callback is not None:
self.log.debug('%s', msg)
callback(msg)
except ValueError:
self.log.error('[send_raw] Bad JSON response: %s', msg[0])
return
self.log.debug('[send_raw] %s', pformat(response))

self.stream.on_recv_stream(cb)
# Update active peer info
self.nickname = response.get('senderNick', self.nickname)
if callback is not None:
self.log.debug('%s', msg)
callback(msg)

except Exception as e:
self.log.error(e)
# Shouldn't we raise the exception here?
# I think not doing this could cause buggy behavior on top.
raise
self.stream.on_recv_stream(cb)


class CryptoPeerConnection(GUIDMixin, PeerConnection):
Expand All @@ -105,45 +94,45 @@ def __init__(self, transport, address, pub=None, guid=None, nickname="",
self.port = url.port

self.sin = sin
self._peer_alive = False # unused; might remove later if unnecessary
self.address = "tcp://%s:%s" % (self.ip, self.port)

def start_handshake(self, handshake_cb=None):

def cb(msg, handshake_cb=None):
if msg:
if not msg:
return

self.log.debug('ALIVE PEER %s', msg[0])
msg = msg[0]
self.log.debug('ALIVE PEER %s', msg[0])
msg = msg[0]
try:
msg = json.loads(msg)
except ValueError:
self.log.error('[start_handshake] Bad JSON response: %s', msg)
return

# Update Information
self.guid = msg['senderGUID']
self.sin = self.generate_sin(self.guid)
self.pub = msg['pubkey']
self.nickname = msg['senderNick']

self._peer_alive = True

# Add this peer to active peers list
for idx, peer in enumerate(self.transport.dht.activePeers):
if peer.guid == self.guid or\
peer.address == self.address:
self.transport.dht.activePeers[idx] = self
self.transport.dht.add_peer(
self.transport,
self.address,
self.pub,
self.guid,
self.nickname
)
return
# Update Information
self.guid = msg['senderGUID']
self.sin = self.generate_sin(self.guid)
self.pub = msg['pubkey']
self.nickname = msg['senderNick']

# Add this peer to active peers list
for idx, peer in enumerate(self.transport.dht.activePeers):
if peer.guid == self.guid or peer.address == self.address:
self.transport.dht.activePeers[idx] = self
self.transport.dht.add_peer(
self.transport,
self.address,
self.pub,
self.guid,
self.nickname
)
return

self.transport.dht.activePeers.append(self)
self.transport.dht.routingTable.addContact(self)
self.transport.dht.activePeers.append(self)
self.transport.dht.routingTable.addContact(self)

if handshake_cb is not None:
handshake_cb()
if handshake_cb is not None:
handshake_cb()

self.send_raw(
json.dumps({
Expand Down Expand Up @@ -178,45 +167,41 @@ def encrypt(self, data):
cryptor = Cryptor(pubkey_hex=self.pub)
return cryptor.encrypt(data)

def send(self, data, callback=lambda msg: None):
def send(self, data, callback=None):
assert self.guid, 'Uninitialized own guid'

if hasattr(self, 'guid'):
# Include sender information and version
data['guid'] = self.guid
data['senderGUID'] = self.transport.guid
data['uri'] = self.transport.uri
data['pubkey'] = self.transport.pubkey
data['senderNick'] = self.transport.nickname
data['v'] = constants.VERSION
if not self.pub:
self.log.warn('There is no public key for encryption')
return

self.log.debug(
'Sending to peer: %s %s',
self.ip, pformat(data)
)
# Include sender information and version
data['guid'] = self.guid
data['senderGUID'] = self.transport.guid
data['uri'] = self.transport.uri
data['pubkey'] = self.transport.pubkey
data['senderNick'] = self.transport.nickname
data['v'] = constants.VERSION

if self.pub == '':
self.log.info('There is no public key for encryption')
else:
signature = self.sign(json.dumps(data))
self.log.debug('Sending to peer: %s %s', self.ip, pformat(data))

try:
data = self.encrypt(json.dumps(data))
except Exception as e:
self.log.error('Encryption failed. %s', e)
return
jdata = json.dumps(data)
try:
cipher_data = self.encrypt(jdata)
signature = self.sign(jdata)
except Exception as e:
self.log.error('Encryption failed. %s', e)
return

try:
self.send_raw(
json.dumps({
'sig': signature.encode('hex'),
'data': data.encode('hex')
}),
callback
)
except Exception as e:
self.log.error("Was not able to encode empty data: %s", e)
else:
self.log.error('Cannot send to peer')
try:
self.send_raw(
json.dumps({
'sig': signature.encode('hex'),
'data': cipher_data.encode('hex')
}),
callback
)
except Exception as e:
self.log.error("Was not able to encode empty data: %s", e)

def peer_to_tuple(self):
return self.ip, self.port, self.guid
Expand Down Expand Up @@ -280,13 +265,11 @@ def listen(self):
"\n\t$ sudo ifconfig lo0 alias 127.0.0.2",
"\n\n")
raise Exception(error_message)

elif '[' in self.ip:
self.socket.ipv6 = True
self.socket.bind('tcp://[*]:%s' % self.port)
else:
if self.ip.find('[') != -1:
self.socket.ipv6 = True
self.socket.bind('tcp://[*]:%s' % self.port)
else:
self.socket.bind('tcp://*:%s' % self.port)
self.socket.bind('tcp://*:%s' % self.port)

self.stream = zmqstream.ZMQStream(
self.socket, io_loop=ioloop.IOLoop.current()
Expand Down Expand Up @@ -314,12 +297,6 @@ def _on_raw_message(self, serialized):

self._data_cb(msg)

def stop(self):
if self.ctx:
print "PeerListener.stop() destroying zmq socket."
self.ctx.destroy(linger=None)
self.is_listening = False


class CryptoPeerListener(PeerListener):

Expand Down
16 changes: 6 additions & 10 deletions node/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ def valid_peer_uri(self, uri):
return True

def shutdown(self):
if self.listener is not None:
self.listener.stop()
raise NotImplementedError


class CryptoTransportLayer(TransportLayer):
Expand Down Expand Up @@ -692,16 +691,13 @@ def _on_message(self, msg):

def shutdown(self):
print "CryptoTransportLayer.shutdown()!"
try:
TransportLayer.shutdown(self)
print "CryptoTransportLayer.shutdown(): ZMQ sockets destroyed."
except Exception as e:
self.log.error("Transport shutdown error: " + e.message)

print "Notice: explicit DHT Shutdown not implemented."

try:
self.bitmessage_api.close()
except Exception as e:
# might not even be open, not much more we can do on our way out if exception thrown here.
self.log.error("Could not shutdown bitmessage_api's ServerProxy. " + e.message)
# It might not even be open; we can't do much more on our
# way out if exception is thrown here.
self.log.error(
"Could not shutdown bitmessage_api's ServerProxy: %s", e.message
)
2 changes: 0 additions & 2 deletions test/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def setUpClass(cls):
cls.port = 54321
cls.address = cls._mk_address(cls.protocol, cls.hostname, cls.port)
cls.nickname = "OpenBazaar LightYear"
cls.responses_received = {}
cls.pub = "YELLOW SUBMARINE"
cls.timeout = 10
cls.transport = mock.Mock()
Expand All @@ -38,7 +37,6 @@ def test_init(self):
self.assertEqual(self.pc1.transport, self.transport)
self.assertEqual(self.pc1.address, self.address)
self.assertEqual(self.pc1.nickname, self.default_nickname)
self.assertEqual(self.pc1.responses_received, self.responses_received)
self.assertIsNotNone(self.pc1.ctx)

self.assertEqual(self.pc2.nickname, self.nickname)
Expand Down

0 comments on commit 588b31e

Please sign in to comment.