Skip to content

Commit

Permalink
[network] Fix peer selection to rotate through all peers
Browse files Browse the repository at this point in the history
If a peer fails to connect, we won't try to reconnect to it for a given amount of time (unless all nodes have been dialed).
  • Loading branch information
gregnazario authored and aptos-bot committed Mar 30, 2022
1 parent 4f2f5fd commit 1854066
Showing 1 changed file with 67 additions and 13 deletions.
80 changes: 67 additions & 13 deletions network/src/connectivity_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ use rand::{
use serde::Serialize;
use short_hex_str::AsShortHexStr;
use std::{
cmp::min,
cmp::{min, Ordering},
collections::{hash_map::Entry, HashMap, HashSet},
fmt, mem,
sync::Arc,
time::Duration,
time::{Duration, SystemTime},
};
use tokio_retry::strategy::jitter;

Expand All @@ -75,6 +75,11 @@ mod test;
/// around the same time at startup.
const MAX_CONNECTION_DELAY_JITTER: Duration = Duration::from_millis(100);

/// How long to keep trying other peers before dialing the same peer again.
///
/// It's currently set to 5 minutes to ensure rotation through all (or most) peers.
const TRY_DIAL_BACKOFF_TIME: Duration = Duration::from_secs(300);

/// The ConnectivityManager actor.
pub struct ConnectivityManager<TBackoff> {
network_context: NetworkContext,
Expand Down Expand Up @@ -162,6 +167,10 @@ pub enum ConnectivityRequest {
struct DiscoveredPeerSet(HashMap<PeerId, DiscoveredPeer>);

impl DiscoveredPeerSet {
fn get_mut(&mut self, peer_id: &PeerId) -> Option<&mut DiscoveredPeer> {
self.0.get_mut(peer_id)
}

fn try_remove_empty(&mut self, peer_id: &PeerId) -> bool {
match self.0.entry(*peer_id) {
Entry::Occupied(entry) => {
Expand All @@ -188,14 +197,25 @@ impl DiscoveredPeerSet {
}
}

#[derive(Clone, Debug, Default, PartialEq, Serialize)]
/// Represents all the information for a discovered peer
#[derive(Clone, Debug, PartialEq, Serialize)]
struct DiscoveredPeer {
role: PeerRole,
addrs: Addresses,
keys: PublicKeys,
/// The last time the node was dialed
last_dial_time: SystemTime,
}

impl DiscoveredPeer {
/// Builds a peer entry for `role` with default (empty) addresses and keys.
///
/// The last dial time starts at `SystemTime::UNIX_EPOCH`, so a freshly
/// created peer is far in the past as far as `has_dialed_recently` is
/// concerned.
pub fn new(role: PeerRole) -> Self {
    let addrs = Addresses::default();
    let keys = PublicKeys::default();
    let last_dial_time = SystemTime::UNIX_EPOCH;

    Self {
        role,
        addrs,
        keys,
        last_dial_time,
    }
}
/// Peers without keys cannot be mutually authenticated
pub fn is_eligible(&self) -> bool {
!self.keys.is_empty()
Expand All @@ -205,6 +225,36 @@ impl DiscoveredPeer {
pub fn is_eligible_to_be_dialed(&self) -> bool {
    // A peer that fails the basic eligibility check is never dialable.
    if !self.is_eligible() {
        return false;
    }
    // Beyond that, dialing requires at least one known address.
    !self.addrs.is_empty()
}

/// Updates the last time we tried to connect to this node
pub fn set_last_dial_time(&mut self, time: SystemTime) {
self.last_dial_time = time;
}

/// Returns `true` when this peer was last dialed less than `backoff_duration`
/// ago, i.e. it is still inside its backoff window and should not be dialed
/// again yet.
///
/// If the elapsed time cannot be computed (`SystemTime::elapsed` returns an
/// error, which happens when the stored time is later than the current system
/// time), the peer is treated as *not* recently dialed.
pub fn has_dialed_recently(&self, backoff_duration: Duration) -> bool {
    match self.last_dial_time.elapsed() {
        Ok(since_last_dial) => since_last_dial < backoff_duration,
        Err(_) => false,
    }
}
}

/// Dial-priority ordering: a "smaller" peer is one that should be dialed first.
///
/// Peers outside their dial backoff window sort before peers still inside it;
/// when both peers are on the same side of that divide, the comparison falls
/// back to their roles.
///
/// NOTE(review): the result depends on the current time via
/// `has_dialed_recently`, and it can report two peers as equal in order even
/// when the derived `PartialEq` says they differ — confirm callers only use
/// this for prioritization.
impl PartialOrd for DiscoveredPeer {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let recently_dialed = (
            self.has_dialed_recently(TRY_DIAL_BACKOFF_TIME),
            other.has_dialed_recently(TRY_DIAL_BACKOFF_TIME),
        );

        match recently_dialed {
            // Only `other` is backing off: `self` takes priority.
            (false, true) => Some(Ordering::Less),
            // Only `self` is backing off: `other` takes priority.
            (true, false) => Some(Ordering::Greater),
            // Same backoff state on both sides: decide by role.
            (true, true) | (false, false) => self.role.partial_cmp(&other.role),
        }
    }
}

impl From<&DiscoveredPeer> for Peer {
Expand Down Expand Up @@ -446,23 +496,25 @@ where
.iter()
.filter(|(peer_id, peer)| {
peer.is_eligible_to_be_dialed() // The node is eligible to dial
&& !self.connected.contains_key(peer_id) // The node is not already connected.
&& !self.dial_queue.contains_key(peer_id) // There is no pending dial to this node.
&& roles_to_dial.contains(&peer.role) // We can dial this role
&& !self.connected.contains_key(peer_id) // The node is not already connected.
&& !self.dial_queue.contains_key(peer_id) // There is no pending dial to this node.
&& roles_to_dial.contains(&peer.role) // We can dial this role
})
.collect();

// Prioritize by PeerRole
// Shuffle so we don't get stuck on certain peers
eligible.shuffle(&mut self.rng);
eligible.sort_by(|(_, peer), (_, other)| peer.role.cmp(&other.role));

let num_eligible = eligible.len();
// Sort by peer priority
eligible
.sort_by(|(_, peer), (_, other)| peer.partial_cmp(other).unwrap_or(Ordering::Equal));

// Limit the number of dialed connections from a Full Node
// This does not limit the number of incoming connections
// It enforces that a full node cannot have more outgoing connections than `connection_limit`
// including in flight dials.
let num_eligible = eligible.len();
let to_connect = if let Some(conn_limit) = self.outbound_connection_limit {
let outbound_connections = self
.connected
Expand All @@ -478,6 +530,7 @@ where
num_eligible
};

// Take peers to connect to in priority order
eligible
.iter()
.take(to_connect)
Expand Down Expand Up @@ -557,6 +610,11 @@ where
peer_id
};
pending_dials.push(f.boxed());

// Update last dial time
if let Some(discovered_peer) = self.discovered_peers.get_mut(&peer_id) {
discovered_peer.set_last_dial_time(SystemTime::now())
}
self.dial_queue.insert(peer_id, cancel_tx);
}

Expand Down Expand Up @@ -672,11 +730,7 @@ where
.discovered_peers
.0
.entry(peer_id)
.or_insert(DiscoveredPeer {
role: discovered_peer.role,
addrs: Addresses::default(),
keys: PublicKeys::default(),
});
.or_insert_with(|| DiscoveredPeer::new(discovered_peer.role));
let mut peer_updated = false;
// Update peer's pubkeys
if peer.keys.update(src, discovered_peer.keys) {
Expand Down

0 comments on commit 1854066

Please sign in to comment.