Skip to content

Commit

Permalink
Slight gossip refactoring to support a different process for full nod…
Browse files Browse the repository at this point in the history
…e sync (MystenLabs#2526)

* Refactor follower implementation to support cases other than gossip

* Stub of node_sync_process()
  • Loading branch information
mystenmark authored Jun 15, 2022
1 parent ca37ecc commit 8ceb58b
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 44 deletions.
24 changes: 21 additions & 3 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
use arc_swap::ArcSwap;
use std::{
collections::{BTreeMap, HashMap},
ops::Deref,
sync::Arc,
time::Duration,
};
Expand All @@ -48,7 +49,7 @@ use crate::{
use tokio::time::Instant;

pub mod gossip;
use gossip::gossip_process;
use gossip::{gossip_process, node_sync_process};

pub mod checkpoint_driver;
use checkpoint_driver::checkpoint_process;
Expand Down Expand Up @@ -233,9 +234,26 @@ where
pub async fn spawn_gossip_process(self, degree: usize) -> JoinHandle<()> {
let active = Arc::new(self);

let gossip_locals = active;
// Number of tasks at most "degree" and no more than committee - 1
// (validators do not follow themselves for gossip)
let committee = active.state.committee.load().deref().clone();
let target_num_tasks = usize::min(committee.voting_rights.len() - 1, degree);

tokio::task::spawn(async move {
gossip_process(&active, target_num_tasks).await;
})
}

pub async fn spawn_node_sync_process(self) -> JoinHandle<()> {
let active = Arc::new(self);
let committee = active.state.committee.load().deref().clone();
// nodes follow all validators to ensure they can eventually determine
// finality of certs. We need to follow 2f+1 _honest_ validators to
// eventually find finality, therefore we must follow all validators.
let target_num_tasks = committee.voting_rights.len();

tokio::task::spawn(async move {
gossip_process(&gossip_locals, degree).await;
node_sync_process(&active, target_num_tasks).await;
})
}
}
151 changes: 110 additions & 41 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ use crate::{
safe_client::SafeClient,
};
use async_trait::async_trait;
use futures::stream::FuturesOrdered;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{
stream::{FuturesOrdered, FuturesUnordered},
StreamExt,
};
use std::future::Future;
use std::ops::Deref;
use std::{collections::HashSet, sync::Arc, time::Duration};
use sui_storage::follower_store::FollowerStore;
use sui_types::committee::StakeUnit;
use sui_types::{
base_types::AuthorityName,
base_types::{AuthorityName, ExecutionDigests},
batch::{TxSequenceNumber, UpdateItem},
error::{SuiError, SuiResult},
messages::{
Expand All @@ -32,7 +34,7 @@ mod configurable_batch_action_client;
#[cfg(test)]
pub(crate) mod tests;

struct PeerGossip<A> {
struct Follower<A> {
peer_name: AuthorityName,
client: SafeClient<A>,
state: Arc<AuthorityState>,
Expand All @@ -50,6 +52,24 @@ use super::ActiveAuthority;
pub async fn gossip_process<A>(active_authority: &ActiveAuthority<A>, degree: usize)
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
follower_process(active_authority, degree, GossipDigestHandler::new()).await;
}

pub async fn node_sync_process<A>(active_authority: &ActiveAuthority<A>, degree: usize)
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// TODO: special case follower for node sync.
follower_process(active_authority, degree, GossipDigestHandler::new()).await;
}

async fn follower_process<A, Handler: DigestHandler<A> + Copy>(
active_authority: &ActiveAuthority<A>,
degree: usize,
handler: Handler,
) where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Make a clone of the active authority and committee, and keep using it until epoch changes.
let mut local_active = Arc::new(active_authority.clone());
Expand Down Expand Up @@ -109,11 +129,14 @@ where
peer_names.insert(name);
let local_active_ref_copy = local_active.clone();
gossip_tasks.push(async move {
let peer_gossip = PeerGossip::new(name, &local_active_ref_copy);
let follower = Follower::new(name, &local_active_ref_copy);
// Add more duration if we make more than 1 to ensure overlap
debug!(peer = ?name, "Starting gossip from peer");
peer_gossip
.start(Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15))
follower
.start(
Duration::from_secs(REFRESH_FOLLOWER_PERIOD_SECS + k * 15),
handler,
)
.await
});
k += 1;
Expand Down Expand Up @@ -208,11 +231,77 @@ where
})
}

