Skip to content

Commit

Permalink
p2p: Misc fixes on discovery/kademlia
Browse files Browse the repository at this point in the history
- Fix Node.from_uri() and host_port_pubkey_from_uri(), which broke after
the last eth_utils update
- Change wait_pong() so that it takes a node/token and constructs the
pingid itself; that way the logged messages can be more useful when things
go wrong
- Change a few method parameters annotated with AnyStr to bytes
- Better debug logging
- Remove some commented-out code
- Move bootnode URIs to constants.py
  • Loading branch information
gsalgado committed Mar 5, 2018
1 parent 3784a87 commit 55b88bb
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 83 deletions.
14 changes: 14 additions & 0 deletions p2p/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,17 @@
# Types of LES Announce messages
LES_ANNOUNCE_SIMPLE = 1
LES_ANNOUNCE_SIGNED = 2

# enode URIs ('enode://<hex pubkey>@<ip>:<port>') of the mainnet bootstrap nodes.
# DiscoveryProtocol accepts a list like this one and turns each entry into a
# kademlia.Node via Node.from_uri().
MAINNET_BOOTNODES = [
'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501
'enode://aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f@13.93.211.84:30303', # noqa: E501
'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501
'enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303', # noqa: E501
'enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303', # noqa: E501
]
# enode URIs ('enode://<hex pubkey>@<ip>:<port>') of the Ropsten bootstrap nodes.
ROPSTEN_BOOTNODES = [
'enode://30b7ab30a01c124a6cceca36863ece12c4f5fa68e3ba9b0b51407ccc002eeed3b3102d20a88f1c1d3c3154e2449317b8ef95090e77b312d5cc39354f86d5d606@52.176.7.10:30303', # noqa: E501
'enode://865a63255b3bb68023b6bffd5095118fcc13e79dcf014fe4e47e065c350c7cc72af2e53eff895f11ba1bbb6a2b33271c1116ee870f266618eadfc2e78aa7349c@52.176.100.77:30303', # noqa: E501
'enode://6332792c4a00e3e4ee0926ed89e0d27ef985424d97b6a45bf0f23e51f0dcb5e66b875777506458aea7af6f9e4ffb69f43f3778ee73c81ed9d34c51c4b16b0b0f@52.232.243.152:30303', # noqa: E501
'enode://94c15d1b9e2fe7ce56e458b9a3b672ef11894ddedd0c6f247e0f1d3487f52b66208fb4aeb8179fce6e3a749ea93ed147c37976d67af557508d199d9594c35f09@192.81.208.223:30303', # noqa: E501
]
65 changes: 21 additions & 44 deletions p2p/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import rlp
from eth_utils import (
decode_hex,
encode_hex,
keccak,
to_bytes,
to_list,
Expand Down Expand Up @@ -78,10 +78,10 @@ class DiscoveryProtocol(asyncio.DatagramProtocol):
_max_neighbours_per_packet_cache = None

def __init__(self, privkey: datatypes.PrivateKey, address: kademlia.Address,
bootstrap_nodes: List[kademlia.Node]) -> None:
bootstrap_nodes: List[str]) -> None:
self.privkey = privkey
self.address = address
self.bootstrap_nodes = bootstrap_nodes
self.bootstrap_nodes = [kademlia.Node.from_uri(node) for node in bootstrap_nodes]
self.this_node = kademlia.Node(self.pubkey, address)
self.kademlia = kademlia.KademliaProtocol(self.this_node, wire=self)

Expand Down Expand Up @@ -143,7 +143,7 @@ def receive(self, address: kademlia.Address, message: AnyStr) -> None:
try:
remote_pubkey, cmd_id, payload, message_hash = _unpack(message)
except DefectiveMessage as e:
self.logger.error('error unpacking message: %s', e)
self.logger.error('error unpacking message (%s) from %s: %s', message, address, e)
return

# As of discovery version 4, expiration is the last element for all packets, so
Expand Down Expand Up @@ -182,13 +182,14 @@ def recv_find_node(self, node: kademlia.Node, payload: List[Any], _: AnyStr) ->
self.kademlia.recv_find_node(node, big_endian_to_int(node_id))

def send_ping(self, node: kademlia.Node) -> bytes:
self.logger.debug('>>> pinging %s', node)
version = rlp.sedes.big_endian_int.serialize(PROTO_VERSION)
payload = [version, self.address.to_endpoint(), node.address.to_endpoint()]
message = _pack(CMD_PING.id, payload, self.privkey)
self.send(node, message)
# Return the msg hash, which is used as a token to identify pongs.
return message[:MAC_SIZE]
token = message[:MAC_SIZE]
self.logger.debug('>>> ping %s (token == %s)', node, encode_hex(token))
return token

