diff --git a/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs b/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs index 4e681684fd5ec..8dd1cb3dd254a 100644 --- a/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs +++ b/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs @@ -120,7 +120,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() { while let Some(t) = transactions.pop() { // Get a cert let new_certificate = sender_aggregator - .process_transaction(t.clone(), Duration::from_secs(60)) + .process_transaction(t.clone()) .await .expect("Unexpected crash"); @@ -208,7 +208,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() { while let Some(t) = transactions.pop() { // Get a cert let new_certificate = sender_aggregator - .process_transaction(t.clone(), Duration::from_secs(60)) + .process_transaction(t.clone()) .await .expect("Unexpected crash"); diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs index 14c05523adfc5..a6afae55920f9 100644 --- a/crates/sui-core/src/authority_aggregator.rs +++ b/crates/sui-core/src/authority_aggregator.rs @@ -26,8 +26,6 @@ use sui_types::committee::StakeUnit; use tokio::sync::mpsc::Receiver; use tokio::time::timeout; -// TODO: Make timeout duration configurable. -const AUTHORITY_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); const OBJECT_DOWNLOAD_CHANNEL_BOUND: usize = 1024; pub const DEFAULT_RETRIES: usize = 4; @@ -37,6 +35,23 @@ pub mod authority_aggregator_tests; pub type AsyncResult<'a, T, E> = future::BoxFuture<'a, Result>; +#[derive(Clone)] +pub struct TimeoutConfig { + pub authority_request_timeout: Duration, + pub pre_quorum_timeout: Duration, + pub post_quorum_timeout: Duration, +} + +impl Default for TimeoutConfig { + fn default() -> Self { + Self { + authority_request_timeout: Duration::from_secs(60), + pre_quorum_timeout: Duration::from_secs(60), + post_quorum_timeout: Duration::from_secs(30), + } + } +} + #[derive(Clone)] pub struct AuthorityAggregator { /// Our Sui committee. @@ -45,10 +60,19 @@ pub struct AuthorityAggregator { pub authority_clients: BTreeMap>, // Metrics pub metrics: &'static GatewayMetrics, + pub timeouts: TimeoutConfig, } impl AuthorityAggregator { pub fn new(committee: Committee, authority_clients: BTreeMap) -> Self { + Self::new_with_timeouts(committee, authority_clients, Default::default()) + } + + pub fn new_with_timeouts( + committee: Committee, + authority_clients: BTreeMap, + timeouts: TimeoutConfig, + ) -> Self { Self { committee: committee.clone(), authority_clients: authority_clients @@ -56,6 +80,7 @@ impl AuthorityAggregator { .map(|(name, api)| (name, SafeClient::new(api, committee.clone(), name))) .collect(), metrics: &METRICS, + timeouts, } } @@ -255,6 +280,21 @@ where Ok(()) } + pub async fn sync_certificate_to_authority( + &self, + cert: ConfirmationTransaction, + destination_authority: AuthorityName, + retries: usize, + ) -> Result<(), SuiError> { + self.sync_certificate_to_authority_with_timeout( + cert, + destination_authority, + self.timeouts.authority_request_timeout, + retries, + ) + .await + } + /// Sync a certificate to an authority. /// /// This function infers which authorities have the history related to @@ -461,7 +501,6 @@ where async fn get_object_by_id( &self, object_id: ObjectID, - timeout_after_quorum: Duration, ) -> Result< ( BTreeMap< @@ -534,13 +573,13 @@ where // After we reach threshold we wait for potentially less time. Ok(ReduceOutput::ContinueWithTimeout( state, - timeout_after_quorum, + self.timeouts.post_quorum_timeout, )) } }) }, // A long timeout before we hear back from a quorum - Duration::from_secs(60), + self.timeouts.pre_quorum_timeout, ) .await?; @@ -697,7 +736,7 @@ where }) }, // A long timeout before we hear back from a quorum - Duration::from_secs(60), + self.timeouts.pre_quorum_timeout, ) .await?; Ok((final_state.object_map, final_state.responded_authorities)) @@ -708,7 +747,6 @@ where pub async fn sync_all_given_objects( &self, objects: &[ObjectID], - timeout_after_quorum: Duration, ) -> Result< ( Vec<( @@ -733,9 +771,7 @@ where .map(|(name, _)| *name) .collect(); - let (aggregate_object_info, certificates) = self - .get_object_by_id(*object_id, timeout_after_quorum) - .await?; + let (aggregate_object_info, certificates) = self.get_object_by_id(*object_id).await?; let mut aggregate_object_info: Vec<_> = aggregate_object_info.into_iter().collect(); @@ -798,10 +834,9 @@ where // NOTE: this is right now done sequentially, we should do them in parallel using // the usual FuturesUnordered. let _result = self - .sync_certificate_to_authority_with_timeout( + .sync_certificate_to_authority( ConfirmationTransaction::new(cert.clone()), name, - timeout_after_quorum, DEFAULT_RETRIES, ) .await; @@ -843,11 +878,8 @@ where let all_object_ids: HashSet<_> = object_map.keys().map(|object_ref| object_ref.0).collect(); // Then sync all the owned objects - self.sync_all_given_objects( - &all_object_ids.into_iter().collect::>(), - timeout_after_quorum, - ) - .await + self.sync_all_given_objects(&all_object_ids.into_iter().collect::>()) + .await } /// Takes a transaction, brings all authorities up to date with the versions of the @@ -855,7 +887,6 @@ where pub async fn process_transaction( &self, transaction: Transaction, - timeout_after_quorum: Duration, ) -> Result { // Find out which objects are required by this transaction and // ensure they are synced on authorities. @@ -866,9 +897,8 @@ where .map(|o| o.object_id()) .collect(); - let (_active_objects, _deleted_objects) = self - .sync_all_given_objects(&required_ids, timeout_after_quorum) - .await?; + let (_active_objects, _deleted_objects) = + self.sync_all_given_objects(&required_ids).await?; // Now broadcast the transaction to all authorities. let threshold = self.committee.quorum_threshold(); @@ -876,7 +906,6 @@ where debug!( quorum_threshold = threshold, validity_threshold = validity, - ?timeout_after_quorum, "Broadcasting transaction request to authorities" ); trace!("Transaction data: {:?}", transaction.data); @@ -1007,7 +1036,7 @@ where }) }, // A long timeout before we hear back from a quorum - Duration::from_secs(60), + self.timeouts.pre_quorum_timeout, ) .await?; @@ -1040,7 +1069,6 @@ where async fn process_certificate( &self, certificate: CertifiedTransaction, - timeout_after_quorum: Duration, ) -> Result { struct ProcessCertificateState { // Different authorities could return different effects. We want at least one effect to come @@ -1055,6 +1083,8 @@ where bad_stake: 0, }; + let timeout_after_quorum = self.timeouts.post_quorum_timeout; + let cert_ref = &certificate; let threshold = self.committee.quorum_threshold(); let validity = self.committee.validity_threshold(); @@ -1106,10 +1136,9 @@ where debug!(authority =? name, error =? res, ?timeout_after_quorum, "Authority out of date - syncing certificates"); // If we got LockErrors, we try to update the authority. self - .sync_certificate_to_authority_with_timeout( + .sync_certificate_to_authority( ConfirmationTransaction::new(cert_ref.clone()), name, - timeout_after_quorum, DEFAULT_RETRIES, ) .instrument(tracing::trace_span!("sync_cert", authority =? name)) @@ -1161,7 +1190,7 @@ where }) }, // A long timeout before we hear back from a quorum - Duration::from_secs(60), + self.timeouts.pre_quorum_timeout, ) .await?; @@ -1191,10 +1220,7 @@ where /// NOTE: This is only reliable in the synchronous model, with a sufficient timeout value. #[cfg(test)] async fn get_latest_sequence_number(&self, object_id: ObjectID) -> SequenceNumber { - let (object_infos, _certificates) = self - .get_object_by_id(object_id, Duration::from_secs(60)) - .await - .unwrap(); // Not safe, but want to blow up if testing. + let (object_infos, _certificates) = self.get_object_by_id(object_id).await.unwrap(); // Not safe, but want to blow up if testing. let top_ref = object_infos.keys().last().unwrap().0; top_ref.1 } @@ -1202,23 +1228,14 @@ where pub async fn execute_transaction( &self, transaction: &Transaction, - ) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> { - self.execute_transaction_with_timeout(transaction, Duration::from_secs(60)) - .await - } - - pub async fn execute_transaction_with_timeout( - &self, - transaction: &Transaction, - timeout: Duration, ) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> { let new_certificate = self - .process_transaction(transaction.clone(), timeout) + .process_transaction(transaction.clone()) .instrument(tracing::debug_span!("process_tx")) .await?; self.metrics.total_tx_certificates.inc(); let response = self - .process_certificate(new_certificate.clone(), timeout) + .process_certificate(new_certificate.clone()) .instrument(tracing::debug_span!("process_cert")) .await?; @@ -1226,9 +1243,7 @@ where } pub async fn get_object_info_execute(&self, object_id: ObjectID) -> SuiResult { - let (object_map, cert_map) = self - .get_object_by_id(object_id, AUTHORITY_REQUEST_TIMEOUT) - .await?; + let (object_map, cert_map) = self.get_object_by_id(object_id).await?; let mut object_ref_stack: Vec<_> = object_map.into_iter().collect(); while let Some(((obj_ref, tx_digest), (obj_option, layout_option, authorities))) = @@ -1246,10 +1261,7 @@ where } else if cert_map.contains_key(&tx_digest) { // If we have less stake telling us about the latest state of an object // we re-run the certificate on all authorities to ensure it is correct. - if let Ok(effects) = self - .process_certificate(cert_map[&tx_digest].clone(), AUTHORITY_REQUEST_TIMEOUT) - .await - { + if let Ok(effects) = self.process_certificate(cert_map[&tx_digest].clone()).await { if effects.is_object_mutated_here(obj_ref) { is_ok = true; } else { @@ -1285,7 +1297,7 @@ where tokio::spawn(Self::fetch_one_object( self.authority_clients.clone(), object_ref, - AUTHORITY_REQUEST_TIMEOUT, + self.timeouts.authority_request_timeout, sender, )); } diff --git a/crates/sui-core/src/epoch/reconfiguration.rs b/crates/sui-core/src/epoch/reconfiguration.rs index b34c73bd79fbf..953aaf101c5e4 100644 --- a/crates/sui-core/src/epoch/reconfiguration.rs +++ b/crates/sui-core/src/epoch/reconfiguration.rs @@ -124,10 +124,7 @@ where // and execute it locally. loop { if let Ok(certificate) = new_net - .process_transaction( - advance_epoch_tx.clone().to_transaction(), - Duration::from_secs(0), - ) + .process_transaction(advance_epoch_tx.clone().to_transaction()) .await { self.state diff --git a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs index 4cebe8f42f46b..2c9a29fb6b2c7 100644 --- a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs @@ -72,7 +72,15 @@ pub async fn init_local_authorities_with_genesis( states.push(client.state.clone()); clients.insert(authority_name, client); } - (AuthorityAggregator::new(committee, clients), states) + let timeouts = TimeoutConfig { + authority_request_timeout: Duration::from_secs(5), + pre_quorum_timeout: Duration::from_secs(5), + post_quorum_timeout: Duration::from_secs(5), + }; + ( + AuthorityAggregator::new_with_timeouts(committee, clients, timeouts), + states, + ) } pub fn get_local_client( @@ -341,9 +349,7 @@ async fn execute_transaction_with_fault_configs( gas_object1.compute_object_reference(), gas_object2.compute_object_reference(), ); - let cert = authorities - .process_transaction(tx, Duration::from_secs(5)) - .await?; + let cert = authorities.process_transaction(tx).await?; for client in authorities.authority_clients.values_mut() { client.authority_client_mut().fault_config.reset(); @@ -352,9 +358,7 @@ async fn execute_transaction_with_fault_configs( get_local_client(&mut authorities, *index).fault_config = *config; } - authorities - .process_certificate(cert, Duration::from_secs(5)) - .await?; + authorities.process_certificate(cert).await?; Ok(()) } @@ -720,7 +724,7 @@ async fn test_process_transaction1() { // on all of them. Note that one authority has processed cert 1, and none cert2, // and auth 3 has no seen either. authorities - .process_transaction(create2.clone(), Duration::from_secs(10)) + .process_transaction(create2.clone()) .await .unwrap(); @@ -793,10 +797,7 @@ async fn test_process_certificate() { // Test: process the certificate, including bring up to date authority 3. // which is 2 certs behind. - authorities - .process_certificate(cert2, Duration::from_secs(10)) - .await - .unwrap(); + authorities.process_certificate(cert2).await.unwrap(); // As a result, we have 2 gas objects and 1 created object. let owned_object = get_owned_objects(&authorities, addr1).await;