Skip to content

Commit

Permalink
Improve latency when one or more validators is unhealthy (MystenLabs#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark authored Jul 26, 2022
1 parent 7b70ae0 commit 0132bef
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 20 deletions.
113 changes: 97 additions & 16 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::authority_client::AuthorityAPI;
use crate::safe_client::SafeClient;
use async_trait::async_trait;

use futures::{future, StreamExt};
use futures::{future, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use move_core_types::value::MoveStructLayout;
use sui_types::crypto::AuthoritySignature;
use sui_types::object::{Object, ObjectFormatOptions, ObjectRead};
Expand Down Expand Up @@ -41,7 +41,7 @@ pub const DEFAULT_RETRIES: usize = 4;
#[path = "unit_tests/authority_aggregator_tests.rs"]
pub mod authority_aggregator_tests;

pub type AsyncResult<'a, T, E> = future::BoxFuture<'a, Result<T, E>>;
pub type AsyncResult<'a, T, E> = BoxFuture<'a, Result<T, E>>;

#[derive(Clone)]
pub struct TimeoutConfig {
Expand All @@ -54,6 +54,16 @@ pub struct TimeoutConfig {
// Timeout used when making serial requests. Should be smaller, since we wait to hear from each
// authority before continuing.
pub serial_authority_request_timeout: Duration,

// Timeout used to determine when to start a second "serial" request for
// quorum_once_with_timeout. This is a latency optimization that prevents us from having
// to wait an entire serial_authority_request_timeout interval before starting a second
// request.
//
// If this is set to zero, then quorum_once_with_timeout becomes completely parallelized - if
// it is set to a value greater than serial_authority_request_timeout then it becomes
// completely serial.
pub serial_authority_request_interval: Duration,
}

impl Default for TimeoutConfig {
Expand All @@ -63,6 +73,7 @@ impl Default for TimeoutConfig {
pre_quorum_timeout: Duration::from_secs(60),
post_quorum_timeout: Duration::from_secs(30),
serial_authority_request_timeout: Duration::from_secs(5),
serial_authority_request_interval: Duration::from_millis(1000),
}
}
}
Expand Down Expand Up @@ -633,29 +644,99 @@ where
authority_errors: &mut HashMap<AuthorityName, SuiError>,
) -> Result<S, SuiError>
where
FMap: Fn(AuthorityName, &'a SafeClient<A>) -> AsyncResult<'a, S, SuiError>,
FMap: Fn(AuthorityName, SafeClient<A>) -> AsyncResult<'a, S, SuiError> + Send + Clone + 'a,
S: Send,
{
let start = tokio::time::Instant::now();
let mut delay = Duration::from_secs(1);
loop {
let authorities_shuffled = self.committee.shuffle_by_stake(preferences, restrict_to);
let mut authorities_shuffled = authorities_shuffled.iter();

type RequestResult<S> = Result<Result<S, SuiError>, tokio::time::error::Elapsed>;

enum Event<S> {
StartNext,
Request(AuthorityName, RequestResult<S>),
}

// TODO: possibly increase concurrency after first failure to reduce latency.
for name in authorities_shuffled {
let client = &self.authority_clients[&name];
let mut futures = FuturesUnordered::<BoxFuture<'a, Event<S>>>::new();

let res = timeout(timeout_each_authority, map_each_authority(name, client)).await;
let start_req = |name: AuthorityName, client: SafeClient<A>| {
let map_each_authority = map_each_authority.clone();
Box::pin(async move {
trace!(?name, now = ?tokio::time::Instant::now() - start, "new request");
let map = map_each_authority(name, client);
Event::Request(name, timeout(timeout_each_authority, map).await)
})
};

let schedule_next = || {
let delay = self.timeouts.serial_authority_request_interval;
Box::pin(async move {
sleep(delay).await;
Event::StartNext
})
};

// This process is intended to minimize latency in the face of unreliable authorities,
// without creating undue load on authorities.
//
// The fastest possible process from the
// client's point of view would simply be to issue a concurrent request to every
// authority and then take the winner - this would create unnecessary load on
// authorities.
//
// The most efficient process from the network's point of view is to do one request at
// a time, however if the first validator that the client contacts is unavailable or
// slow, the client must wait for the serial_authority_request_timeout period to elapse
// before starting its next request.
//
// So, this process is designed as a compromise between these two extremes.
// - We start one request, and schedule another request to begin after
// serial_authority_request_interval.
// - Whenever a request finishes, if it succeeded, we return. if it failed, we start a
// new request.
// - If serial_authority_request_interval elapses, we begin a new request even if the
// previous one is not finished, and schedule another future request.

let name = authorities_shuffled.next().unwrap();
futures.push(start_req(*name, self.authority_clients[name].clone()));
futures.push(schedule_next());

while let Some(res) = futures.next().await {
match res {
// timeout
Err(_) => authority_errors.insert(name, SuiError::TimeoutError),
// request completed
Ok(inner_res) => match inner_res {
Err(e) => authority_errors.insert(name, e),
Ok(res) => return Ok(res),
},
};
Event::StartNext => {
trace!(now = ?tokio::time::Instant::now() - start, "eagerly beginning next request");
}
Event::Request(name, res) => {
match res {
// timeout
Err(_) => {
debug!(?name, "authority request timed out");
authority_errors.insert(name, SuiError::TimeoutError);
}
// request completed
Ok(inner_res) => {
trace!(?name, now = ?tokio::time::Instant::now() - start,
"request completed successfully");
match inner_res {
Err(e) => authority_errors.insert(name, e),
Ok(res) => return Ok(res),
};
}
};
}
}

if let Some(next_authority) = authorities_shuffled.next() {
futures.push(start_req(
*next_authority,
self.authority_clients[next_authority].clone(),
));
}
}

info!(
?authority_errors,
"quorum_once_with_timeout failed on all authorities, retrying in {:?}", delay
Expand Down Expand Up @@ -684,7 +765,7 @@ where
timeout_total: Option<Duration>,
) -> Result<S, SuiError>
where
FMap: Fn(AuthorityName, &'a SafeClient<A>) -> AsyncResult<'a, S, SuiError>,
FMap: Fn(AuthorityName, SafeClient<A>) -> AsyncResult<'a, S, SuiError> + Send + Clone + 'a,
S: Send,
{
let mut authority_errors = HashMap::new();
Expand Down
155 changes: 151 additions & 4 deletions crates/sui-core/src/unit_tests/authority_aggregator_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use move_core_types::{account_address::AccountAddress, ident_str};
use signature::Signer;
Expand All @@ -12,13 +12,17 @@ use sui_config::ValidatorInfo;
use sui_types::crypto::{get_key_pair, PublicKeyBytes};
use sui_types::crypto::{KeyPair, KeypairTraits, Signature};

use sui_types::messages::Transaction;
use sui_types::messages::*;
use sui_types::object::{Object, GAS_VALUE_FOR_TESTING};

use super::*;
use crate::authority::AuthorityState;
use crate::authority_client::LocalAuthorityClient;
use crate::authority_client::LocalAuthorityClientFaultConfig;
use crate::authority_client::{
AuthorityAPI, BatchInfoResponseItemStream, LocalAuthorityClient,
LocalAuthorityClientFaultConfig,
};

use tokio::time::Instant;

pub async fn init_local_authorities(
committee_size: usize,
Expand Down Expand Up @@ -80,6 +84,7 @@ pub async fn init_local_authorities_with_genesis(
pre_quorum_timeout: Duration::from_secs(5),
post_quorum_timeout: Duration::from_secs(5),
serial_authority_request_timeout: Duration::from_secs(1),
serial_authority_request_interval: Duration::from_secs(1),
};
(
AuthorityAggregator::new_with_timeouts(
Expand Down Expand Up @@ -912,3 +917,145 @@ async fn test_process_transaction_fault_fail() {
.await
.is_err());
}

#[tokio::test(start_paused = true)]
async fn test_quorum_once_with_timeout() {
telemetry_subscribers::init_for_testing();

#[derive(Clone)]
struct MockAuthorityApi {
delay: Duration,
count: Arc<Mutex<u32>>,
}

#[async_trait]
impl AuthorityAPI for MockAuthorityApi {
/// Initiate a new transaction to a Sui or Primary account.
async fn handle_transaction(
&self,
_transaction: Transaction,
) -> Result<TransactionInfoResponse, SuiError> {
unreachable!();
}

/// Execute a certificate.
async fn handle_certificate(
&self,
_certificate: CertifiedTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
unreachable!()
}

/// Handle Account information requests for this account.
async fn handle_account_info_request(
&self,
_request: AccountInfoRequest,
) -> Result<AccountInfoResponse, SuiError> {
unreachable!();
}

/// Handle Object information requests for this account.
async fn handle_object_info_request(
&self,
_request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, SuiError> {
unreachable!();
}

/// Handle Object information requests for this account.
async fn handle_transaction_info_request(
&self,
_request: TransactionInfoRequest,
) -> Result<TransactionInfoResponse, SuiError> {
let count = {
let mut count = self.count.lock().unwrap();
*count += 1;
*count
};

// timeout until the 15th request
if count < 15 {
tokio::time::sleep(self.delay).await;
}

let res = TransactionInfoResponse {
signed_transaction: None,
certified_transaction: None,
signed_effects: None,
};
Ok(res)
}

async fn handle_batch_stream(
&self,
_request: BatchInfoRequest,
) -> Result<BatchInfoResponseItemStream, SuiError> {
unreachable!();
}

async fn handle_checkpoint(
&self,
_request: CheckpointRequest,
) -> Result<CheckpointResponse, SuiError> {
unreachable!();
}
}

let count = Arc::new(Mutex::new(0));

let new_client = |delay: u64| {
let delay = Duration::from_millis(delay);
let count = count.clone();
MockAuthorityApi { delay, count }
};

let mut authorities = BTreeMap::new();
let mut clients = BTreeMap::new();
for _ in 0..30 {
let (_, sec) = get_key_pair();
let name: AuthorityName = sec.public().into();
authorities.insert(name, 1);
clients.insert(name, new_client(1000));
}

let committee = Committee::new(0, authorities).unwrap();

let agg = AuthorityAggregator::new_with_timeouts(
committee,
clients,
AuthAggMetrics::new_for_tests(),
TimeoutConfig {
serial_authority_request_interval: Duration::from_millis(50),
..Default::default()
},
);

let log = Arc::new(Mutex::new(Vec::new()));
let start = Instant::now();
agg.quorum_once_with_timeout(
None,
None,
|_name, client| {
let digest = TransactionDigest::new([0u8; 32]);
let log = log.clone();
Box::pin(async move {
// log the start time of the request
log.lock().unwrap().push(Instant::now() - start);
client.handle_transaction_info_request(digest.into()).await
})
},
Duration::from_millis(100),
Some(Duration::from_millis(30 * 50)),
)
.await
.unwrap();

// New requests are started every 50ms even though each request times out individually.
// The 15th request succeeds, and we exit before processing the remaining authorities.
assert_eq!(
log.lock().unwrap().clone(),
(0..15)
.map(|d| Duration::from_millis(d * 50))
.collect::<Vec<Duration>>()
);
}

0 comments on commit 0132bef

Please sign in to comment.