Skip to content

Commit

Permalink
Do not clone ActiveAuthority (MystenLabs#2632)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jun 21, 2022
1 parent 15703a0 commit 1f38582
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 57 deletions.
33 changes: 12 additions & 21 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,20 @@ impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub async fn spawn_checkpoint_process(self) {
pub async fn spawn_checkpoint_process(self: Arc<Self>) {
self.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
.await
}

/// Spawn all active tasks.
pub async fn spawn_checkpoint_process_with_config(
self,
self: Arc<Self>,
checkpoint_process_control: Option<CheckpointProcessControl>,
) {
let active = Arc::new(self);

// Spawn task to take care of checkpointing
let checkpoint_locals = active; // .clone();
let _checkpoint_join = tokio::task::spawn(async move {
if let Some(checkpoint) = checkpoint_process_control {
checkpoint_process(&checkpoint_locals, &checkpoint).await;
checkpoint_process(&self, &checkpoint).await;
}
});

Expand All @@ -233,42 +230,36 @@ where
}

/// Spawn gossip process
pub async fn spawn_gossip_process(self, degree: usize) -> JoinHandle<()> {
let active = Arc::new(self);

pub async fn spawn_gossip_process(self: Arc<Self>, degree: usize) -> JoinHandle<()> {
// Number of tasks at most "degree" and no more than committee - 1
// (validators do not follow themselves for gossip)
let committee = active.state.committee.load().deref().clone();
let committee = self.state.committee.load().deref().clone();
let target_num_tasks = usize::min(committee.voting_rights.len() - 1, degree);

tokio::task::spawn(async move {
gossip_process(&active, target_num_tasks).await;
gossip_process(&self, target_num_tasks).await;
})
}

pub async fn spawn_node_sync_process(
self,
self: Arc<Self>,
node_sync_store: Arc<NodeSyncStore>,
) -> JoinHandle<()> {
let active = Arc::new(self);
let committee = active.state.committee.load().deref().clone();
let committee = self.state.committee.load().deref().clone();
// nodes follow all validators to ensure they can eventually determine
// finality of certs. We need to follow 2f+1 _honest_ validators to
// eventually find finality, therefore we must follow all validators.
let target_num_tasks = committee.voting_rights.len();

tokio::task::spawn(async move {
node_sync_process(&active, target_num_tasks, node_sync_store).await;
node_sync_process(&self, target_num_tasks, node_sync_store).await;
})
}

/// Spawn gossip process
pub async fn spawn_execute_process(self) -> JoinHandle<()> {
let active = Arc::new(self);

let locals = active;
/// Spawn pending certificate execution process
pub async fn spawn_execute_process(self: Arc<Self>) -> JoinHandle<()> {
tokio::task::spawn(async move {
execution_process(&locals).await;
execution_process(&self).await;
})
}
}
38 changes: 22 additions & 16 deletions crates/sui-core/src/authority_active/checkpoint_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
safe_client::SafeClient,
};

use std::{collections::BTreeSet, time::Duration};
use std::{collections::BTreeSet, sync::Arc, time::Duration};
use sui_types::messages::{ConfirmationTransaction, ExecutionStatus};

use crate::checkpoints::checkpoint_tests::checkpoint_tests_setup;
Expand All @@ -31,11 +31,13 @@ async fn checkpoint_active_flow_happy_path() {
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state = ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap();
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap(),
);
active_state.spawn_checkpoint_process().await
});
}
Expand Down Expand Up @@ -106,11 +108,13 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state = ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap();
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap(),
);
// Spin the gossip service.
active_state
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
Expand Down Expand Up @@ -194,11 +198,13 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state = ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap();
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap(),
);
// Spin the gossip service.
active_state
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
Expand Down
25 changes: 15 additions & 10 deletions crates/sui-core/src/authority_active/execution_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::{authority_active::ActiveAuthority, checkpoints::checkpoint_tests::TestSetup};

use std::sync::Arc;
use std::time::Duration;
use sui_types::messages::ExecutionStatus;

Expand All @@ -28,11 +29,13 @@ async fn pending_exec_storage_notify() {
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state = ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap();
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap(),
);
active_state.spawn_checkpoint_process().await
});
}
Expand Down Expand Up @@ -106,11 +109,13 @@ async fn pending_exec_full() {
for inner_state in authorities.clone() {
let clients = aggregator.clone_inner_clients();
let _active_handle = tokio::task::spawn(async move {
let active_state = ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap();
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(
inner_state.authority.clone(),
clients,
)
.unwrap(),
);

active_state.clone().spawn_execute_process().await;
active_state.spawn_checkpoint_process().await;
Expand Down
10 changes: 6 additions & 4 deletions crates/sui-core/src/authority_active/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ pub async fn test_gossip_plain() {
let inner_clients = clients.clone();

let handle = tokio::task::spawn(async move {
let active_state =
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(inner_state, inner_clients)
.unwrap();
.unwrap(),
);
active_state.spawn_gossip_process(3).await;
});

Expand Down Expand Up @@ -67,9 +68,10 @@ pub async fn test_gossip_error() {
let inner_clients = clients.clone();

let handle = tokio::task::spawn(async move {
let active_state =
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(inner_state, inner_clients)
.unwrap();
.unwrap(),
);
active_state.spawn_gossip_process(3).await;
});
active_authorities.push(handle);
Expand Down
7 changes: 5 additions & 2 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ impl SuiNode {
authority_clients.insert(validator.public_key(), client);
}

let active_authority =
ActiveAuthority::new(state.clone(), follower_store, authority_clients)?;
let active_authority = Arc::new(ActiveAuthority::new(
state.clone(),
follower_store,
authority_clients,
)?);

Some(if is_validator {
// TODO: get degree from config file.
Expand Down
11 changes: 7 additions & 4 deletions crates/sui/tests/checkpoints_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use rand::{rngs::StdRng, SeedableRng};
use std::collections::HashSet;
use std::sync::Arc;
use sui_core::{
authority::AuthorityState,
authority_active::{checkpoint_driver::CheckpointProcessControl, ActiveAuthority},
Expand Down Expand Up @@ -142,8 +143,9 @@ async fn end_to_end() {
let state = authority.state().clone();
let clients = aggregator.clone_inner_clients();
let _active_authority_handle = tokio::spawn(async move {
let active_state =
ActiveAuthority::new_with_ephemeral_follower_store(state, clients).unwrap();
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(state, clients).unwrap(),
);
let checkpoint_process_control = CheckpointProcessControl {
long_pause_between_checkpoints: Duration::from_millis(10),
..CheckpointProcessControl::default()
Expand Down Expand Up @@ -226,8 +228,9 @@ async fn checkpoint_with_shared_objects() {
let state = authority.state().clone();
let clients = aggregator.clone_inner_clients();
let _active_authority_handle = tokio::spawn(async move {
let active_state =
ActiveAuthority::new_with_ephemeral_follower_store(state, clients).unwrap();
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_follower_store(state, clients).unwrap(),
);
let checkpoint_process_control = CheckpointProcessControl {
long_pause_between_checkpoints: Duration::from_millis(10),
..CheckpointProcessControl::default()
Expand Down

0 comments on commit 1f38582

Please sign in to comment.