
[node] Log less gossip (MystenLabs#1780)
* Implement backoff for gossip

Co-authored-by: George Danezis <[email protected]>
gdanezis and George Danezis authored May 5, 2022
1 parent fd9435d commit 52589f1
Showing 6 changed files with 270 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions sui_core/Cargo.toml
@@ -54,6 +54,7 @@ serde_yaml = "0.8.23"
pretty_assertions = "1.2.1"
temp_testdir = "0.2.3"
hex = "0.4.3"
tracing-test = "0.2.1"

test_utils = { path = "../test_utils" }

105 changes: 104 additions & 1 deletion sui_core/src/authority_active.rs
@@ -29,22 +29,73 @@
*/

use std::{collections::BTreeMap, sync::Arc};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Duration,
};
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::sync::Mutex;

use crate::{
authority::AuthorityState, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI,
};
use tokio::time::Instant;

pub mod gossip;
use gossip::gossip_process;

// TODO: Make these into a proper config
const MAX_RETRIES_RECORDED: u32 = 10;
const DELAY_FOR_1_RETRY_MS: u64 = 2_000;
const EXPONENTIAL_DELAY_BASIS: u64 = 2;
const MAX_RETRY_DELAY_MS: u64 = 30_000;

pub struct AuthorityHealth {
// Records the number of retries
pub retries: u32,
// The instant after which we should contact this
// authority again.
pub no_contact_before: Instant,
}

impl Default for AuthorityHealth {
fn default() -> AuthorityHealth {
AuthorityHealth {
retries: 0,
no_contact_before: Instant::now(),
}
}
}

impl AuthorityHealth {
/// Sets the no-contact instant to `now + period`, unless the
/// instant currently recorded is already later.
pub fn set_no_contact_for(&mut self, period: Duration) {
let future_instant = Instant::now() + period;
if self.no_contact_before < future_instant {
self.no_contact_before = future_instant;
}
}

// Resets the no-contact instant to now, i.e. no delay.
pub fn reset_no_contact(&mut self) {
self.no_contact_before = Instant::now();
}

pub fn can_contact_now(&self) -> bool {
self.no_contact_before < Instant::now()
}
}

pub struct ActiveAuthority<A> {
// The local authority state
pub state: Arc<AuthorityState>,
// The network interfaces to other authorities
pub net: Arc<AuthorityAggregator<A>>,
// Network health
pub health: Arc<Mutex<HashMap<AuthorityName, AuthorityHealth>>>,
}

impl<A> ActiveAuthority<A> {
@@ -55,10 +106,62 @@ impl<A> ActiveAuthority<A> {
let committee = authority.committee.clone();

Ok(ActiveAuthority {
health: Arc::new(Mutex::new(
committee
.voting_rights
.iter()
.map(|(name, _)| (*name, AuthorityHealth::default()))
.collect(),
)),
state: authority,
net: Arc::new(AuthorityAggregator::new(committee, authority_clients)),
})
}

/// Returns the earliest instant at which, according to the `no_contact_before`
/// instants in the authority health records, we expect to be able to contact at
/// least 2/3 of the committee by stake. A network needs nodes representing 2/3
/// of the stake to be live, so before that point we are unlikely to make
/// progress even if we can reach a few nodes.
pub async fn minimum_wait_for_majority_honest_available(&self) -> Instant {
let lock = self.health.lock().await;
let (_, instant) = self.net.committee.robust_value(
lock.iter().map(|(name, h)| (*name, h.no_contact_before)),
// At least one honest node is at or above it.
self.net.committee.quorum_threshold(),
);
instant
}

/// Adds one more retry to the retry counter, up to MAX_RETRIES_RECORDED, and then
/// increases the `no contact` delay to DELAY_FOR_1_RETRY_MS * EXPONENTIAL_DELAY_BASIS ^ retries,
/// capped at a maximum delay of MAX_RETRY_DELAY_MS.
pub async fn set_failure_backoff(&self, name: AuthorityName) {
let mut lock = self.health.lock().await;
let mut entry = lock.entry(name).or_default();
entry.retries = u32::min(entry.retries + 1, MAX_RETRIES_RECORDED);
let delay: u64 = u64::min(
DELAY_FOR_1_RETRY_MS * u64::pow(EXPONENTIAL_DELAY_BASIS, entry.retries),
MAX_RETRY_DELAY_MS,
);
entry.set_no_contact_for(Duration::from_millis(delay));
}

// Resets retries to zero and sets no contact to zero delay.
pub async fn set_success_backoff(&self, name: AuthorityName) {
let mut lock = self.health.lock().await;
let mut entry = lock.entry(name).or_default();
entry.retries = 0;
entry.reset_no_contact();
}

// Checks, given the current time, whether we should contact this authority,
// i.e. whether we are past any `no contact` delay.
pub async fn can_contact(&self, name: AuthorityName) -> bool {
let mut lock = self.health.lock().await;
let entry = lock.entry(name).or_default();
entry.can_contact_now()
}
}

impl<A> ActiveAuthority<A>
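As a worked illustration of the backoff schedule these constants and `set_failure_backoff` produce, here is a minimal, self-contained sketch (not part of the commit; the constants are copied from the diff above). The first failure yields a 4s delay, then 8s, then 16s, and from the fourth retry onward the 30s cap applies:

const MAX_RETRIES_RECORDED: u32 = 10;
const DELAY_FOR_1_RETRY_MS: u64 = 2_000;
const EXPONENTIAL_DELAY_BASIS: u64 = 2;
const MAX_RETRY_DELAY_MS: u64 = 30_000;

// Same arithmetic as `set_failure_backoff`, without the health map.
fn backoff_delay_ms(retries: u32) -> u64 {
    let retries = u32::min(retries, MAX_RETRIES_RECORDED);
    u64::min(
        DELAY_FOR_1_RETRY_MS * u64::pow(EXPONENTIAL_DELAY_BASIS, retries),
        MAX_RETRY_DELAY_MS,
    )
}

fn main() {
    // Prints: 4000, 8000, 16000, 30000, 30000
    for retries in 1..=5 {
        println!("retries={}: {}ms", retries, backoff_delay_ms(retries));
    }
}
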
81 changes: 66 additions & 15 deletions sui_core/src/authority_active/gossip/mod.rs
@@ -18,7 +18,7 @@ use crate::{
};

use futures::stream::FuturesOrdered;
use tracing::{error, info};
use tracing::{debug, error, info};

#[cfg(test)]
mod tests;
@@ -41,46 +41,89 @@ pub async fn gossip_process<A>(active_authority: &ActiveAuthority<A>, degree: us
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// A reference to the committee
let committee = &active_authority.net.committee;

// Spawn at most "degree" tasks, and no more than committee size - 1
let target_num_tasks: usize = usize::min(
active_authority.state.committee.voting_rights.len() - 1,
degree,
);

// If we do not expect to connect to anyone
if target_num_tasks == 0 {
info!("Turn off gossip mechanism");
return;
}
info!("Turn on gossip mechanism");

// Keep track of names of active peers
let mut peer_names = HashSet::new();
let mut gossip_tasks = FuturesUnordered::new();

// TODO: provide a clean way to get out of the loop.
loop {
debug!("Seek new peers");

// Find the earliest time at which we are allowed to reconnect
// to at least 2f+1 nodes by stake.
let next_connect = active_authority
.minimum_wait_for_majority_honest_available()
.await;
debug!(
"Waiting for {:?}",
next_connect - tokio::time::Instant::now()
);
tokio::time::sleep_until(next_connect).await;

let mut k = 0;
while gossip_tasks.len() < target_num_tasks {
let name = active_authority.state.committee.sample();
if peer_names.contains(name) || *name == active_authority.state.name {
if peer_names.contains(name)
|| *name == active_authority.state.name
|| !active_authority.can_contact(*name).await
{
// Could this condition make the loop never terminate?
// - We check that nodes are left, by stake, and
// - that at least 2/3 of the stake can be contacted.
continue;
}
peer_names.insert(*name);
gossip_tasks.push(async move {
let peer_gossip = PeerGossip::new(*name, active_authority);
// Stagger the durations when spawning more than 1 task, to ensure overlap
info!("Gossip: Start gossip from peer {:?}", *name);
debug!("Start gossip from peer {:?}", *name);
peer_gossip
.spawn(Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15))
.await
});
k += 1;

// If we have already used all the good stake, then stop here and
// wait for some node to become available.
let total_stake_used: usize = peer_names
.iter()
.map(|name| committee.weight(name))
.sum::<usize>()
+ committee.weight(&active_authority.state.name);
if committee.quorum_threshold() <= total_stake_used {
break;
}
}

// If we have no peers, there is no need to wait for one
if gossip_tasks.is_empty() {
continue;
}

// Let the peer gossip task finish
debug_assert!(!gossip_tasks.is_empty());
let (finished_name, _result) = gossip_tasks.select_next_some().await;
if let Err(err) = _result {
error!(
"Gossip: Peer {:?} finished with error: {}",
finished_name, err
);
active_authority.set_failure_backoff(finished_name).await;
error!("Peer {:?} returned error: {}", finished_name, err);
} else {
info!("Gossip: End gossip from peer {:?}", finished_name);
active_authority.set_success_backoff(finished_name).await;
debug!("End gossip from peer {:?}", finished_name);
}
peer_names.remove(&finished_name);
}
@@ -102,13 +145,21 @@

pub async fn spawn(mut self, duration: Duration) -> (AuthorityName, Result<(), SuiError>) {
let peer_name = self.peer_name;
let result = tokio::task::spawn(async move { self.gossip_timeout(duration).await })
.await
.map(|_| ())
.map_err(|_err| SuiError::GenericAuthorityError {
error: "Gossip Join Error".to_string(),
});
let result = tokio::task::spawn(async move { self.gossip_timeout(duration).await }).await;

// Return a join error.
if result.is_err() {
return (
peer_name,
Err(SuiError::GenericAuthorityError {
error: "Gossip Join Error".to_string(),
}),
);
};

// Return the internal result
let result = result.unwrap();
// minimum_time.await;
(peer_name, result)
}

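The gossip loop above stops sampling peers once the stake already engaged reaches the quorum threshold. A self-contained sketch of that stopping rule follows (toy types and the usual 2f+1-of-3f+1 threshold are assumptions here; the real `Committee` API in sui_types differs): sampling stops once the stake of the chosen peers, plus our own, reaches a quorum.

use std::collections::HashMap;

// Smallest amount of stake guaranteed to intersect the honest nodes,
// assuming the standard BFT bound of n = 3f + 1 total stake.
fn quorum_threshold(total_stake: usize) -> usize {
    2 * total_stake / 3 + 1
}

// True once the sampled peers, plus our own authority, hold a quorum of stake.
fn used_all_good_stake(
    weights: &HashMap<&str, usize>,
    peers: &[&str],
    own_name: &str,
) -> bool {
    let used: usize =
        peers.iter().map(|p| weights[p]).sum::<usize>() + weights[own_name];
    quorum_threshold(weights.values().sum()) <= used
}

fn main() {
    let weights: HashMap<&str, usize> =
        [("a", 1), ("b", 1), ("c", 1), ("d", 1)].into_iter().collect();
    // Four equal validators: the threshold is 3, so two peers plus
    // ourselves is enough and the sampling loop breaks.
    assert!(used_all_good_stake(&weights, &["b", "c"], "a"));
    assert!(!used_all_good_stake(&weights, &["b"], "a"));
}
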
60 changes: 59 additions & 1 deletion sui_core/src/authority_active/gossip/tests.rs
@@ -4,10 +4,14 @@
use std::time::Duration;

use sui_adapter::genesis;
use sui_network::network::NetworkClient;
use sui_types::{base_types::SequenceNumber, crypto::get_key_pair, object::Object};
use tracing_test::traced_test;

use super::*;
use crate::authority_aggregator::authority_aggregator_tests::*;
use crate::{
authority_aggregator::authority_aggregator_tests::*, authority_client::NetworkAuthorityClient,
};

#[tokio::test]
pub async fn test_gossip() {
@@ -65,3 +69,57 @@ pub async fn test_gossip() {

assert_eq!(gas_ref_1.1, SequenceNumber::from(1));
}

#[tokio::test]
#[traced_test]
pub async fn test_gossip_no_network() {
info!("Start running test");

let (addr1, _) = get_key_pair();
let gas_object1 = Object::with_owner_for_testing(addr1);
let gas_object2 = Object::with_owner_for_testing(addr1);
let genesis_objects =
authority_genesis_objects(4, vec![gas_object1.clone(), gas_object2.clone()]);

let (_aggregator, states) = init_local_authorities(genesis_objects).await;

// Connect to a non-existent peer
let aggregator = AuthorityAggregator::new(
_aggregator.committee.clone(),
_aggregator
.authority_clients
.iter()
.map(|(name, _)| {
let net = NetworkAuthorityClient::new(NetworkClient::new(
"127.0.0.1".to_string(),
// !!! This port does not exist
332,
65_000,
Duration::from_secs(1),
Duration::from_secs(1),
));
(*name, net)
})
.collect(),
);

let clients = aggregator.authority_clients.clone();

// Start batch processes, and active processes.
if let Some(state) = states.into_iter().next() {
let inner_state = state;
let inner_clients = clients.clone();

let _active_handle = tokio::task::spawn(async move {
let active_state = ActiveAuthority::new(inner_state, inner_clients).unwrap();
active_state.spawn_all_active_processes().await
});
}

// Let the helper tasks start
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_secs(10)).await;

// There have been timeouts, and as a result the logs contain backoff messages
assert!(logs_contain("Waiting for 3.99"));
}
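
The final assertion relies on the log capture that the new tracing-test dev-dependency provides. A minimal sketch of that pattern (a hypothetical standalone test, assuming the same tokio/tracing/tracing-test dev-dependencies): `#[traced_test]` installs a capturing subscriber for the test and injects a `logs_contain` helper that scans the captured output.

use tracing::debug;
use tracing_test::traced_test;

#[tokio::test]
#[traced_test]
async fn backoff_wait_is_logged() {
    // Emit an event shaped like the gossip loop's "Waiting for {:?}" line.
    debug!("Waiting for 3.99s");
    // `logs_contain` is brought into scope by #[traced_test].
    assert!(logs_contain("Waiting for 3.99"));
}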
