Skip to content

Commit

Permalink
Updates sync nodes performance
Browse files Browse the repository at this point in the history
  • Loading branch information
howardwu committed Dec 1, 2021
1 parent 8f5746d commit d7af274
Showing 1 changed file with 60 additions and 45 deletions.
105 changes: 60 additions & 45 deletions src/network/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,29 @@ impl<N: Network, E: Environment> Ledger<N, E> {
}
}

///
/// Disconnects and restricts the given peer from the ledger.
///
async fn disconnect_and_restrict(&self, peer_ip: SocketAddr, message: &str) {
info!("Disconnecting and restricting {} ({})", peer_ip, message);
// Remove all entries of the peer from the ledger.
self.remove_peer(&peer_ip).await;
// Update the status of the ledger.
self.update_status().await;
// Send a `Disconnect` message to the peer.
if let Err(error) = self
.peers_router
.send(PeersRequest::MessageSend(peer_ip, Message::Disconnect))
.await
{
warn!("[Disconnect] {}", error);
}
// Route a `PeerRestricted` to the peers.
if let Err(error) = self.peers_router.send(PeersRequest::PeerRestricted(peer_ip)).await {
warn!("[PeerRestricted] {}", error);
}
}

///
/// Attempt to fast-forward the ledger with unconfirmed blocks.
///
Expand Down Expand Up @@ -903,51 +926,43 @@ impl<N: Network, E: Environment> Ledger<N, E> {
}
}

// // TODO (howardwu): TEMPORARY - Evaluate the merits of this experiment after seeing the results.
// // If the node is a sync node and the node is currently syncing,
// // reduce the number of connections down to the minimum threshold,
// // to improve the speed with which the node syncs back to tip.
// if E::NODE_TYPE == NodeType::Sync && self.status.is_syncing() && self.number_of_block_requests().await > 0 {
// debug!("Temporarily reducing the number of connected peers to sync");
//
// // Lock peers_state and block_requests for further processing.
// let peers_state = self.peers_state.read().await;
// let block_requests = self.block_requests.read().await;
//
// // Determine the peers to disconnect from.
// // Attention - We are reducing this to the `MINIMUM_NUMBER_OF_PEERS`, *not* `MAXIMUM_NUMBER_OF_PEERS`.
// let num_excess_peers = peers_state.len().saturating_sub(E::MINIMUM_NUMBER_OF_PEERS);
// let peer_ips_to_disconnect = peers_state
// .iter()
// .filter(|(&peer_ip, _)| {
// let peer_str = peer_ip.to_string();
// !E::SYNC_NODES.contains(&peer_str.as_str())
// && !E::BEACON_NODES.contains(&peer_str.as_str())
// && !block_requests.contains_key(&peer_ip)
// })
// .take(num_excess_peers)
// .map(|(&ip, _)| ip)
// .collect::<Vec<SocketAddr>>();
//
// // Release the lock over peers_state and block_requests.
// drop(peers_state);
// drop(block_requests);
//
// trace!("Found {} peers to temporarily disconnect", peer_ips_to_disconnect.len());
//
// // Proceed to send disconnect requests to these peers.
// for peer_ip in peer_ips_to_disconnect {
// info!("Disconnecting from {} (disconnecting to sync)", peer_ip);
// // Remove all entries of the peer from the ledger.
// self.remove_peer(&peer_ip).await;
// // Update the status of the ledger.
// self.update_status().await;
// // Route a `PeerRestricted` to the peers.
// if let Err(error) = self.peers_router.send(PeersRequest::PeerRestricted(peer_ip)).await {
// warn!("[PeerRestricted] {}", error);
// }
// }
// }
// TODO (howardwu): TEMPORARY - Evaluate the merits of this experiment after seeing the results.
// If the node is a sync node and the node is currently syncing,
// reduce the number of connections down to the minimum threshold,
// to improve the speed with which the node syncs back to tip.
if E::NODE_TYPE == NodeType::Sync && self.status.is_syncing() && self.number_of_block_requests().await > 0 {
debug!("Temporarily reducing the number of connected peers to sync");

// Lock peers_state and block_requests for further processing.
let peers_state = self.peers_state.read().await;
let block_requests = self.block_requests.read().await;

// Determine the peers to disconnect from.
// Attention - We are reducing this to the `MINIMUM_NUMBER_OF_PEERS`, *not* `MAXIMUM_NUMBER_OF_PEERS`.
let num_excess_peers = peers_state.len().saturating_sub(E::MINIMUM_NUMBER_OF_PEERS);
let peer_ips_to_disconnect = peers_state
.iter()
.filter(|(&peer_ip, _)| {
let peer_str = peer_ip.to_string();
!E::SYNC_NODES.contains(&peer_str.as_str())
&& !E::BEACON_NODES.contains(&peer_str.as_str())
&& !block_requests.get(&peer_ip).is_some()
})
.take(num_excess_peers)
.map(|(&ip, _)| ip)
.collect::<Vec<SocketAddr>>();

// Release the lock over peers_state and block_requests.
drop(peers_state);
drop(block_requests);

trace!("Found {} peers to temporarily disconnect", peer_ips_to_disconnect.len());

// Proceed to disconnect and restrict these peers.
for peer_ip in peer_ips_to_disconnect {
self.disconnect_and_restrict(peer_ip, "disconnecting to sync").await;
}
}
}
}

Expand Down

0 comments on commit d7af274

Please sign in to comment.