Skip to content

Commit

Permalink
Beginning of code to fetch single, digest-authenticated objects in a …
Browse files Browse the repository at this point in the history
…byzantine tolerate way (MystenLabs#2822)
  • Loading branch information
mystenmark authored Jun 30, 2022
1 parent 00e76ed commit 992af87
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 8 deletions.
11 changes: 3 additions & 8 deletions crates/sui-core/src/authority_active/gossip/node_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,15 +363,10 @@ where
) -> SuiResult {
let digest = digests.transaction;

// TODO: Add a function to AuthorityAggregator to try multiple validators - even
// though we are fetching the cert/effects from the same validator that sent us the tx
// digest, and even though we know the cert is final, any given validator may be byzantine
// and refuse to give us the cert and effects.
let client = aggregator.clone_client(&peer);
let resp = client
// TODO: should we suggest that we try peer first?
let resp = aggregator
.handle_transaction_and_effects_info_request(digests)
.await
.expect("TODO: need to use authority aggregator to download cert");
.await?;

let cert = resp.certified_transaction.ok_or_else(|| {
info!(?digest, ?peer, "validator did not return cert");
Expand Down
66 changes: 66 additions & 0 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ pub type AsyncResult<'a, T, E> = future::BoxFuture<'a, Result<T, E>>;

#[derive(Clone)]
pub struct TimeoutConfig {
// Timeout used when making many concurrent requests - ok if it is large because a slow
// authority won't block other authorities from being contacted.
pub authority_request_timeout: Duration,
pub pre_quorum_timeout: Duration,
pub post_quorum_timeout: Duration,

// Timeout used when making serial requests. Should be smaller, since we wait to hear from each
// authority before continuing.
pub serial_authority_request_timeout: Duration,
}

impl Default for TimeoutConfig {
Expand All @@ -48,6 +54,7 @@ impl Default for TimeoutConfig {
authority_request_timeout: Duration::from_secs(60),
pre_quorum_timeout: Duration::from_secs(60),
post_quorum_timeout: Duration::from_secs(30),
serial_authority_request_timeout: Duration::from_secs(5),
}
}
}
Expand Down Expand Up @@ -517,6 +524,47 @@ where
Ok(accumulated_state)
}

/// Like quorum_map_then_reduce_with_timeout, but for things that need only a single
/// successful response, such as fetching a Transaction from some authority.
/// This is intended for cases in which byzantine authorities can time out or slow-loris, but
/// can't give a false answer, because e.g. the digest of the response is known, or a
/// quorum-signed object such as a checkpoint has been requested.
pub(crate) async fn quorum_once_with_timeout<'a, S, FMap>(
&'a self,
// The async function used to apply to each authority. It takes an authority name,
// and authority client parameter and returns a Result<V>.
map_each_authority: FMap,
timeout_each_authority: Duration,
) -> Result<S, SuiError>
where
FMap: Fn(AuthorityName, &'a SafeClient<A>) -> AsyncResult<'a, S, SuiError>,
{
let authorities_shuffled = self.committee.shuffle_by_stake();

let mut authority_errors: Vec<(AuthorityName, SuiError)> = Vec::new();

// TODO: possibly increase concurrency after first failure to reduce latency.
for name in authorities_shuffled {
let client = &self.authority_clients[name];

let res = timeout(timeout_each_authority, map_each_authority(*name, client)).await;

match res {
// timeout
Err(_) => authority_errors.push((*name, SuiError::TimeoutError)),
// request completed
Ok(inner_res) => match inner_res {
Err(e) => authority_errors.push((*name, e)),
Ok(_) => return inner_res,
},
}
}

Err(SuiError::TooManyIncorrectAuthorities {
errors: authority_errors,
})
}

/// Return all the information in the network regarding the latest state of a specific object.
/// For each authority queried, we obtain the latest object state along with the certificate that
/// lead up to that state. The results from each authority are aggreated for the return.
Expand Down Expand Up @@ -1409,4 +1457,22 @@ where
.await
.expect("Cannot send object on channel after object fetch attempt");
}

pub async fn handle_transaction_and_effects_info_request(
&self,
digests: &ExecutionDigests,
) -> Result<TransactionInfoResponse, SuiError> {
self.quorum_once_with_timeout(
|_name, client| {
Box::pin(async move {
client
.handle_transaction_and_effects_info_request(digests)
.await
})
},
// A long timeout before we hear back from a quorum
self.timeouts.serial_authority_request_timeout,
)
.await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub async fn init_local_authorities_with_genesis(
authority_request_timeout: Duration::from_secs(5),
pre_quorum_timeout: Duration::from_secs(5),
post_quorum_timeout: Duration::from_secs(5),
serial_authority_request_timeout: Duration::from_secs(1),
};
(
AuthorityAggregator::new_with_timeouts(
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ pub enum SuiError {
#[error("Unable to communicate with the Quorum Driver channel: {:?}", error)]
QuorumDriverCommunicationError { error: String },

#[error("Operation timed out")]
TimeoutError,

#[error("Error executing {0}")]
ExecutionError(String),

Expand Down

0 comments on commit 992af87

Please sign in to comment.