Skip to content

Commit

Permalink
Merge pull request ProvableHQ#3307 from ProvableHQ/improve_bootstrap_…
Browse files Browse the repository at this point in the history
…peering

Introduce --rotate-external-peers flag to improve bootstrap client peering
  • Loading branch information
zosorock authored Oct 22, 2024
2 parents fb06556 + 9dccba4 commit 38d0973
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 13 deletions.
7 changes: 5 additions & 2 deletions cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,12 @@ pub struct Start {
/// Specify the IP address and port of the validator(s) to connect to
#[clap(default_value = "", long = "validators")]
pub validators: String,
/// If the flag is set, a node will allow untrusted peers to connect
/// If the flag is set, a validator will allow untrusted peers to connect
#[clap(long = "allow-external-peers")]
pub allow_external_peers: bool,
/// If the flag is set, a client will periodically evict more external peers
#[clap(long = "rotate-external-peers")]
pub rotate_external_peers: bool,

/// Specify the IP address and port for the REST server
#[clap(long = "rest")]
Expand Down Expand Up @@ -601,7 +604,7 @@ impl Start {
match node_type {
NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, self.allow_external_peers, dev_txs, shutdown.clone()).await,
NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, storage_mode, shutdown.clone()).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, shutdown).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, self.rotate_external_peers, shutdown).await,
}
}

Expand Down
32 changes: 23 additions & 9 deletions node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
use snarkvm::prelude::Network;

use colored::Colorize;
use rand::{prelude::IteratorRandom, rngs::OsRng};
use rand::{prelude::IteratorRandom, rngs::OsRng, Rng};

