Skip to content

Commit

Permalink
Make a clone of active authority at gossip loop (MystenLabs#2412)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jun 6, 2022
1 parent 192cbc7 commit a14d3ae
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 29 deletions.
10 changes: 10 additions & 0 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ impl<A> ActiveAuthority<A> {
}
}

/// Manual `Clone` implementation for `ActiveAuthority`.
///
/// The impl is written by hand rather than derived, so it places no
/// `A: Clone` bound on the type parameter.
impl<A> Clone for ActiveAuthority<A> {
    /// Builds a new `ActiveAuthority` from the current contents of `self`.
    ///
    /// `state` and `health` are duplicated through their own `Clone` impls
    /// (presumably cheap shared handles -- confirm against the struct
    /// definition). `net` is wrapped in a brand-new `ArcSwap` seeded with the
    /// value that is loaded at the time of the call, so the returned value
    /// owns a separate `ArcSwap` cell from `self.net`.
    fn clone(&self) -> Self {
        // Take an owned handle to whatever `net` currently holds and seed a
        // fresh swap cell with it.
        let net_snapshot = self.net.load_full();

        Self {
            state: self.state.clone(),
            net: ArcSwap::from(net_snapshot),
            health: self.health.clone(),
        }
    }
}

impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
Expand Down
85 changes: 56 additions & 29 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use crate::{
use async_trait::async_trait;
use futures::stream::FuturesOrdered;
use futures::{stream::FuturesUnordered, StreamExt};
use std::future::Future;
use std::ops::Deref;
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_types::committee::StakeUnit;
use sui_types::{
Expand Down Expand Up @@ -51,17 +53,18 @@ where
}

pub async fn gossip_process_with_start_seq<A>(
active_authority: &ActiveAuthority<A>,
_active_authority: &ActiveAuthority<A>,
degree: usize,
start_seq: Option<TxSequenceNumber>,
) where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// A copy of the committee
let committee = &active_authority.net.load().committee;
// Make a clone of the active authority and committee, and keep using it until epoch changes.
let mut local_active = Arc::new(_active_authority.clone());
let mut committee = local_active.state.committee.load().deref().clone();

// Number of tasks at most "degree" and no more than committee - 1
let target_num_tasks: usize = usize::min(committee.voting_rights.len() - 1, degree);
let mut target_num_tasks = usize::min(committee.voting_rights.len() - 1, degree);

// If we do not expect to connect to anyone
if target_num_tasks == 0 {
Expand All @@ -75,11 +78,26 @@ pub async fn gossip_process_with_start_seq<A>(
let mut gossip_tasks = FuturesUnordered::new();

loop {
if _active_authority.state.committee.load().epoch != committee.epoch {
// If epoch has changed, we need to make a new copy of the active authority,
// and update all local variables.
// We also need to remove any authority that's no longer a valid validator
// from the list of peer names.
// It's ok to keep the existing gossip tasks running even for peers that are no longer
// validators, and let them end naturally.
local_active = Arc::new(_active_authority.clone());
committee = local_active.state.committee.load().deref().clone();
target_num_tasks = usize::min(committee.voting_rights.len() - 1, degree);
peer_names = peer_names
.into_iter()
.filter(|name| committee.voting_rights.contains_key(name))
.collect();
}
let mut k = 0;
while gossip_tasks.len() < target_num_tasks {
// Find out what is the earliest time that we are allowed to reconnect
// to at least 2f+1 nodes.
let next_connect = active_authority
let next_connect = local_active
.minimum_wait_for_majority_honest_available()
.await;
debug!(
Expand All @@ -88,20 +106,18 @@ pub async fn gossip_process_with_start_seq<A>(
);
tokio::time::sleep_until(next_connect).await;

let name_result = select_gossip_peer(
active_authority.state.name,
peer_names.clone(),
active_authority,
)
.await;
let name_result =
select_gossip_peer(local_active.state.name, peer_names.clone(), &local_active)
.await;
if name_result.is_err() {
continue;
}
let name = name_result.unwrap();

peer_names.insert(name);
let local_active_ref_copy = local_active.clone();
gossip_tasks.push(async move {
let peer_gossip = PeerGossip::new(name, active_authority, start_seq);
let peer_gossip = PeerGossip::new(name, &local_active_ref_copy, start_seq);
// Add more duration if we make more than 1 to ensure overlap
debug!("Starting gossip from peer {:?}", name);
peer_gossip
Expand All @@ -116,7 +132,7 @@ pub async fn gossip_process_with_start_seq<A>(
.iter()
.map(|name| committee.weight(name))
.sum::<StakeUnit>()
+ committee.weight(&active_authority.state.name);
+ committee.weight(&local_active.state.name);
if total_stake_used >= committee.quorum_threshold() {
break;
}
Expand All @@ -127,23 +143,34 @@ pub async fn gossip_process_with_start_seq<A>(
continue;
}

// Let the peer gossip task finish
let (finished_name, _result) = gossip_tasks.select_next_some().await;
if let Err(err) = _result {
active_authority.set_failure_backoff(finished_name).await;
active_authority.state.metrics.gossip_task_error_count.inc();
error!("Peer {:?} returned error: {:?}", finished_name, err);
} else {
active_authority.set_success_backoff(finished_name).await;
active_authority
.state
.metrics
.gossip_task_success_count
.inc();
debug!("End gossip from peer {:?}", finished_name);
}
peer_names.remove(&finished_name);
wait_for_one_gossip_task_to_finish(&local_active, &mut peer_names, &mut gossip_tasks).await;
}
}

async fn wait_for_one_gossip_task_to_finish<A>(
active_authority: &ActiveAuthority<A>,
peer_names: &mut HashSet<AuthorityName>,
gossip_tasks: &mut FuturesUnordered<
impl Future<Output = (AuthorityName, Result<(), SuiError>)>,
>,
) where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let (finished_name, _result) = gossip_tasks.select_next_some().await;
if let Err(err) = _result {
active_authority.set_failure_backoff(finished_name).await;
active_authority.state.metrics.gossip_task_error_count.inc();
error!("Peer {:?} returned error: {:?}", finished_name, err);
} else {
active_authority.set_success_backoff(finished_name).await;
active_authority
.state
.metrics
.gossip_task_success_count
.inc();
debug!("End gossip from peer {:?}", finished_name);
}
peer_names.remove(&finished_name);
}

struct LocalConfirmationTransactionHandler {
Expand Down

0 comments on commit a14d3ae

Please sign in to comment.