Skip to content

Commit

Permalink
Clarify timeout confusion in AuthorityAggregator, use config instead …
Browse files Browse the repository at this point in the history
…of params (MystenLabs#2513)

* Clarify timeout confusion in AuthorityAggregator, use config instead of params

* Remove unused timeout
  • Loading branch information
mystenmark authored Jun 11, 2022
1 parent d18aebc commit 0638aad
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
while let Some(t) = transactions.pop() {
// Get a cert
let new_certificate = sender_aggregator
.process_transaction(t.clone(), Duration::from_secs(60))
.process_transaction(t.clone())
.await
.expect("Unexpected crash");

Expand Down Expand Up @@ -208,7 +208,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
while let Some(t) = transactions.pop() {
// Get a cert
let new_certificate = sender_aggregator
.process_transaction(t.clone(), Duration::from_secs(60))
.process_transaction(t.clone())
.await
.expect("Unexpected crash");

Expand Down
112 changes: 62 additions & 50 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ use sui_types::committee::StakeUnit;
use tokio::sync::mpsc::Receiver;
use tokio::time::timeout;

// TODO: Make timeout duration configurable.
const AUTHORITY_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
const OBJECT_DOWNLOAD_CHANNEL_BOUND: usize = 1024;
pub const DEFAULT_RETRIES: usize = 4;

Expand All @@ -37,6 +35,23 @@ pub mod authority_aggregator_tests;

pub type AsyncResult<'a, T, E> = future::BoxFuture<'a, Result<T, E>>;

/// Timeouts used by `AuthorityAggregator` when communicating with authorities.
///
/// Use [`TimeoutConfig::default`] for the standard values, or build the struct
/// directly to override individual timeouts (e.g. shorter values in tests).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TimeoutConfig {
    /// Timeout for requests addressed to a single authority, such as syncing a
    /// certificate to one authority or fetching one object.
    pub authority_request_timeout: Duration,
    /// The (long) timeout applied while still waiting to hear back from a quorum.
    pub pre_quorum_timeout: Duration,
    /// The (potentially shorter) timeout applied once the quorum threshold has
    /// been reached and we are only waiting for the remaining responses.
    pub post_quorum_timeout: Duration,
}

impl Default for TimeoutConfig {
    /// Returns the default timeouts: 60s per authority request, 60s before a
    /// quorum is reached and 30s after it has been reached.
    fn default() -> Self {
        Self {
            authority_request_timeout: Duration::from_secs(60),
            pre_quorum_timeout: Duration::from_secs(60),
            post_quorum_timeout: Duration::from_secs(30),
        }
    }
}

#[derive(Clone)]
pub struct AuthorityAggregator<A> {
/// Our Sui committee.
Expand All @@ -45,17 +60,27 @@ pub struct AuthorityAggregator<A> {
pub authority_clients: BTreeMap<AuthorityName, SafeClient<A>>,
// Metrics
pub metrics: &'static GatewayMetrics,
pub timeouts: TimeoutConfig,
}

impl<A> AuthorityAggregator<A> {
pub fn new(committee: Committee, authority_clients: BTreeMap<AuthorityName, A>) -> Self {
Self::new_with_timeouts(committee, authority_clients, Default::default())
}

/// Creates an aggregator for `committee` with explicitly configured `timeouts`.
///
/// Every raw authority client is wrapped in a `SafeClient` bound to the
/// committee and to the authority's name.
pub fn new_with_timeouts(
    committee: Committee,
    authority_clients: BTreeMap<AuthorityName, A>,
    timeouts: TimeoutConfig,
) -> Self {
    // Build the client map first: each `SafeClient` needs its own copy of the
    // committee, after which `committee` itself can be moved into the struct
    // instead of being cloned one extra time.
    let authority_clients = authority_clients
        .into_iter()
        .map(|(name, api)| (name, SafeClient::new(api, committee.clone(), name)))
        .collect();

    Self {
        committee,
        authority_clients,
        metrics: &METRICS,
        timeouts,
    }
}

Expand Down Expand Up @@ -255,6 +280,21 @@ where
Ok(())
}

/// Syncs `cert` to `destination_authority` using the aggregator's configured
/// `authority_request_timeout`.
///
/// This is a convenience wrapper around
/// `sync_certificate_to_authority_with_timeout`; `retries` is forwarded unchanged.
pub async fn sync_certificate_to_authority(
    &self,
    cert: ConfirmationTransaction,
    destination_authority: AuthorityName,
    retries: usize,
) -> Result<(), SuiError> {
    let request_timeout = self.timeouts.authority_request_timeout;
    self.sync_certificate_to_authority_with_timeout(
        cert,
        destination_authority,
        request_timeout,
        retries,
    )
    .await
}

/// Sync a certificate to an authority.
///
/// This function infers which authorities have the history related to
Expand Down Expand Up @@ -461,7 +501,6 @@ where
async fn get_object_by_id(
&self,
object_id: ObjectID,
timeout_after_quorum: Duration,
) -> Result<
(
BTreeMap<
Expand Down Expand Up @@ -534,13 +573,13 @@ where
// After we reach threshold we wait for potentially less time.
Ok(ReduceOutput::ContinueWithTimeout(
state,
timeout_after_quorum,
self.timeouts.post_quorum_timeout,
))
}
})
},
// A long timeout before we hear back from a quorum
Duration::from_secs(60),
self.timeouts.pre_quorum_timeout,
)
.await?;