/// A helper function to compute the maximum of two numbers.
/// See Rust issue 92391: https://github.com/rust-lang/rust/issues/92391.
Expand Down Expand Up @@ -141,15 +141,27 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
/// TODO (howardwu): If the node is a validator, keep the validator.
/// This function keeps the number of connected peers within the allowed range.
fn handle_connected_peers(&self) {
// Initialize an RNG.
let rng = &mut OsRng;

// Obtain the number of connected peers.
let num_connected = self.router().number_of_connected_peers();
// Compute the total number of surplus peers.
let num_surplus_peers = num_connected.saturating_sub(Self::MAXIMUM_NUMBER_OF_PEERS);

// Obtain the number of connected provers.
let num_connected_provers = self.router().number_of_connected_provers();

// Consider rotating more external peers every ~10 heartbeats.
let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0;
// Determine the maximum number of peers and provers to keep.
let (max_peers, max_provers) = if reduce_peers {
(Self::MEDIAN_NUMBER_OF_PEERS, 0)
} else {
(Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS)
};

// Compute the number of surplus peers.
let num_surplus_peers = num_connected.saturating_sub(max_peers);
// Compute the number of surplus provers.
let num_surplus_provers = num_connected_provers.saturating_sub(Self::MAXIMUM_NUMBER_OF_PROVERS);
let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
// Compute the number of provers remaining connected.
let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
// Compute the number of surplus clients and validators.
Expand All @@ -165,9 +177,6 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Retrieve the bootstrap peers.
let bootstrap = self.router().bootstrap_peers();

// Initialize an RNG.
let rng = &mut OsRng;

// Determine the provers to disconnect from.
let prover_ips_to_disconnect = self
.router()
Expand All @@ -185,7 +194,12 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
.into_iter()
.filter_map(|peer| {
let peer_ip = peer.ip();
if !peer.is_prover() && !trusted.contains(&peer_ip) && !bootstrap.contains(&peer_ip) {
if !peer.is_prover() && // Skip if the peer is a prover.
!trusted.contains(&peer_ip) && // Skip if the peer is trusted.
!bootstrap.contains(&peer_ip) && // Skip if the peer is a bootstrap peer.
// Skip if you are syncing from this peer.
(self.is_block_synced() || (!self.is_block_synced() && self.router().cache.num_outbound_block_requests(&peer.ip()) == 0))
{
Some(peer_ip)
} else {
None
Expand Down
10 changes: 10 additions & 0 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub struct InnerRouter<N: Network> {
restricted_peers: RwLock<HashMap<SocketAddr, Instant>>,
/// The spawned handles.
handles: Mutex<Vec<JoinHandle<()>>>,
/// If the flag is set, the node will periodically evict more external peers.
rotate_external_peers: bool,
/// If the flag is set, the node will engage in P2P gossip to request more peers.
allow_external_peers: bool,
/// The boolean flag for the development mode.
Expand All @@ -112,12 +114,14 @@ impl<N: Network> Router<N> {

impl<N: Network> Router<N> {
/// Initializes a new `Router` instance.
#[allow(clippy::too_many_arguments)]
pub async fn new(
node_ip: SocketAddr,
node_type: NodeType,
account: Account<N>,
trusted_peers: &[SocketAddr],
max_peers: u16,
rotate_external_peers: bool,
allow_external_peers: bool,
is_dev: bool,
) -> Result<Self> {
Expand All @@ -136,6 +140,7 @@ impl<N: Network> Router<N> {
candidate_peers: Default::default(),
restricted_peers: Default::default(),
handles: Default::default(),
rotate_external_peers,
allow_external_peers,
is_dev,
})))
Expand Down Expand Up @@ -256,6 +261,11 @@ impl<N: Network> Router<N> {
self.is_dev
}

/// Returns `true` if the node is periodically evicting more external peers.
pub fn rotate_external_peers(&self) -> bool {
self.rotate_external_peers
}

/// Returns `true` if the node is engaging in P2P gossip to request more peers.
pub fn allow_external_peers(&self) -> bool {
self.allow_external_peers
Expand Down
3 changes: 3 additions & 0 deletions node/router/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub async fn client(listening_port: u16, max_peers: u16) -> TestRouter<CurrentNe
sample_account(),
&[],
max_peers,
false,
true,
true,
)
Expand All @@ -95,6 +96,7 @@ pub async fn prover(listening_port: u16, max_peers: u16) -> TestRouter<CurrentNe
sample_account(),
&[],
max_peers,
false,
true,
true,
)
Expand All @@ -117,6 +119,7 @@ pub async fn validator(
sample_account(),
trusted_peers,
max_peers,
false,
allow_external_peers,
true,
)
Expand Down
2 changes: 2 additions & 0 deletions node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
genesis: Block<N>,
cdn: Option<String>,
storage_mode: StorageMode,
rotate_external_peers: bool,
shutdown: Arc<AtomicBool>,
) -> Result<Self> {
// Initialize the signal handler.
Expand Down Expand Up @@ -117,6 +118,7 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
account,
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
rotate_external_peers,
allow_external_peers,
matches!(storage_mode, StorageMode::Development(_)),
)
Expand Down
5 changes: 5 additions & 0 deletions node/src/client/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use snarkos_node_router::{
DataBlocks,
DisconnectReason,
MessageCodec,
PeerRequest,
Ping,
Pong,
PuzzleResponse,
Expand Down Expand Up @@ -68,6 +69,10 @@ where
async fn on_connect(&self, peer_addr: SocketAddr) {
// Resolve the peer address to the listener address.
let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return };
// If it's a bootstrap peer, first request its peers.
if self.router.bootstrap_peers().contains(&peer_ip) {
Outbound::send(self, peer_ip, Message::PeerRequest(PeerRequest));
}
// Retrieve the block locators.
let block_locators = match self.sync.get_block_locators() {
Ok(block_locators) => Some(block_locators),
Expand Down
16 changes: 14 additions & 2 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,23 @@ impl<N: Network> Node<N> {
genesis: Block<N>,
cdn: Option<String>,
storage_mode: StorageMode,
rotate_external_peers: bool,
shutdown: Arc<AtomicBool>,
) -> Result<Self> {
Ok(Self::Client(Arc::new(
Client::new(node_ip, rest_ip, rest_rps, account, trusted_peers, genesis, cdn, storage_mode, shutdown)
.await?,
Client::new(
node_ip,
rest_ip,
rest_rps,
account,
trusted_peers,
genesis,
cdn,
storage_mode,
rotate_external_peers,
shutdown,
)
.await?,
)))
}

Expand Down
3 changes: 3 additions & 0 deletions node/src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());
// Determine if the prover should allow external peers.
let allow_external_peers = true;
// Determine if the prover should rotate external peers.
let rotate_external_peers = false;

// Initialize the node router.
let router = Router::new(
Expand All @@ -112,6 +114,7 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
account,
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
rotate_external_peers,
allow_external_peers,
matches!(storage_mode, StorageMode::Development(_)),
)
Expand Down
3 changes: 3 additions & 0 deletions node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
let (primary_sender, primary_receiver) = init_primary_channels::<N>();
// Start the consensus.
consensus.run(primary_sender, primary_receiver).await?;
// Determine if the validator should rotate external peers.
let rotate_external_peers = false;

// Initialize the node router.
let router = Router::new(
Expand All @@ -125,6 +127,7 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
account,
trusted_peers,
Self::MAXIMUM_NUMBER_OF_PEERS as u16,
rotate_external_peers,
allow_external_peers,
matches!(storage_mode, StorageMode::Development(_)),
)
Expand Down
1 change: 1 addition & 0 deletions node/tests/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub async fn client() -> Client<CurrentNetwork, ConsensusMemory<CurrentNetwork>>
sample_genesis_block(),
None, // No CDN.
StorageMode::Production,
false, // No extra peer rotation.
Default::default(),
)
.await
Expand Down

0 comments on commit 38d0973

Please sign in to comment.