diff --git a/node/src/new_beacon/mod.rs b/node/src/new_beacon/mod.rs index df467a63fe..26ea8f7d0b 100644 --- a/node/src/new_beacon/mod.rs +++ b/node/src/new_beacon/mod.rs @@ -254,6 +254,12 @@ impl> Beacon { Ok(()) } + async fn process_peer_response(&self, source: SocketAddr, message: PeerResponse) -> anyhow::Result<()> { + self.router().insert_candidate_peers(&message.peers); + + Ok(()) + } + async fn process_unconfirmed_block(&self, source: SocketAddr, message: UnconfirmedBlock) -> anyhow::Result<()> { let message_clone = message.clone(); @@ -388,7 +394,7 @@ impl> Reading for Beacon { Message::Pong(pong) => todo!(), Message::PeerRequest(peer_request) => self.process_peer_request(source, peer_request).await, - Message::PeerResponse(peer_response) => todo!(), + Message::PeerResponse(peer_response) => self.process_peer_response(source, peer_response).await, Message::UnconfirmedBlock(unconfirmed_block) => { // TODO(nkls): spawn task. diff --git a/node/src/new_beacon/router.rs b/node/src/new_beacon/router.rs index 7c728f9b17..ded6ba9732 100644 --- a/node/src/new_beacon/router.rs +++ b/node/src/new_beacon/router.rs @@ -53,6 +53,21 @@ impl Router { &self.network } + pub fn is_local_addr(&self, addr: SocketAddr) -> bool { + let local_addr = self.network().listening_addr().expect("listening addr must be present"); + addr == local_addr + || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == local_addr.port() + } + + pub fn is_connected(&self, addr: SocketAddr) -> bool { + self.network().is_connected(addr) + || self.current_peers.read().iter().find(|(_, meta)| meta.listening_addr() == addr).is_some() + } + + pub fn is_restricted(&self, addr: SocketAddr) -> bool { + self.restricted_peers.read().contains_key(&addr) + } + pub fn insert_peer(&self, addr: SocketAddr, meta: PeerMeta) { self.current_peers.write().insert(addr, meta); } @@ -77,6 +92,18 @@ impl Router { self.candidate_peers.write().remove(&addr); } + pub fn insert_candidate_peers(&self, addrs: &[SocketAddr]) { + let candidate_peers = self.candidate_peers.write(); + + for &addr in addrs { + if self.is_local_addr(addr) || self.is_connected(addr) || self.is_restricted(addr) { + continue; + } + + self.candidate_peers.write().insert(addr); + } + } + pub fn insert_restricted_peer(&self, addr: SocketAddr) { self.restricted_peers.write().insert(addr, OffsetDateTime::now_utc()); }