Expand Down Expand Up @@ -697,7 +736,7 @@ where
})
},
// A long timeout before we hear back from a quorum
Duration::from_secs(60),
self.timeouts.pre_quorum_timeout,
)
.await?;
Ok((final_state.object_map, final_state.responded_authorities))
Expand All @@ -708,7 +747,6 @@ where
pub async fn sync_all_given_objects(
&self,
objects: &[ObjectID],
timeout_after_quorum: Duration,
) -> Result<
(
Vec<(
Expand All @@ -733,9 +771,7 @@ where
.map(|(name, _)| *name)
.collect();

let (aggregate_object_info, certificates) = self
.get_object_by_id(*object_id, timeout_after_quorum)
.await?;
let (aggregate_object_info, certificates) = self.get_object_by_id(*object_id).await?;

let mut aggregate_object_info: Vec<_> = aggregate_object_info.into_iter().collect();

Expand Down Expand Up @@ -798,10 +834,9 @@ where
// NOTE: this is right now done sequentially, we should do them in parallel using
// the usual FuturesUnordered.
let _result = self
.sync_certificate_to_authority_with_timeout(
.sync_certificate_to_authority(
ConfirmationTransaction::new(cert.clone()),
name,
timeout_after_quorum,
DEFAULT_RETRIES,
)
.await;
Expand Down Expand Up @@ -843,19 +878,15 @@ where
let all_object_ids: HashSet<_> = object_map.keys().map(|object_ref| object_ref.0).collect();

// Then sync all the owned objects
self.sync_all_given_objects(
&all_object_ids.into_iter().collect::<Vec<_>>(),
timeout_after_quorum,
)
.await
self.sync_all_given_objects(&all_object_ids.into_iter().collect::<Vec<_>>())
.await
}

/// Takes a transaction, brings all authorities up to date with the versions of the
/// objects needed, and then submits the transaction to make a certificate.
pub async fn process_transaction(
&self,
transaction: Transaction,
timeout_after_quorum: Duration,
) -> Result<CertifiedTransaction, SuiError> {
// Find out which objects are required by this transaction and
// ensure they are synced on authorities.
Expand All @@ -866,17 +897,15 @@ where
.map(|o| o.object_id())
.collect();

let (_active_objects, _deleted_objects) = self
.sync_all_given_objects(&required_ids, timeout_after_quorum)
.await?;
let (_active_objects, _deleted_objects) =
self.sync_all_given_objects(&required_ids).await?;

// Now broadcast the transaction to all authorities.
let threshold = self.committee.quorum_threshold();
let validity = self.committee.validity_threshold();
debug!(
quorum_threshold = threshold,
validity_threshold = validity,
?timeout_after_quorum,
"Broadcasting transaction request to authorities"
);
trace!("Transaction data: {:?}", transaction.data);
Expand Down Expand Up @@ -1007,7 +1036,7 @@ where
})
},
// A long timeout before we hear back from a quorum
Duration::from_secs(60),
self.timeouts.pre_quorum_timeout,
)
.await?;

Expand Down Expand Up @@ -1040,7 +1069,6 @@ where
async fn process_certificate(
&self,
certificate: CertifiedTransaction,
timeout_after_quorum: Duration,
) -> Result<TransactionEffects, SuiError> {
struct ProcessCertificateState {
// Different authorities could return different effects. We want at least one effect to come
Expand All @@ -1055,6 +1083,8 @@ where
bad_stake: 0,
};

let timeout_after_quorum = self.timeouts.post_quorum_timeout;

let cert_ref = &certificate;
let threshold = self.committee.quorum_threshold();
let validity = self.committee.validity_threshold();
Expand Down Expand Up @@ -1106,10 +1136,9 @@ where
debug!(authority =? name, error =? res, ?timeout_after_quorum, "Authority out of date - syncing certificates");
// If we got LockErrors, we try to update the authority.
self
.sync_certificate_to_authority_with_timeout(
.sync_certificate_to_authority(
ConfirmationTransaction::new(cert_ref.clone()),
name,
timeout_after_quorum,
DEFAULT_RETRIES,
)
.instrument(tracing::trace_span!("sync_cert", authority =? name))
Expand Down Expand Up @@ -1161,7 +1190,7 @@ where
})
},
// A long timeout before we hear back from a quorum
Duration::from_secs(60),
self.timeouts.pre_quorum_timeout,
)
.await?;

