Skip to content

Commit

Permalink
Better network for the consensus client (MystenLabs#1044)
Browse files Browse the repository at this point in the history
Fix issue MystenLabs#931
  • Loading branch information
asonnino authored Mar 23, 2022
1 parent 1998ad5 commit 1ab83a3
Showing 1 changed file with 74 additions and 5 deletions.
79 changes: 74 additions & 5 deletions sui_core/src/consensus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
use crate::authority::AuthorityState;
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use std::cmp::min;
use std::cmp::Ordering;
use std::net::SocketAddr;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Arc;
use std::time::Duration;
use sui_network::transport;
use sui_network::transport::{RwChannel, TcpDataStream};
use sui_types::base_types::SequenceNumber;
Expand All @@ -16,6 +18,7 @@ use sui_types::messages::{ConfirmationTransaction, ConsensusOutput, ConsensusSyn
use sui_types::serialize::{deserialize_message, serialize_consensus_sync, SerializedMessage};
use sui_types::{fp_bail, fp_ensure};
use tokio::task::JoinHandle;
use tokio::time::sleep;

#[cfg(test)]
#[path = "unit_tests/consensus_tests.rs"]
Expand Down Expand Up @@ -170,15 +173,26 @@ impl ConsensusClient {

/// Main loop connecting to the consensus. This mainly acts as a light client.
async fn run(&mut self, address: SocketAddr, buffer_size: usize) -> SuiResult<()> {
// TODO [issue #931]: Do not try to reconnect immediately after the connection fails, use some
// sort of back off. We may also move this logic to `sui-network::transport` to
// expose a 'stream client' or something like that.
// TODO: We may also move this logic to `sui-network::transport` to expose a 'stream client'
// or something like that.

// The connection waiter ensures we do not attempt to reconnect immediately after failure.
let mut connection_waiter = ConnectionWaiter::default();

// Continuously connects to the consensus node.
'main: loop {
// Wait a bit before re-attempting connections.
connection_waiter.wait().await;

// Subscribe to the consensus' output.
let mut connection = match transport::connect(address.to_string(), buffer_size).await {
Ok(connection) => connection,
Err(e) => {
log::warn!("Failed to subscribe to consensus output: {}", e);
log::warn!(
"Failed to subscribe to consensus output (retry {}): {}",
connection_waiter.status(),
e
);
continue 'main;
}
};
Expand Down Expand Up @@ -218,11 +232,66 @@ impl ConsensusClient {
log::warn!("Failed to send sync request to consensus: {}", e);
continue 'main;
}
connection_waiter.reset();
}
// Nothing to do.
Ok(ProcessingOutcome::Ok) => (),
Ok(ProcessingOutcome::Ok) => connection_waiter.reset(),
}
}
}
}
}

/// Make the network client wait a bit before re-attempting network connections.
pub struct ConnectionWaiter {
/// The minimum delay to wait before re-attempting a connection.
min_delay: u64,
/// The maximum delay to wait before re-attempting a connection.
max_delay: u64,
/// The actual delay we wait before re-attempting a connection.
delay: u64,
/// The number of times we attempted to make a connection.
retry: usize,
}

impl Default for ConnectionWaiter {
fn default() -> Self {
Self::new(/* min_delay */ 200, /* max_delay */ 60_000)
}
}

impl ConnectionWaiter {
/// Create a new connection waiter.
pub fn new(min_delay: u64, max_delay: u64) -> Self {
Self {
min_delay,
max_delay,
delay: 0,
retry: 0,
}
}

/// Return the number of failed attempts.
pub fn status(&self) -> &usize {
&self.retry
}

/// Wait for a bit (depending on the number of failed connections).
pub async fn wait(&mut self) {
if self.delay != 0 {
sleep(Duration::from_millis(self.delay)).await;
}

self.delay = match self.delay {
0 => self.min_delay,
_ => min(2 * self.delay, self.max_delay),
};
self.retry += 1;
}

/// Reset the waiter to its initial parameters.
pub fn reset(&mut self) {
self.delay = 0;
self.retry = 0;
}
}

0 comments on commit 1ab83a3

Please sign in to comment.