def send_find_node(self, node: kademlia.Node, target_node_id: int) -> None:
target_node_id = int_to_big_endian(
Expand All @@ -198,7 +199,7 @@ def send_find_node(self, node: kademlia.Node, target_node_id: int) -> None:
self.send(node, message)

def send_pong(self, node: kademlia.Node, token: AnyStr) -> None:
self.logger.debug('>>> ponging %s', node)
self.logger.debug('>>> pong %s', node)
payload = [node.address.to_endpoint(), token]
message = _pack(CMD_PONG.id, payload, self.privkey)
self.send(node, message)
Expand Down Expand Up @@ -265,7 +266,7 @@ def _unpack(message: AnyStr) -> Tuple[datatypes.PublicKey, int, List[Any], AnySt
"""
message_hash = message[:MAC_SIZE]
if message_hash != keccak(message[MAC_SIZE:]):
raise WrongMAC()
raise WrongMAC("Wrong msg mac")
signature = keys.Signature(message[MAC_SIZE:HEAD_SIZE])
signed_data = message[HEAD_SIZE:]
remote_pubkey = signature.recover_public_key_from_msg(signed_data)
Expand All @@ -278,59 +279,35 @@ def _unpack(message: AnyStr) -> Tuple[datatypes.PublicKey, int, List[Any], AnySt


def _test():
# async def show_tasks():
# while True:
# tasks = []
# for task in asyncio.Task.all_tasks():
# if task._coro.__name__ != "show_tasks":
# tasks.append(task._coro.__name__)
# if tasks:
# logger.debug("Active tasks: %s", tasks)
# await asyncio.sleep(3)

privkey_hex = '65462b0520ef7d3df61b9992ed3bea0c56ead753be7c8b3614e0ce01e4cac41b'
listen_host = '0.0.0.0'
listen_port = 30303
bootstrap_uris = [
# Local geth bootnodes
# b'enode://3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@127.0.0.1:30301', # noqa: E501
# Testnet bootnodes
# b'enode://6ce05930c72abc632c58e2e4324f7c7ea478cec0ed4fa2528982cf34483094e9cbc9216e7aa349691242576d552a2a56aaeae426c5303ded677ce455ba1acd9d@13.84.180.240:30303', # noqa: E501
# b'enode://20c9ad97c081d63397d7b685a412227a40e23c8bdc6688c6f37e97cfbc22d2b4d1db1510d8f61e6a8866ad7f0e17c02b14182d37ea7c3c8b9c2683aeb6b733a1@52.169.14.227:30303', # noqa: E501
# Mainnet bootnodes
# b'enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303', # noqa: E501
# b'enode://3f1d12044546b76342d59d4a05532c14b85aa669704bfe1f864fe079415aa2c02d743e03218e57a33fb94523adb54032871a6c51b2cc5514cb7c7e35b3ed0a99@13.93.211.84:30303', # noqa: E501
b'enode://78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d@191.235.84.50:30303', # noqa: E501
b'enode://158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6@13.75.154.138:30303', # noqa: E501
b'enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303', # noqa: E501
]

# logger = logging.getLogger("p2p.discovery")
from p2p import constants
from p2p import ecies

logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(message)s')

loop = asyncio.get_event_loop()
loop.set_debug(True)

privkey = keys.PrivateKey(decode_hex(privkey_hex))
listen_host = '0.0.0.0'
# Listen on a port other than 30303 in case we want to test against a local geth instance
# running on that port.
listen_port = 30301
privkey = ecies.generate_privkey()
addr = kademlia.Address(listen_host, listen_port, listen_port)
bootstrap_nodes = [kademlia.Node.from_uri(x) for x in bootstrap_uris]
discovery = DiscoveryProtocol(privkey, addr, bootstrap_nodes)
discovery = DiscoveryProtocol(privkey, addr, constants.MAINNET_BOOTNODES)
# local_bootnodes = [
# 'enode://0x3a514176466fa815ed481ffad09110a2d344f6c9b78c1d14afc351c3a51be33d8072e77939dc03ba44790779b7a1025baf3003f6732430e20cd9b76d953391b3@127.0.0.1:30303'] # noqa: E501
# discovery = DiscoveryProtocol(privkey, addr, local_bootnodes)
loop.run_until_complete(discovery.listen(loop))

# There's no need to wait for bootstrap because we run_forever().
asyncio.ensure_future(discovery.bootstrap())

# This helps when debugging asyncio issues.
# task_monitor = asyncio.ensure_future(show_tasks())

try:
loop.run_forever()
except KeyboardInterrupt:
pass

# task_monitor.set_result(None)
discovery.stop()
# logger.info("Pending tasks at exit: %s", asyncio.Task.all_tasks(loop))
loop.close()


Expand Down
45 changes: 17 additions & 28 deletions p2p/kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
decode_hex,
encode_hex,
keccak,
to_bytes,
text_if_str,
)

from eth_keys import (
Expand Down Expand Up @@ -101,10 +99,10 @@ def __init__(self, pubkey: datatypes.PublicKey, address: Address) -> None:
self.id = big_endian_to_int(keccak(pubkey.to_bytes()))

@classmethod
def from_uri(cls, uri: bytes) -> 'Node':
ip, port, pubkey_bytes = host_port_pubkey_from_uri(uri)
pubkey = keys.PublicKey(pubkey_bytes)
return cls(pubkey, Address(ip.decode(), port))
def from_uri(cls, uri: str) -> 'Node':
parsed = urlparse.urlparse(uri)
pubkey = keys.PublicKey(decode_hex(parsed.username))
return cls(pubkey, Address(parsed.hostname, parsed.port))

def __repr__(self):
return '<Node(%s@%s)>' % (self.pubkey.to_hex()[:6], self.address.ip)
Expand Down Expand Up @@ -331,22 +329,20 @@ def recv_neighbours(self, remote: Node, neighbours: List[Node]) -> None:
self.logger.debug(
'unexpected neighbours from %s, probably came too late', remote)

def recv_pong(self, remote: Node, token: AnyStr) -> None:
def recv_pong(self, remote: Node, token: bytes) -> None:
"""Process a pong packet.
Pong packets should only be received as a response to a ping, so the actual processing is
left to the callback from pong_callbacks, which is added (and removed after it's done
or timed out) in wait_pong().
"""
self.logger.debug('<<< pong from %s', remote)
self.logger.debug('<<< pong from %s (token == %s)', remote, encode_hex(token))
pingid = self._mkpingid(token, remote)
callback = self.pong_callbacks.get(pingid)
if callback is not None:
callback()
else:
self.logger.debug(
'unexpected pong from %s with pingid %s, probably came too late',
remote, encode_hex(pingid))
self.logger.debug('unexpected pong from %s (token == %s)', remote, encode_hex(token))

def recv_ping(self, remote: Node, hash_: AnyStr) -> None:
"""Process a received ping packet.
Expand Down Expand Up @@ -408,13 +404,14 @@ async def wait_ping(self, remote: Node) -> bool:
del self.ping_callbacks[remote]
return got_ping

async def wait_pong(self, pingid: bytes) -> bool:
"""Wait for a pong with the given pingid.
async def wait_pong(self, remote: Node, token: bytes) -> bool:
"""Wait for a pong from the given remote containing the given token.
This coroutine adds a callback to pong_callbacks and yields control until that callback is
called or a timeout (k_request_timeout) occurs. At that point it returns whether or not
a pong containing the given token was received from the given remote.
"""
pingid = self._mkpingid(token, remote)
if pingid in self.pong_callbacks:
raise AlreadyWaiting(
"There's another coroutine waiting for a pong packet with id {}".format(pingid))
Expand All @@ -424,10 +421,10 @@ async def wait_pong(self, pingid: bytes) -> bool:
got_pong = False
try:
got_pong = await asyncio.wait_for(event.wait(), k_request_timeout)
self.logger.debug('got expected pong with pingid %s', encode_hex(pingid))
self.logger.debug('got expected pong with token %s', encode_hex(token))
except asyncio.futures.TimeoutError:
self.logger.debug(
'timed out waiting for pong with pingid %s', encode_hex(pingid))
'timed out waiting for pong from %s (token == %s)', remote, encode_hex(token))
# TODO: Use a contextmanager to ensure we always delete the callback from the list.
del self.pong_callbacks[pingid]
return got_pong
Expand Down Expand Up @@ -466,9 +463,7 @@ def process(response):
def ping(self, node: Node) -> bytes:
if node == self.this_node:
raise ValueError("Cannot ping self")
token = self.wire.send_ping(node)
pingid = self._mkpingid(token, node)
return pingid
return self.wire.send_ping(node)

async def bond(self, node: Node) -> bool:
"""Bond with the given node.
Expand All @@ -479,9 +474,9 @@ async def bond(self, node: Node) -> bool:
if node in self.routing:
return True

pingid = self.ping(node)
token = self.ping(node)

got_pong = await self.wait_pong(pingid)
got_pong = await self.wait_pong(node, token)
if not got_pong:
self.logger.debug("bonding failed, didn't receive pong from %s", node)
# Drop the failing node and schedule a populate_not_full_buckets() call to try and
Expand Down Expand Up @@ -559,9 +554,8 @@ def refresh_idle_buckets(self):
rid = random.randint(bucket.start, bucket.end)
asyncio.ensure_future(self.lookup(rid))

def _mkpingid(self, token: AnyStr, node: Node) -> bytes:
pid = text_if_str(to_bytes, token) + node.pubkey.to_bytes()
return pid
def _mkpingid(self, token: bytes, node: Node) -> bytes:
return token + node.pubkey.to_bytes()

async def populate_not_full_buckets(self):
"""Go through all buckets that are not full and try to fill them.
Expand Down Expand Up @@ -594,8 +588,3 @@ def to_binary(x): # left padded bit representation

def sort_by_distance(nodes: List[Node], target_id: int) -> List[Node]:
return sorted(nodes, key=operator.methodcaller('distance_to', target_id))


def host_port_pubkey_from_uri(uri: bytes) -> Tuple[bytes, int, bytes]:
parsed = urlparse.urlparse(uri)
return cast(bytes, parsed.hostname), parsed.port, cast(bytes, decode_hex(parsed.username))
33 changes: 22 additions & 11 deletions tests/p2p/test_kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,29 @@ async def test_wait_ping(echo):


@pytest.mark.asyncio
@pytest.mark.parametrize('echoed', ['echoed', b'echoed'])
async def test_wait_pong(echoed):
async def test_wait_pong():
proto = get_wired_protocol()
node = random_node()
pingid = proto._mkpingid(echoed, node)

token = b'token'
# Schedule a call to proto.recv_pong() simulating a pong from the node we expect.
recv_pong_coroutine = asyncio.coroutine(lambda: proto.recv_pong(node, echoed))
recv_pong_coroutine = asyncio.coroutine(lambda: proto.recv_pong(node, token))
asyncio.ensure_future(recv_pong_coroutine())

got_pong = await proto.wait_pong(pingid)
got_pong = await proto.wait_pong(node, token)

assert got_pong
# Ensure wait_pong() cleaned up after itself.
pingid = proto._mkpingid(token, node)
assert pingid not in proto.pong_callbacks

# If the remote node echoed something different from what we expected, wait_pong() would
# time out.
wrong_echo = "foo"
recv_pong_coroutine = asyncio.coroutine(lambda: proto.recv_pong(node, wrong_echo))
wrong_token = b"foo"
recv_pong_coroutine = asyncio.coroutine(lambda: proto.recv_pong(node, wrong_token))
asyncio.ensure_future(recv_pong_coroutine())

got_pong = await proto.wait_pong(pingid)
got_pong = await proto.wait_pong(node, token)

assert not got_pong
assert pingid not in proto.pong_callbacks
Expand Down Expand Up @@ -126,12 +126,12 @@ async def test_bond():
proto = get_wired_protocol()
node = random_node()

token = b'token'
# Do not send pings, instead simply return the token we'd expect back together with the pong.
proto.ping = lambda remote: proto._mkpingid("echoed", remote)
proto.ping = lambda remote: token

# Pretend we get a pong from the node we are bonding with.
expected_pingid = proto._mkpingid("echoed", node)
proto.wait_pong = asyncio.coroutine(lambda pingid: pingid == expected_pingid)
proto.wait_pong = asyncio.coroutine(lambda n, t: t == token and n == node)

bonded = await proto.bond(node)

Expand All @@ -144,6 +144,17 @@ async def test_bond():
assert not bonded


def test_node_from_uri():
    """Node.from_uri() must recover the pubkey, IP and port encoded in an enode URI."""
    expected_port = 30303
    expected_ip = '52.16.188.185'
    pubkey_hex = 'a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c'  # noqa: E501
    enode_uri = 'enode://%s@%s:%d' % (pubkey_hex, expected_ip, expected_port)

    parsed_node = kademlia.Node.from_uri(enode_uri)

    assert parsed_node.address.ip == expected_ip
    # The URI carries a single port, which must be used for both UDP and TCP.
    assert parsed_node.address.udp_port == parsed_node.address.tcp_port == expected_port
    # to_hex() is 0x-prefixed, whereas the URI carries the bare hex pubkey.
    assert parsed_node.pubkey.to_hex() == '0x' + pubkey_hex


def test_update_routing_table():
proto = get_wired_protocol()
node = random_node()
Expand Down

0 comments on commit 55b88bb

Please sign in to comment.