Expand Down Expand Up @@ -1191,44 +1220,30 @@ where
/// NOTE: This is only reliable in the synchronous model, with a sufficient timeout value.
#[cfg(test)]
async fn get_latest_sequence_number(&self, object_id: ObjectID) -> SequenceNumber {
let (object_infos, _certificates) = self
.get_object_by_id(object_id, Duration::from_secs(60))
.await
.unwrap(); // Not safe, but want to blow up if testing.
let (object_infos, _certificates) = self.get_object_by_id(object_id).await.unwrap(); // Not safe, but want to blow up if testing.
let top_ref = object_infos.keys().last().unwrap().0;
top_ref.1
}

pub async fn execute_transaction(
&self,
transaction: &Transaction,
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
self.execute_transaction_with_timeout(transaction, Duration::from_secs(60))
.await
}

pub async fn execute_transaction_with_timeout(
&self,
transaction: &Transaction,
timeout: Duration,
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
let new_certificate = self
.process_transaction(transaction.clone(), timeout)
.process_transaction(transaction.clone())
.instrument(tracing::debug_span!("process_tx"))
.await?;
self.metrics.total_tx_certificates.inc();
let response = self
.process_certificate(new_certificate.clone(), timeout)
.process_certificate(new_certificate.clone())
.instrument(tracing::debug_span!("process_cert"))
.await?;

Ok((new_certificate, response))
}

pub async fn get_object_info_execute(&self, object_id: ObjectID) -> SuiResult<ObjectRead> {
let (object_map, cert_map) = self
.get_object_by_id(object_id, AUTHORITY_REQUEST_TIMEOUT)
.await?;
let (object_map, cert_map) = self.get_object_by_id(object_id).await?;
let mut object_ref_stack: Vec<_> = object_map.into_iter().collect();

while let Some(((obj_ref, tx_digest), (obj_option, layout_option, authorities))) =
Expand All @@ -1246,10 +1261,7 @@ where
} else if cert_map.contains_key(&tx_digest) {
// If we have less stake telling us about the latest state of an object
// we re-run the certificate on all authorities to ensure it is correct.
if let Ok(effects) = self
.process_certificate(cert_map[&tx_digest].clone(), AUTHORITY_REQUEST_TIMEOUT)
.await
{
if let Ok(effects) = self.process_certificate(cert_map[&tx_digest].clone()).await {
if effects.is_object_mutated_here(obj_ref) {
is_ok = true;
} else {
Expand Down Expand Up @@ -1285,7 +1297,7 @@ where
tokio::spawn(Self::fetch_one_object(
self.authority_clients.clone(),
object_ref,
AUTHORITY_REQUEST_TIMEOUT,
self.timeouts.authority_request_timeout,
sender,
));
}
Expand Down
5 changes: 1 addition & 4 deletions crates/sui-core/src/epoch/reconfiguration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ where
// and execute it locally.
loop {
if let Ok(certificate) = new_net
.process_transaction(
advance_epoch_tx.clone().to_transaction(),
Duration::from_secs(0),
)
.process_transaction(advance_epoch_tx.clone().to_transaction())
.await
{
self.state
Expand Down
25 changes: 13 additions & 12 deletions crates/sui-core/src/unit_tests/authority_aggregator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ pub async fn init_local_authorities_with_genesis(
states.push(client.state.clone());
clients.insert(authority_name, client);
}
(AuthorityAggregator::new(committee, clients), states)
let timeouts = TimeoutConfig {
authority_request_timeout: Duration::from_secs(5),
pre_quorum_timeout: Duration::from_secs(5),
post_quorum_timeout: Duration::from_secs(5),
};
(
AuthorityAggregator::new_with_timeouts(committee, clients, timeouts),
states,
)
}

pub fn get_local_client(
Expand Down Expand Up @@ -341,9 +349,7 @@ async fn execute_transaction_with_fault_configs(
gas_object1.compute_object_reference(),
gas_object2.compute_object_reference(),
);
let cert = authorities
.process_transaction(tx, Duration::from_secs(5))
.await?;
let cert = authorities.process_transaction(tx).await?;

for client in authorities.authority_clients.values_mut() {
client.authority_client_mut().fault_config.reset();
Expand All @@ -352,9 +358,7 @@ async fn execute_transaction_with_fault_configs(
get_local_client(&mut authorities, *index).fault_config = *config;
}

authorities
.process_certificate(cert, Duration::from_secs(5))
.await?;
authorities.process_certificate(cert).await?;
Ok(())
}

Expand Down Expand Up @@ -720,7 +724,7 @@ async fn test_process_transaction1() {
// on all of them. Note that one authority has processed cert 1, and none cert 2,
// and auth 3 has not seen either.
authorities
.process_transaction(create2.clone(), Duration::from_secs(10))
.process_transaction(create2.clone())
.await
.unwrap();

Expand Down Expand Up @@ -793,10 +797,7 @@ async fn test_process_certificate() {

// Test: process the certificate, including bringing authority 3 up to date,
// which is 2 certs behind.
authorities
.process_certificate(cert2, Duration::from_secs(10))
.await
.unwrap();
authorities.process_certificate(cert2).await.unwrap();

// As a result, we have 2 gas objects and 1 created object.
let owned_object = get_owned_objects(&authorities, addr1).await;
Expand Down

0 comments on commit 0638aad

Please sign in to comment.