Skip to content

Commit

Permalink
Node sync waits for finality before applying TXes (MystenLabs#2570)
Browse files Browse the repository at this point in the history
* Node sync process now waits for finality.

- Certificates are not processed locally in the node until finality has
  been established.
- Trustworthy TransactionEffects are downloaded, so we can easily add
  shared object support after this commit.

* self: Arc<Self> works fine apparently

* Fix typo

* Simplify EffectsStakeMap, delete old entries

* Revert unused changes

* Use where clause

* Delay Arc creation

* Don't need tokio Mutex since we don't hold locks across await
  • Loading branch information
mystenmark authored Jun 17, 2022
1 parent 98d674f commit cc952e3
Show file tree
Hide file tree
Showing 9 changed files with 679 additions and 25 deletions.
9 changes: 6 additions & 3 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
sync::Arc,
time::Duration,
};
use sui_storage::follower_store::FollowerStore;
use sui_storage::{follower_store::FollowerStore, node_sync_store::NodeSyncStore};
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -244,7 +244,10 @@ where
})
}

pub async fn spawn_node_sync_process(self) -> JoinHandle<()> {
pub async fn spawn_node_sync_process(
self,
node_sync_store: Arc<NodeSyncStore>,
) -> JoinHandle<()> {
let active = Arc::new(self);
let committee = active.state.committee.load().deref().clone();
// nodes follow all validators to ensure they can eventually determine
Expand All @@ -253,7 +256,7 @@ where
let target_num_tasks = committee.voting_rights.len();

tokio::task::spawn(async move {
node_sync_process(&active, target_num_tasks).await;
node_sync_process(&active, target_num_tasks, node_sync_store).await;
})
}
}
26 changes: 20 additions & 6 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::{
use std::future::Future;
use std::ops::Deref;
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_storage::follower_store::FollowerStore;
use sui_storage::{follower_store::FollowerStore, node_sync_store::NodeSyncStore};
use sui_types::committee::StakeUnit;
use sui_types::{
base_types::{AuthorityName, ExecutionDigests},
Expand All @@ -34,6 +34,9 @@ mod configurable_batch_action_client;
#[cfg(test)]
pub(crate) mod tests;

mod node_sync;
use node_sync::NodeSyncDigestHandler;

struct Follower<A> {
peer_name: AuthorityName,
client: SafeClient<A>,
Expand All @@ -56,15 +59,25 @@ where
follower_process(active_authority, degree, GossipDigestHandler::new()).await;
}

pub async fn node_sync_process<A>(active_authority: &ActiveAuthority<A>, degree: usize)
where
pub async fn node_sync_process<A>(
active_authority: &ActiveAuthority<A>,
degree: usize,
node_sync_store: Arc<NodeSyncStore>,
) where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// TODO: special case follower for node sync.
follower_process(active_authority, degree, GossipDigestHandler::new()).await;
let state = active_authority.state.clone();
let aggregator = active_authority.net.load().clone();
follower_process(
active_authority,
degree,
NodeSyncDigestHandler::new(state, aggregator, node_sync_store),
)
.await;
}

async fn follower_process<A, Handler: DigestHandler<A> + Copy>(
async fn follower_process<A, Handler: DigestHandler<A> + Clone>(
active_authority: &ActiveAuthority<A>,
degree: usize,
handler: Handler,
Expand Down Expand Up @@ -128,14 +141,15 @@ async fn follower_process<A, Handler: DigestHandler<A> + Copy>(

peer_names.insert(name);
let local_active_ref_copy = local_active.clone();
let handler_clone = handler.clone();
gossip_tasks.push(async move {
let follower = Follower::new(name, &local_active_ref_copy);
// Add more duration if we make more than 1 to ensure overlap
debug!(peer = ?name, "Starting gossip from peer");
follower
.start(
Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15),
handler,
handler_clone,
)
.await
});
Expand Down
Loading

0 comments on commit cc952e3

Please sign in to comment.