[State Sync] Remove disconnected peers from in-memory maps.
JoshLind authored and bors-libra committed Mar 30, 2021
1 parent 6570f74 commit 9e7c97c
Showing 3 changed files with 68 additions and 83 deletions.
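In outline: the old code kept every peer in a peers: HashMap<PeerNetworkId, PeerInfo> map and merely flipped an is_alive flag on disconnect; this commit collapses that into a peer_scores: HashMap<PeerNetworkId, f64> map and removes disconnected peers outright. A minimal sketch of the new shape (simplified; the PeerNetworkId alias and method bodies here are illustrative stand-ins, not the real implementation):

use std::collections::HashMap;

// Hypothetical stand-in for the real identifier type, for illustration only.
type PeerNetworkId = u64;

const STARTING_SCORE: f64 = 50.0;

struct RequestManager {
    // After this commit: a single peer-to-score map, with no is_alive flag.
    peer_scores: HashMap<PeerNetworkId, f64>,
}

impl RequestManager {
    fn enable_peer(&mut self, peer: PeerNetworkId) {
        // An already-known peer keeps its existing score; a new peer starts fresh.
        self.peer_scores.entry(peer).or_insert(STARTING_SCORE);
    }

    fn disable_peer(&mut self, peer: &PeerNetworkId) {
        // Disconnected peers are removed from the in-memory map entirely.
        self.peer_scores.remove(peer);
    }
}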
state-sync/src/coordinator.rs (8 changes: 3 additions & 5 deletions)
@@ -32,7 +32,6 @@ use futures::{
     stream::select_all,
     StreamExt,
 };
-use netcore::transport::ConnectionOrigin;
 use network::{protocols::network::Event, transport::ConnectionMetadata};
 use std::{
     cmp,
@@ -198,7 +197,7 @@ impl<T: ExecutorProxyTrait> StateSyncCoordinator<T> {
                 }
             }
             Event::LostPeer(metadata) => {
-                if let Err(e) = self.process_lost_peer(network_id, metadata.remote_peer_id, metadata.origin) {
+                if let Err(e) = self.process_lost_peer(network_id, metadata.remote_peer_id) {
                     error!(LogSchema::new(LogEntry::LostPeer).error(&e));
                 }
             }
@@ -238,10 +237,9 @@ impl<T: ExecutorProxyTrait> StateSyncCoordinator<T> {
         &mut self,
         network_id: NodeNetworkId,
         peer_id: PeerId,
-        origin: ConnectionOrigin,
     ) -> Result<(), Error> {
         let peer = PeerNetworkId(network_id, peer_id);
-        self.request_manager.disable_peer(&peer, origin)
+        self.request_manager.disable_peer(&peer)
     }

     pub(crate) async fn process_chunk_message(
@@ -1974,7 +1972,7 @@ mod tests {

         // Verify no error is returned when removing the node
         validator_coordinator
-            .process_lost_peer(node_network_id, peer_id, ConnectionOrigin::Outbound)
+            .process_lost_peer(node_network_id, peer_id)
             .unwrap();
     }

state-sync/src/logging.rs (2 changes: 2 additions & 0 deletions)
@@ -89,7 +89,9 @@ impl<'a> LogSchema<'a> {
 pub enum LogEntry {
     Reconfig,
     NewPeer,
+    NewPeerAlreadyExists,
     LostPeer,
+    LostPeerNotKnown,
     Waypoint,
     RuntimeStart,
     ConsensusCommit,
state-sync/src/request_manager.rs (141 changes: 63 additions & 78 deletions)
@@ -22,7 +22,10 @@ use rand::{
 };
 use std::{
     cmp::Ordering,
-    collections::{BTreeMap, HashMap},
+    collections::{
+        hash_map::Entry::{Occupied, Vacant},
+        BTreeMap, HashMap,
+    },
     time::{Duration, SystemTime, UNIX_EPOCH},
 };

@@ -32,18 +35,6 @@ const MIN_SCORE: f64 = 1.0;
 const STARTING_SCORE: f64 = 50.0;
 const STARTING_SCORE_PREFERRED: f64 = 100.0;

-#[derive(Clone, Debug)]
-struct PeerInfo {
-    is_alive: bool,
-    score: f64,
-}
-
-impl PeerInfo {
-    pub fn new(is_alive: bool, score: f64) -> Self {
-        Self { is_alive, score }
-    }
-}
-
 /// Basic metadata about the chunk request.
 #[derive(Clone, Debug)]
 pub struct ChunkRequestInfo {
@@ -82,10 +73,8 @@ enum PeerScoreUpdateType {
 }

 pub struct RequestManager {
-    // The list of peers that are eligible for this node to send sync requests
-    // to (grouped by network).
-    eligible_peers: BTreeMap<NetworkId, (Vec<PeerNetworkId>, Option<WeightedIndex<f64>>)>,
-    peers: HashMap<PeerNetworkId, PeerInfo>,
+    // Maps each peer to their peer score
+    peer_scores: HashMap<PeerNetworkId, f64>,
     requests: BTreeMap<u64, ChunkRequestInfo>,
     // duration with the same version before the next attempt to get the next chunk
     request_timeout: Duration,
@@ -108,8 +97,7 @@ impl RequestManager {
         update_multicast_network_counter(multicast_network_level.clone());

         Self {
-            eligible_peers: BTreeMap::new(),
-            peers: HashMap::new(),
+            peer_scores: HashMap::new(),
             requests: BTreeMap::new(),
             request_timeout,
             multicast_timeout,
@@ -137,95 +125,91 @@ impl RequestManager {
             .with_label_values(&[&peer.raw_network_id().to_string()])
             .inc();

-        if let Some(peer_info) = self.peers.get_mut(&peer) {
-            peer_info.is_alive = true;
-        } else {
-            let peer_score = if metadata.role == PeerRole::PreferredUpstream {
-                STARTING_SCORE_PREFERRED
-            } else {
-                STARTING_SCORE
-            };
-            self.peers.insert(peer, PeerInfo::new(true, peer_score));
+        match self.peer_scores.entry(peer) {
+            Occupied(occupied_entry) => {
+                warn!(LogSchema::new(LogEntry::NewPeerAlreadyExists).peer(occupied_entry.key()));
+            }
+            Vacant(vacant_entry) => {
+                let peer_score = if metadata.role == PeerRole::PreferredUpstream {
+                    STARTING_SCORE_PREFERRED
+                } else {
+                    STARTING_SCORE
+                };
+                vacant_entry.insert(peer_score);
+            }
         }
-        self.update_peer_selection_data();

         Ok(())
     }

-    pub fn disable_peer(
-        &mut self,
-        peer: &PeerNetworkId,
-        origin: ConnectionOrigin,
-    ) -> Result<(), Error> {
-        info!(LogSchema::new(LogEntry::LostPeer)
-            .peer(&peer)
-            .is_valid_peer(self.is_valid_state_sync_peer(&peer, origin)));
+    pub fn disable_peer(&mut self, peer: &PeerNetworkId) -> Result<(), Error> {
+        info!(LogSchema::new(LogEntry::LostPeer).peer(&peer));

-        if let Some(peer_info) = self.peers.get_mut(peer) {
+        if self.peer_scores.contains_key(peer) {
             counters::ACTIVE_UPSTREAM_PEERS
                 .with_label_values(&[&peer.raw_network_id().to_string()])
                 .dec();
-            peer_info.is_alive = false;
+            self.peer_scores.remove(peer);
+        } else {
+            warn!(LogSchema::new(LogEntry::LostPeerNotKnown).peer(&peer));
         }
-        self.update_peer_selection_data();

         Ok(())
     }

     pub fn no_available_peers(&self) -> bool {
-        self.eligible_peers.is_empty()
+        self.peer_scores.is_empty()
     }

     fn update_score(&mut self, peer: &PeerNetworkId, update_type: PeerScoreUpdateType) {
-        if let Some(peer_info) = self.peers.get_mut(peer) {
-            let old_score = peer_info.score;
-            match update_type {
+        if let Some(score) = self.peer_scores.get_mut(peer) {
+            let old_score = *score;
+            let new_score = match update_type {
                 PeerScoreUpdateType::Success => {
-                    let new_score = peer_info.score + 1.0;
-                    peer_info.score = new_score.min(MAX_SCORE);
+                    let new_score = old_score + 1.0;
+                    new_score.min(MAX_SCORE)
                 }
                 PeerScoreUpdateType::InvalidChunk
                 | PeerScoreUpdateType::ChunkVersionCannotBeApplied => {
-                    let new_score = peer_info.score * 0.8;
-                    peer_info.score = new_score.max(MIN_SCORE);
+                    let new_score = old_score * 0.8;
+                    new_score.max(MIN_SCORE)
                 }
                 PeerScoreUpdateType::TimeOut
                 | PeerScoreUpdateType::EmptyChunk
                 | PeerScoreUpdateType::InvalidChunkRequest => {
-                    let new_score = peer_info.score * 0.95;
-                    peer_info.score = new_score.max(MIN_SCORE);
+                    let new_score = old_score * 0.95;
+                    new_score.max(MIN_SCORE)
                 }
-            }
-            if (old_score - peer_info.score).abs() > std::f64::EPSILON {
-                self.update_peer_selection_data();
-            }
+            };
+            *score = new_score;
         }
     }

-    // Updates the information used to select a peer to send a chunk request to.
-    fn update_peer_selection_data(&mut self) {
-        // Group active peers by network level
-        let active_peers = self
-            .peers
+    // Calculates a weighted index for each peer per network. This is used to probabilistically
+    // select a peer (per network) to send a chunk request to.
+    fn calculate_weighted_peers_per_network(
+        &mut self,
+    ) -> BTreeMap<NetworkId, (Vec<PeerNetworkId>, Option<WeightedIndex<f64>>)> {
+        // Group peers by network level
+        let peers_by_network_level = self
+            .peer_scores
             .iter()
-            .filter(|(_peer, peer_info)| peer_info.is_alive)
-            .map(|(peer, peer_info)| (peer.raw_network_id(), (peer, peer_info)))
+            .map(|(peer, peer_score)| (peer.raw_network_id(), (peer, peer_score)))
             .into_group_map();

-        // For each network, compute the peer selection data (i.e., the weighted index
-        // that holds the probability of the peer being chosen for the next chunk request)
-        self.eligible_peers = active_peers
+        // For each network, compute the weighted index
+        peers_by_network_level
             .into_iter()
             .map(|(network_level, peers)| {
                 let mut eligible_peers = vec![];
                 let weights: Vec<_> = peers
                     .iter()
-                    .map(|(peer, peer_info)| {
+                    .map(|(peer, peer_score)| {
                         eligible_peers.push((*peer).clone());
-                        peer_info.score
+                        *peer_score
                     })
                     .collect();
-                let weighted_index = WeightedIndex::new(&weights)
+                let weighted_index = WeightedIndex::new(weights)
                     .map_err(|error| {
                         error!(
                             "Failed to compute weighted index for eligible peers: {:?}",
@@ -236,19 +220,22 @@ impl RequestManager {
                     .ok();
                 (network_level, (eligible_peers, weighted_index))
             })
-            .collect();
+            .collect()
     }

     /// Picks a set of peers to send chunk requests to. Here, we attempt to pick one peer
     /// per network, in order of network level preference. The set of networks selected is
     /// determined by the multicast network level. All networks with preference
     /// level <= multicast level are sampled. If there are no live peers in these networks,
     /// the multicast level is updated to the preference level of the first chosen network.
-    pub fn pick_peers(&mut self) -> Vec<PeerNetworkId> {
+    fn pick_peers(&mut self) -> Vec<PeerNetworkId> {
+        // Calculate a weighted peer selection map per network level
+        let weighted_peers_per_network = self.calculate_weighted_peers_per_network();
+
         let mut chosen_peers = vec![];
         let mut new_multicast_network_level = None;

-        for (network_level, (peers, weighted_index)) in &self.eligible_peers {
+        for (network_level, (peers, weighted_index)) in &weighted_peers_per_network {
             if let Some(peer) = pick_peer(peers, weighted_index) {
                 chosen_peers.push(peer)
             }
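For context, calculate_weighted_peers_per_network turns each peer's score into a sampling weight via rand's WeightedIndex, so pick_peers favors high-scoring peers probabilistically rather than always taking the single best one. An illustrative sketch of that sampling, assuming the rand crate; the peer names and weights here are made up:

use rand::distributions::{Distribution, WeightedIndex};
use rand::thread_rng;

fn main() {
    let eligible_peers = ["validator_0", "validator_1", "validator_2"];
    // Scores double as weights: validator_2 is sampled ~4x as often as validator_0.
    let weights = [25.0_f64, 50.0, 100.0];

    let weighted_index = WeightedIndex::new(weights).unwrap();
    let mut rng = thread_rng();
    let chosen = eligible_peers[weighted_index.sample(&mut rng)];
    println!("chosen peer: {}", chosen);
}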
@@ -271,7 +258,6 @@ impl RequestManager {
     pub fn send_chunk_request(&mut self, req: GetChunkRequest) -> Result<(), Error> {
         let log = LogSchema::new(LogEntry::SendChunkRequest).chunk_request(req.clone());

-        // update internal state
         let peers = self.pick_peers();
         if peers.is_empty() {
             warn!(log.event(LogEvent::MissingPeers));
@@ -509,7 +495,7 @@ impl RequestManager {
     }

     pub fn is_known_state_sync_peer(&self, peer: &PeerNetworkId) -> bool {
-        self.peers.contains_key(peer)
+        self.peer_scores.contains_key(peer)
     }

     fn update_multicast_network_level(
@@ -596,16 +582,15 @@ mod tests {
         assert!(!request_manager.no_available_peers());

         // Disable validator 0
-        request_manager
-            .disable_peer(&validator_0, ConnectionOrigin::Outbound)
-            .unwrap();
+        request_manager.disable_peer(&validator_0).unwrap();

-        // Verify validator 0 is still known, but no longer available
-        assert!(request_manager.is_known_state_sync_peer(&validator_0));
+        // Verify validator 0 is now unknown
+        assert!(!request_manager.is_known_state_sync_peer(&validator_0));
         assert!(request_manager.no_available_peers());

-        // Add validator 0 and verify it's now enabled
+        // Add validator 0 and verify it's now re-enabled
         add_validator_to_request_manager(&mut request_manager, &validator_0, PeerRole::Validator);
         assert!(request_manager.is_known_state_sync_peer(&validator_0));
         assert!(!request_manager.no_available_peers());
     }

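The updated test captures the behavioral change: a disabled peer is no longer merely marked unavailable, it becomes unknown to the request manager until it reconnects. A free-standing sketch of that contract, using a plain map as a hypothetical stand-in for RequestManager:

use std::collections::HashMap;

fn main() {
    // Hypothetical stand-in for RequestManager::peer_scores.
    let mut peer_scores: HashMap<&str, f64> = HashMap::new();

    // Connect validator_0, then lose it.
    peer_scores.insert("validator_0", 50.0);
    peer_scores.remove("validator_0");

    // Before this commit the peer stayed in the map with is_alive = false;
    // now it is gone entirely.
    assert!(!peer_scores.contains_key("validator_0"));

    // Reconnecting re-adds it with a fresh starting score.
    peer_scores.insert("validator_0", 50.0);
    assert!(peer_scores.contains_key("validator_0"));
}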
