From 05ccf54db0c5209b0ecd197dfe2836cccd7e66c2 Mon Sep 17 00:00:00 2001 From: ShahakShama <70578257+ShahakShama@users.noreply.github.com> Date: Tue, 23 Jul 2024 09:18:47 +0300 Subject: [PATCH] fix(network): if all peers are blocked, act as if there are no peers (#2222) --- Cargo.lock | 1 - crates/papyrus_network/Cargo.toml | 1 - .../src/peer_manager/behaviour_impl.rs | 14 ++- .../papyrus_network/src/peer_manager/mod.rs | 34 +++++++- .../papyrus_network/src/peer_manager/peer.rs | 23 +++-- .../papyrus_network/src/peer_manager/test.rs | 87 ++++++++++++++++--- 6 files changed, 134 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 951bcea195..593ddda598 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6037,7 +6037,6 @@ dependencies = [ "assert_matches", "async-stream", "bytes", - "chrono", "clap", "deadqueue", "defaultmap", diff --git a/crates/papyrus_network/Cargo.toml b/crates/papyrus_network/Cargo.toml index 7000a3b1a3..d0aabc1d50 100644 --- a/crates/papyrus_network/Cargo.toml +++ b/crates/papyrus_network/Cargo.toml @@ -16,7 +16,6 @@ path = "src/bin/streamed_bytes_benchmark.rs" [dependencies] async-stream.workspace = true bytes.workspace = true -chrono.workspace = true defaultmap.workspace = true derive_more.workspace = true futures.workspace = true diff --git a/crates/papyrus_network/src/peer_manager/behaviour_impl.rs b/crates/papyrus_network/src/peer_manager/behaviour_impl.rs index f0b7380e5c..ec5ffc6bf1 100644 --- a/crates/papyrus_network/src/peer_manager/behaviour_impl.rs +++ b/crates/papyrus_network/src/peer_manager/behaviour_impl.rs @@ -1,4 +1,4 @@ -use std::task::Poll; +use std::task::{ready, Poll}; use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::{ @@ -175,9 +175,19 @@ where fn poll( &mut self, - _cx: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll>> { + if let Some(event) = self.pending_events.pop() { + return Poll::Ready(event); + } + if let Some(sleep_future) = &mut self.sleep_waiting_for_unblocked_peer { + ready!(sleep_future.as_mut().poll(cx)); + for outbound_session_id in std::mem::take(&mut self.sessions_received_when_no_peers) { + self.assign_peer_to_session(outbound_session_id); + } + } + self.sleep_waiting_for_unblocked_peer = None; self.pending_events.pop().map(Poll::Ready).unwrap_or(Poll::Pending) } } diff --git a/crates/papyrus_network/src/peer_manager/mod.rs b/crates/papyrus_network/src/peer_manager/mod.rs index d9c060d7dd..95dd26daa5 100644 --- a/crates/papyrus_network/src/peer_manager/mod.rs +++ b/crates/papyrus_network/src/peer_manager/mod.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; +use std::time::Duration; -use chrono::Duration; +use futures::future::BoxFuture; +use futures::FutureExt; use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::ToSwarm; use libp2p::PeerId; @@ -30,9 +32,11 @@ pub struct PeerManager { session_to_peer_map: HashMap, config: PeerManagerConfig, last_peer_index: usize, + // TODO(shahak): Change to VecDeque and awake when item is added. pending_events: Vec>>, peers_pending_dial_with_sessions: HashMap>, sessions_received_when_no_peers: Vec, + sleep_waiting_for_unblocked_peer: Option>, } #[derive(Clone)] @@ -53,7 +57,11 @@ pub(crate) enum PeerManagerError { impl Default for PeerManagerConfig { fn default() -> Self { - Self { target_num_for_peers: 100, blacklist_timeout: Duration::max_value() } + Self { + target_num_for_peers: 100, + // 1 year. + blacklist_timeout: Duration::from_secs(3600 * 24 * 365), + } } } @@ -72,6 +80,7 @@ where pending_events: Vec::new(), peers_pending_dial_with_sessions: HashMap::new(), sessions_received_when_no_peers: Vec::new(), + sleep_waiting_for_unblocked_peer: None, } } @@ -79,6 +88,8 @@ where info!("Peer Manager found new peer {:?}", peer.peer_id()); peer.set_timeout_duration(self.config.blacklist_timeout); self.peers.insert(peer.peer_id(), peer); + // The new peer is unblocked so we don't need to wait for unblocked peer. + self.sleep_waiting_for_unblocked_peer = None; for outbound_session_id in std::mem::take(&mut self.sessions_received_when_no_peers) { self.assign_peer_to_session(outbound_session_id); } @@ -90,10 +101,12 @@ where } // TODO(shahak): Remove return value and use events in tests. + // TODO(shahak): Split this function for readability. fn assign_peer_to_session(&mut self, outbound_session_id: OutboundSessionId) -> Option { // TODO: consider moving this logic to be async (on a different tokio task) // until then we can return the assignment even if we use events for the notification. if self.peers.is_empty() { + info!("No peers. Waiting for a new peer to be connected for {outbound_session_id:?}"); self.sessions_received_when_no_peers.push(outbound_session_id); return None; } @@ -106,6 +119,23 @@ where self.peers.iter().take(self.last_peer_index).find(|(_, peer)| !peer.is_blocked()) }); self.last_peer_index = (self.last_peer_index + 1) % self.peers.len(); + if peer.is_none() { + info!( + "No unblocked peers. Waiting for a new peer to be connected or for a peer to \ + become unblocked for {outbound_session_id:?}" + ); + self.sessions_received_when_no_peers.push(outbound_session_id); + // Find the peer closest to becoming unblocked. + let sleep_deadline = self + .peers + .values() + .map(|peer| peer.blocked_until()) + .min() + .expect("min should not return None on a non-empty iterator"); + self.sleep_waiting_for_unblocked_peer = + Some(tokio::time::sleep_until(sleep_deadline.into()).boxed()); + return None; + } peer.map(|(peer_id, peer)| { // TODO: consider not allowing reassignment of the same session self.session_to_peer_map.insert(outbound_session_id, *peer_id); diff --git a/crates/papyrus_network/src/peer_manager/peer.rs b/crates/papyrus_network/src/peer_manager/peer.rs index 0d7711869b..0fa6459deb 100644 --- a/crates/papyrus_network/src/peer_manager/peer.rs +++ b/crates/papyrus_network/src/peer_manager/peer.rs @@ -1,6 +1,5 @@ -// using chrono time and not std since std does not have the ability for std::time::Instance to -// represent the maximum time of the system. -use chrono::{DateTime, Duration, Utc}; +use std::time::{Duration, Instant}; + use libp2p::swarm::ConnectionId; use libp2p::{Multiaddr, PeerId}; #[cfg(test)] @@ -23,6 +22,9 @@ pub trait PeerTrait { fn is_blocked(&self) -> bool; + /// Returns Instant::now if not blocked. + fn blocked_until(&self) -> Instant; + fn connection_ids(&self) -> &Vec; fn add_connection_id(&mut self, connection_id: ConnectionId); @@ -34,7 +36,7 @@ pub trait PeerTrait { pub struct Peer { peer_id: PeerId, multiaddr: Multiaddr, - timed_out_until: Option>, + timed_out_until: Option, timeout_duration: Option, connection_ids: Vec, } @@ -52,11 +54,10 @@ impl PeerTrait for Peer { fn update_reputation(&mut self, _reason: ReputationModifier) { if let Some(timeout_duration) = self.timeout_duration { - self.timed_out_until = - Utc::now().checked_add_signed(timeout_duration).or(Some(DateTime::::MAX_UTC)); - return; + self.timed_out_until = Some(Instant::now() + timeout_duration); + } else { + debug!("Timeout duration not set for peer: {:?}", self.peer_id); } - debug!("Timeout duration not set for peer: {:?}", self.peer_id); } fn peer_id(&self) -> PeerId { @@ -73,12 +74,16 @@ impl PeerTrait for Peer { fn is_blocked(&self) -> bool { if let Some(timed_out_until) = self.timed_out_until { - timed_out_until > Utc::now() + timed_out_until > Instant::now() } else { false } } + fn blocked_until(&self) -> Instant { + self.timed_out_until.unwrap_or_else(Instant::now) + } + fn connection_ids(&self) -> &Vec { &self.connection_ids } diff --git a/crates/papyrus_network/src/peer_manager/test.rs b/crates/papyrus_network/src/peer_manager/test.rs index d02cacd866..d4ca95c897 100644 --- a/crates/papyrus_network/src/peer_manager/test.rs +++ b/crates/papyrus_network/src/peer_manager/test.rs @@ -1,15 +1,19 @@ // TODO(shahak): Add tests for multiple connection ids use core::{panic, time}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; use assert_matches::assert_matches; -use chrono::Duration; use futures::future::poll_fn; +use futures::{FutureExt, Stream, StreamExt}; use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::{ConnectionId, NetworkBehaviour, ToSwarm}; use libp2p::{Multiaddr, PeerId}; use mockall::predicate::eq; use tokio::time::sleep; +use void::Void; use super::behaviour_impl::ToOtherBehaviourEvent; use crate::discovery::identify_impl::IdentifyToOtherBehaviourEvent; @@ -19,6 +23,19 @@ use crate::peer_manager::peer::{MockPeerTrait, Peer, PeerTrait}; use crate::peer_manager::{PeerManager, PeerManagerConfig, ReputationModifier}; use crate::sqmr::OutboundSessionId; +impl Unpin for PeerManager

{} + +impl Stream for PeerManager

{ + type Item = ToSwarm; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::into_inner(self).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(event) => Poll::Ready(Some(event)), + } + } +} + #[test] fn peer_assignment_round_robin() { // Create a new peer manager @@ -102,8 +119,8 @@ fn peer_assignment_round_robin() { } } -#[test] -fn peer_assignment_no_peers() { +#[tokio::test] +async fn peer_assignment_no_peers() { // Create a new peer manager let config = PeerManagerConfig::default(); let mut peer_manager: PeerManager = PeerManager::new(config.clone()); @@ -113,6 +130,7 @@ fn peer_assignment_no_peers() { // Assign a peer to the session assert_matches!(peer_manager.assign_peer_to_session(outbound_session_id), None); + assert!(peer_manager.next().now_or_never().is_none()); // Now the peer manager finds a new peer and can assign the session. let connection_id = ConnectionId::new_unchecked(0); @@ -120,18 +138,64 @@ fn peer_assignment_no_peers() { create_mock_peer(config.blacklist_timeout, false, Some(connection_id)); peer.expect_is_blocked().times(1).return_const(false); peer_manager.add_peer(peer); - assert_eq!(peer_manager.pending_events.len(), 1); assert_matches!( - peer_manager.pending_events.first().unwrap(), + peer_manager.next().await.unwrap(), ToSwarm::GenerateEvent(ToOtherBehaviourEvent::SessionAssigned { outbound_session_id: event_outbound_session_id, peer_id: event_peer_id, connection_id: event_connection_id, } - ) if outbound_session_id == *event_outbound_session_id && - peer_id == *event_peer_id && - connection_id == *event_connection_id + ) if outbound_session_id == event_outbound_session_id && + peer_id == event_peer_id && + connection_id == event_connection_id ); + assert!(peer_manager.next().now_or_never().is_none()); +} + +#[tokio::test] +async fn peer_assignment_no_unblocked_peers() { + const BLOCKED_UNTIL: Duration = Duration::from_secs(5); + const TIMEOUT: Duration = Duration::from_secs(1); + // Create a new peer manager + let config = PeerManagerConfig::default(); + let mut peer_manager: PeerManager = PeerManager::new(config.clone()); + + // Create a session + let outbound_session_id = OutboundSessionId { value: 1 }; + + // Create a peer + let connection_id = ConnectionId::new_unchecked(0); + let (mut peer, peer_id) = create_mock_peer(config.blacklist_timeout, true, Some(connection_id)); + peer.expect_is_blocked().times(1).return_const(true); + peer.expect_is_blocked().times(1).return_const(false); + peer.expect_blocked_until().times(1).returning(|| Instant::now() + BLOCKED_UNTIL); + + peer_manager.add_peer(peer); + peer_manager.report_peer(peer_id, ReputationModifier::Bad {}).unwrap(); + + // Try to assign a peer to the session, and check there wasn't any assignment. + assert_matches!(peer_manager.assign_peer_to_session(outbound_session_id), None); + assert!(peer_manager.next().now_or_never().is_none()); + + // Simulate that BLOCKED_UNTIL has passed. + tokio::time::pause(); + tokio::time::advance(BLOCKED_UNTIL).await; + tokio::time::resume(); + + // After BLOCKED_UNTIL has passed, the peer manager can assign the session. + let event = tokio::time::timeout(TIMEOUT, peer_manager.next()).await.unwrap().unwrap(); + assert_matches!( + event, + ToSwarm::GenerateEvent(ToOtherBehaviourEvent::SessionAssigned { + outbound_session_id: event_outbound_session_id, + peer_id: event_peer_id, + connection_id: event_connection_id, + } + ) if outbound_session_id == event_outbound_session_id && + peer_id == event_peer_id && + connection_id == event_connection_id + ); + assert!(peer_manager.next().now_or_never().is_none()); } #[test] @@ -155,7 +219,7 @@ fn report_peer_calls_update_reputation() { async fn peer_block_realeased_after_timeout() { const DURATION_IN_MILLIS: u64 = 50; let mut peer = Peer::new(PeerId::random(), Multiaddr::empty()); - peer.set_timeout_duration(Duration::milliseconds(DURATION_IN_MILLIS as i64)); + peer.set_timeout_duration(Duration::from_millis(DURATION_IN_MILLIS)); peer.update_reputation(ReputationModifier::Bad {}); assert!(peer.is_blocked()); sleep(time::Duration::from_millis(DURATION_IN_MILLIS)).await; @@ -236,8 +300,8 @@ fn more_peers_needed() { assert!(!peer_manager.more_peers_needed()); } -#[test] -fn timed_out_peer_not_assignable_to_queries() { +#[tokio::test] +async fn timed_out_peer_not_assignable_to_queries() { // Create a new peer manager let config = PeerManagerConfig::default(); let mut peer_manager: PeerManager = PeerManager::new(config.clone()); @@ -245,6 +309,7 @@ fn timed_out_peer_not_assignable_to_queries() { // Create a mock peer let (mut peer, peer_id) = create_mock_peer(config.blacklist_timeout, true, None); peer.expect_is_blocked().times(1).return_const(true); + peer.expect_blocked_until().times(1).returning(|| Instant::now() + Duration::from_secs(1)); // Add the mock peer to the peer manager peer_manager.add_peer(peer);