Skip to content

Commit

Permalink
Use execution driver instead of sync_authority_source_to_destination …
Browse files Browse the repository at this point in the history
…for gossip (MystenLabs#3516)

* Use execution driver instead of sync_authority_source_to_destination for gossip

* Fix tests
  • Loading branch information
mystenmark authored Aug 16, 2022
1 parent d3f4690 commit adfc7bd
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 54 deletions.
57 changes: 11 additions & 46 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{
authority::AuthorityState,
authority_aggregator::{AuthorityAggregator, CertificateHandler},
authority_client::AuthorityAPI,
safe_client::SafeClient,
};
use crate::{authority::AuthorityState, authority_client::AuthorityAPI, safe_client::SafeClient};
use async_trait::async_trait;
use futures::{
future::BoxFuture,
Expand Down Expand Up @@ -34,6 +29,7 @@ use sui_types::{
BatchInfoRequest, BatchInfoResponseItem, TransactionInfoRequest, TransactionInfoResponse,
},
};
use tap::TapFallible;
use tracing::{debug, error, info, trace};

#[cfg(test)]
Expand All @@ -42,8 +38,6 @@ mod configurable_batch_action_client;
#[cfg(test)]
pub(crate) mod tests;

use sui_types::messages::CertifiedTransaction;

#[derive(Copy, Clone)]
pub(crate) enum GossipType {
/// Must get the full sequence of the peers it is connecting to. This is used for the full node sync logic
Expand All @@ -59,7 +53,6 @@ pub(crate) struct Follower<A> {
state: Arc<AuthorityState>,
follower_store: Arc<FollowerStore>,
max_seq: Option<TxSequenceNumber>,
aggregator: Arc<AuthorityAggregator<A>>,
}

const REQUEST_FOLLOW_NUM_DIGESTS: u64 = 100_000;
Expand Down Expand Up @@ -287,24 +280,6 @@ async fn wait_for_one_gossip_task_to_finish<A>(
peer_names.remove(&finished_name);
}

pub struct LocalCertificateHandler {
pub state: Arc<AuthorityState>,
}

#[async_trait]
impl CertificateHandler for LocalCertificateHandler {
async fn handle(
&self,
certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
self.state.handle_certificate(certificate).await
}

fn destination_name(&self) -> String {
format!("{:?}", self.state.name)
}
}

pub async fn select_gossip_peer<A>(
my_name: AuthorityName,
peer_names: HashSet<AuthorityName>,
Expand Down Expand Up @@ -357,26 +332,18 @@ impl GossipDigestHandler {
Self { metrics }
}

async fn process_response<A>(
aggregator: Arc<AuthorityAggregator<A>>,
async fn process_response(
state: Arc<AuthorityState>,
peer_name: AuthorityName,
response: TransactionInfoResponse,
) -> Result<(), SuiError>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
) -> Result<(), SuiError> {
if let Some(certificate) = response.certified_transaction {
// Process the certificate from one authority to ourselves
aggregator
.sync_authority_source_to_destination(
certificate,
peer_name,
&LocalCertificateHandler {
state: state.clone(),
},
)
.await?;
let digest = *certificate.digest();
state
.database
.add_pending_certificates(vec![(digest, Some(certificate))])
.tap_err(|e| error!(?digest, "add_pending_certificates failed: {}", e))?;

state.metrics.gossip_sync_count.inc();
Ok(())
} else {
Expand Down Expand Up @@ -404,7 +371,6 @@ where
) -> SuiResult<Self::DigestResult> {
let state = follower.state.clone();
let client = follower.client.clone();
let aggregator = follower.aggregator.clone();
let name = follower.peer_name;
Ok(Box::pin(async move {
if !state.database.effects_exists(&digest.transaction)? {
Expand All @@ -414,7 +380,7 @@ where
digest.transaction,
))
.await?;
Self::process_response(aggregator, state, name, response).await?;
Self::process_response(state, name, response).await?;
}
Ok(())
}))
Expand Down Expand Up @@ -463,7 +429,6 @@ where
state: active_authority.state.clone(),
follower_store: active_authority.follower_store.clone(),
max_seq,
aggregator: active_authority.net.load().clone(),
}
}

Expand Down
6 changes: 5 additions & 1 deletion crates/sui-core/src/authority_active/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::authority_active::gossip::configurable_batch_action_client::{
init_configurable_authorities, BatchAction, ConfigurableBatchActionClient,
};
use crate::authority_active::MAX_RETRY_DELAY_MS;
use crate::authority_aggregator::AuthorityAggregator;
use std::time::Duration;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -67,6 +68,8 @@ pub async fn test_gossip_error() {

#[tokio::test(flavor = "current_thread", start_paused = true)]
pub async fn test_gossip_after_revert() {
telemetry_subscribers::init_for_testing();

let action_sequence = vec![BatchAction::EmitUpdateItem(), BatchAction::EmitUpdateItem()];
let (net, states, digests) = init_configurable_authorities(action_sequence).await;

Expand Down Expand Up @@ -162,7 +165,8 @@ async fn start_gossip_process(
let active_state = Arc::new(
ActiveAuthority::new_with_ephemeral_storage_for_test(state, inner_net).unwrap(),
);
active_state.spawn_gossip_process(3).await;
active_state.clone().spawn_gossip_process(3).await;
active_state.spawn_execute_process().await;
});
active_authorities.push(handle);
}
Expand Down
12 changes: 5 additions & 7 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ pub enum ReduceOutput<S> {
}

#[async_trait]
pub trait CertificateHandler {
trait CertificateHandler {
async fn handle(&self, certificate: CertifiedTransaction)
-> SuiResult<TransactionInfoResponse>;

Expand Down Expand Up @@ -277,7 +277,7 @@ where
level = "trace",
skip_all
)]
pub async fn sync_authority_source_to_destination<CertHandler: CertificateHandler>(
async fn sync_authority_source_to_destination<CertHandler: CertificateHandler>(
&self,
cert: CertifiedTransaction,
source_authority: AuthorityName,
Expand Down Expand Up @@ -403,7 +403,7 @@ where
Ok(())
}

pub async fn sync_certificate_to_authority(
async fn sync_certificate_to_authority(
&self,
cert: CertifiedTransaction,
destination_authority: AuthorityName,
Expand All @@ -418,7 +418,7 @@ where
.await
}

pub async fn sync_certificate_to_authority_with_timeout(
async fn sync_certificate_to_authority_with_timeout(
&self,
cert: CertifiedTransaction,
destination_authority: AuthorityName,
Expand Down Expand Up @@ -449,9 +449,7 @@ where
/// stake, in order to bring the destination authority up to date to accept
/// the certificate. The time devoted to each attempt is bounded by
/// `timeout_milliseconds`.
pub async fn sync_certificate_to_authority_with_timeout_inner<
CertHandler: CertificateHandler,
>(
async fn sync_certificate_to_authority_with_timeout_inner<CertHandler: CertificateHandler>(
&self,
cert: CertifiedTransaction,
destination_authority: AuthorityName,
Expand Down

0 comments on commit adfc7bd

Please sign in to comment.