Skip to content

Commit

Permalink
Authority Active net use ArcSwap (MystenLabs#2391)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jun 6, 2022
1 parent 893b4b4 commit 192cbc7
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 25 deletions.
18 changes: 11 additions & 7 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/

use arc_swap::ArcSwap;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
Expand Down Expand Up @@ -96,12 +97,11 @@ impl AuthorityHealth {
}
}

#[derive(Clone)]
pub struct ActiveAuthority<A> {
// The local authority state
pub state: Arc<AuthorityState>,
// The network interfaces to other authorities
pub net: Arc<AuthorityAggregator<A>>,
pub net: ArcSwap<AuthorityAggregator<A>>,
// Network health
pub health: Arc<Mutex<HashMap<AuthorityName, AuthorityHealth>>>,
}
Expand All @@ -122,7 +122,10 @@ impl<A> ActiveAuthority<A> {
.collect(),
)),
state: authority,
net: Arc::new(AuthorityAggregator::new(committee, authority_clients)),
net: ArcSwap::from(Arc::new(AuthorityAggregator::new(
committee,
authority_clients,
))),
})
}

Expand All @@ -133,10 +136,10 @@ impl<A> ActiveAuthority<A> {
/// even if we have a few connections.
pub async fn minimum_wait_for_majority_honest_available(&self) -> Instant {
let lock = self.health.lock().await;
let (_, instant) = self.net.committee.robust_value(
let (_, instant) = self.net.load().committee.robust_value(
lock.iter().map(|(name, h)| (*name, h.no_contact_before)),
// At least one honest node is at or above it.
self.net.committee.quorum_threshold(),
self.net.load().committee.quorum_threshold(),
);
instant
}
Expand Down Expand Up @@ -182,16 +185,17 @@ where

/// Spawn all active tasks.
pub async fn spawn_active_processes(self, gossip: bool, checkpoint: bool) {
let active = Arc::new(self);
// Spawn a task to take care of gossip
let gossip_locals = self.clone();
let gossip_locals = active.clone();
let _gossip_join = tokio::task::spawn(async move {
if gossip {
gossip_process(&gossip_locals, 4).await;
}
});

// Spawn task to take care of checkpointing
let checkpoint_locals = self; // .clone();
let checkpoint_locals = active; // .clone();
let _checkpoint_join = tokio::task::spawn(async move {
if checkpoint {
checkpoint_process(&checkpoint_locals, &CheckpointProcessControl::default()).await;
Expand Down
36 changes: 21 additions & 15 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Deref;
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
Expand Down Expand Up @@ -93,10 +94,16 @@ pub async fn checkpoint_process<A>(
tokio::time::sleep(timing.long_pause_between_checkpoints).await;

loop {
let committee = &active_authority.net.committee;
let net = active_authority.net.load().deref().clone();
let committee = &net.committee;
if committee != active_authority.state.committee.load().deref().deref() {
warn!("Inconsistent committee between authority state and authority active");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
// (1) Get the latest summaries and proposals
let state_of_world = get_latest_proposal_and_checkpoint_from_all(
active_authority.net.clone(),
net.clone(),
timing.extra_time_after_quorum,
timing.timeout_until_quorum,
)
Expand All @@ -123,7 +130,7 @@ pub async fn checkpoint_process<A>(
// TODO log error
if let Err(err) = sync_to_checkpoint(
active_authority.state.name,
active_authority.net.clone(),
net.clone(),
state_checkpoints.clone(),
checkpoint.clone(),
)
Expand All @@ -144,7 +151,7 @@ pub async fn checkpoint_process<A>(
state_checkpoints.lock().handle_checkpoint_certificate(
&checkpoint,
&None,
&active_authority.state.committee.load(),
committee,
)
}; // unlock

Expand All @@ -156,18 +163,15 @@ pub async fn checkpoint_process<A>(
// the full contents of the checkpoint. So we try to download it.
// TODO: clean up the errors to get here only when the error is
// "No checkpoint set at this sequence."
if let Ok(contents) = get_checkpoint_contents(
active_authority.state.name,
active_authority.net.clone(),
&checkpoint,
)
.await
if let Ok(contents) =
get_checkpoint_contents(active_authority.state.name, net.clone(), &checkpoint)
.await
{
// Retry with contents
let _ = state_checkpoints.lock().handle_checkpoint_certificate(
&checkpoint,
&Some(contents),
&active_authority.state.committee.load(),
committee,
);
}
}
Expand Down Expand Up @@ -603,10 +607,10 @@ pub async fn diff_proposals<A>(
break;
}

let random_authority = active_authority.net.committee.sample();
if available_authorities.remove(random_authority) {
let random_authority = *active_authority.net.load().committee.sample();
if available_authorities.remove(&random_authority) {
// Get a client
let client = active_authority.net.authority_clients[random_authority].clone();
let client = active_authority.net.load().authority_clients[&random_authority].clone();

if let Ok(response) = client
.handle_checkpoint(CheckpointRequest::latest(true))
Expand Down Expand Up @@ -704,6 +708,7 @@ where
// download them from the remote node.
let client = active_authority
.net
.load()
.clone_client(&fragment.other.0.authority);
for tx_digest in &fragment.diff.first.items {
let response = client
Expand Down Expand Up @@ -760,6 +765,7 @@ where
{
active_authority
.net
.load()
.sync_certificate_to_authority_with_timeout(
ConfirmationTransaction::new(cert.clone()),
active_authority.state.name,
Expand All @@ -784,7 +790,7 @@ where
debug!("Try sync for digest: {digest:?}");
if let Err(err) = sync_digest(
active_authority.state.name,
active_authority.net.clone(),
active_authority.net.load().clone(),
digest.transaction,
per_other_authority_delay,
)
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub async fn gossip_process_with_start_seq<A>(
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// A copy of the committee
let committee = &active_authority.net.committee;
let committee = &active_authority.net.load().committee;

// Number of tasks at most "degree" and no more than committee - 1
let target_num_tasks: usize = usize::min(committee.voting_rights.len() - 1, degree);
Expand Down Expand Up @@ -200,10 +200,10 @@ where
) -> PeerGossip<A> {
PeerGossip {
peer_name,
client: active_authority.net.authority_clients[&peer_name].clone(),
client: active_authority.net.load().authority_clients[&peer_name].clone(),
state: active_authority.state.clone(),
max_seq: start_seq,
aggregator: active_authority.net.clone(),
aggregator: active_authority.net.load().clone(),
}
}

Expand Down

0 comments on commit 192cbc7

Please sign in to comment.