impl<A> PeerGossip<A>
#[async_trait]
trait DigestHandler<A> {
async fn handle_digest(&self, follower: &Follower<A>, digest: ExecutionDigests) -> SuiResult;
}

#[derive(Clone, Copy)]
struct GossipDigestHandler {}

impl GossipDigestHandler {
fn new() -> Self {
Self {}
}

async fn process_response<A>(
follower: &Follower<A>,
response: TransactionInfoResponse,
) -> Result<(), SuiError>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
if let Some(certificate) = response.certified_transaction {
// Process the certificate from one authority to ourselves
follower
.aggregator
.sync_authority_source_to_destination(
ConfirmationTransaction { certificate },
follower.peer_name,
LocalConfirmationTransactionHandler {
state: follower.state.clone(),
},
)
.await?;
follower.state.metrics.gossip_sync_count.inc();
Ok(())
} else {
// The authority did not return the certificate, despite returning info
// But it should know the certificate!
Err(SuiError::ByzantineAuthoritySuspicion {
authority: follower.peer_name,
})
}
}
}

#[async_trait]
impl<A> DigestHandler<A> for GossipDigestHandler
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub fn new(peer_name: AuthorityName, active_authority: &ActiveAuthority<A>) -> PeerGossip<A> {
async fn handle_digest(&self, follower: &Follower<A>, digest: ExecutionDigests) -> SuiResult {
if !follower
.state
.database
.effects_exists(&digest.transaction)?
{
// Download the certificate
let response = follower
.client
.handle_transaction_info_request(TransactionInfoRequest::from(digest.transaction))
.await?;
Self::process_response(follower, response).await?;
}
Ok(())
}
}

impl<A> Follower<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub fn new(peer_name: AuthorityName, active_authority: &ActiveAuthority<A>) -> Self {
// TODO: for validator gossip, we should always use None as the start_seq, but we should
// consult the start_seq we retrieved from the db to make sure that the peer is giving
// us new txes.
Expand All @@ -239,13 +328,21 @@ where
}
}

pub async fn start(mut self, duration: Duration) -> (AuthorityName, Result<(), SuiError>) {
pub async fn start<Handler: DigestHandler<A>>(
self,
duration: Duration,
handler: Handler,
) -> (AuthorityName, Result<(), SuiError>) {
let peer_name = self.peer_name;
let result = self.peer_gossip_for_duration(duration).await;
let result = self.follow_peer_for_duration(duration, handler).await;
(peer_name, result)
}

async fn peer_gossip_for_duration(&mut self, duration: Duration) -> Result<(), SuiError> {
async fn follow_peer_for_duration<'a, Handler: DigestHandler<A>>(
&self,
duration: Duration,
handler: Handler,
) -> SuiResult {
// Global timeout, we do not exceed this time in this task.
let mut timeout = Box::pin(tokio::time::sleep(duration));
let mut queue = FuturesOrdered::new();
Expand Down Expand Up @@ -307,38 +404,10 @@ where
}
},
digest = &mut queue.next() , if !queue.is_empty() => {
let digest = digest.unwrap();
if !self.state.database.effects_exists(&digest.transaction)? {
// Download the certificate
let response = self.client.handle_transaction_info_request(TransactionInfoRequest::from(digest.transaction)).await?;
self.process_response(response).await?;
}
handler.handle_digest(self, digest.unwrap()).await?;
}
};
}
Ok(())
}

async fn process_response(&self, response: TransactionInfoResponse) -> Result<(), SuiError> {
if let Some(certificate) = response.certified_transaction {
// Process the certificate from one authority to ourselves
self.aggregator
.sync_authority_source_to_destination(
ConfirmationTransaction { certificate },
self.peer_name,
LocalConfirmationTransactionHandler {
state: self.state.clone(),
},
)
.await?;
self.state.metrics.gossip_sync_count.inc();
Ok(())
} else {
// The authority did not return the certificate, despite returning info
// But it should know the certificate!
Err(SuiError::ByzantineAuthoritySuspicion {
authority: self.peer_name,
})
}
}
}

0 comments on commit 8ceb58b

Please sign in to comment.