From a14d3aef2fd2bdd396773ffcd3dc3666ea419cac Mon Sep 17 00:00:00 2001 From: Xun Li Date: Mon, 6 Jun 2022 10:32:38 -0700 Subject: [PATCH] Make a clone of active authority at gossip loop (#2412) --- crates/sui-core/src/authority_active.rs | 10 +++ .../src/authority_active/gossip/mod.rs | 85 ++++++++++++------- 2 files changed, 66 insertions(+), 29 deletions(-) diff --git a/crates/sui-core/src/authority_active.rs b/crates/sui-core/src/authority_active.rs index c4d9c01f0c7b2..86b45fee15e54 100644 --- a/crates/sui-core/src/authority_active.rs +++ b/crates/sui-core/src/authority_active.rs @@ -175,6 +175,16 @@ impl ActiveAuthority { } } +impl Clone for ActiveAuthority { + fn clone(&self) -> Self { + ActiveAuthority { + state: self.state.clone(), + net: ArcSwap::from(self.net.load().clone()), + health: self.health.clone(), + } + } +} + impl ActiveAuthority where A: AuthorityAPI + Send + Sync + 'static + Clone, diff --git a/crates/sui-core/src/authority_active/gossip/mod.rs b/crates/sui-core/src/authority_active/gossip/mod.rs index df7513c8eccc8..012ed1b717b09 100644 --- a/crates/sui-core/src/authority_active/gossip/mod.rs +++ b/crates/sui-core/src/authority_active/gossip/mod.rs @@ -10,6 +10,8 @@ use crate::{ use async_trait::async_trait; use futures::stream::FuturesOrdered; use futures::{stream::FuturesUnordered, StreamExt}; +use std::future::Future; +use std::ops::Deref; use std::{collections::HashSet, sync::Arc, time::Duration}; use sui_types::committee::StakeUnit; use sui_types::{ @@ -51,17 +53,18 @@ where } pub async fn gossip_process_with_start_seq( - active_authority: &ActiveAuthority, + _active_authority: &ActiveAuthority, degree: usize, start_seq: Option, ) where A: AuthorityAPI + Send + Sync + 'static + Clone, { - // A copy of the committee - let committee = &active_authority.net.load().committee; + // Make a clone of the active authority and committee, and keep using it until epoch changes. + let mut local_active = Arc::new(_active_authority.clone()); + let mut committee = local_active.state.committee.load().deref().clone(); // Number of tasks at most "degree" and no more than committee - 1 - let target_num_tasks: usize = usize::min(committee.voting_rights.len() - 1, degree); + let mut target_num_tasks = usize::min(committee.voting_rights.len() - 1, degree); // If we do not expect to connect to anyone if target_num_tasks == 0 { @@ -75,11 +78,26 @@ pub async fn gossip_process_with_start_seq( let mut gossip_tasks = FuturesUnordered::new(); loop { + if _active_authority.state.committee.load().epoch != committee.epoch { + // If epoch has changed, we need to make a new copy of the active authority, + // and update all local variables. + // We also need to remove any authority that's no longer a valid validator + // from the list of peer names. + // It's ok to keep the existing gossip tasks running even for peers that are no longer + // validators, and let them end naturally. + local_active = Arc::new(_active_authority.clone()); + committee = local_active.state.committee.load().deref().clone(); + target_num_tasks = usize::min(committee.voting_rights.len() - 1, degree); + peer_names = peer_names + .into_iter() + .filter(|name| committee.voting_rights.contains_key(name)) + .collect(); + } let mut k = 0; while gossip_tasks.len() < target_num_tasks { // Find out what is the earliest time that we are allowed to reconnect // to at least 2f+1 nodes. - let next_connect = active_authority + let next_connect = local_active .minimum_wait_for_majority_honest_available() .await; debug!( @@ -88,20 +106,18 @@ pub async fn gossip_process_with_start_seq( ); tokio::time::sleep_until(next_connect).await; - let name_result = select_gossip_peer( - active_authority.state.name, - peer_names.clone(), - active_authority, - ) - .await; + let name_result = + select_gossip_peer(local_active.state.name, peer_names.clone(), &local_active) + .await; if name_result.is_err() { continue; } let name = name_result.unwrap(); peer_names.insert(name); + let local_active_ref_copy = local_active.clone(); gossip_tasks.push(async move { - let peer_gossip = PeerGossip::new(name, active_authority, start_seq); + let peer_gossip = PeerGossip::new(name, &local_active_ref_copy, start_seq); // Add more duration if we make more than 1 to ensure overlap debug!("Starting gossip from peer {:?}", name); peer_gossip @@ -116,7 +132,7 @@ pub async fn gossip_process_with_start_seq( .iter() .map(|name| committee.weight(name)) .sum::() - + committee.weight(&active_authority.state.name); + + committee.weight(&local_active.state.name); if total_stake_used >= committee.quorum_threshold() { break; } @@ -127,23 +143,34 @@ pub async fn gossip_process_with_start_seq( continue; } - // Let the peer gossip task finish - let (finished_name, _result) = gossip_tasks.select_next_some().await; - if let Err(err) = _result { - active_authority.set_failure_backoff(finished_name).await; - active_authority.state.metrics.gossip_task_error_count.inc(); - error!("Peer {:?} returned error: {:?}", finished_name, err); - } else { - active_authority.set_success_backoff(finished_name).await; - active_authority - .state - .metrics - .gossip_task_success_count - .inc(); - debug!("End gossip from peer {:?}", finished_name); - } - peer_names.remove(&finished_name); + wait_for_one_gossip_task_to_finish(&local_active, &mut peer_names, &mut gossip_tasks).await; + } +} + +async fn wait_for_one_gossip_task_to_finish( + active_authority: &ActiveAuthority, + peer_names: &mut HashSet, + gossip_tasks: &mut FuturesUnordered< + impl Future)>, + >, +) where + A: AuthorityAPI + Send + Sync + 'static + Clone, +{ + let (finished_name, _result) = gossip_tasks.select_next_some().await; + if let Err(err) = _result { + active_authority.set_failure_backoff(finished_name).await; + active_authority.state.metrics.gossip_task_error_count.inc(); + error!("Peer {:?} returned error: {:?}", finished_name, err); + } else { + active_authority.set_success_backoff(finished_name).await; + active_authority + .state + .metrics + .gossip_task_success_count + .inc(); + debug!("End gossip from peer {:?}", finished_name); } + peer_names.remove(&finished_name); } struct LocalConfirmationTransactionHandler {