Skip to content

Commit

Permalink
[QD Reconfig] 1. Refactor Quorum Driver (MystenLabs#6988)
Browse files Browse the repository at this point in the history
This PR refactors Quorum Driver (QD). The changes include:
1. `QuorumDriver(Handler)` becomes an active process that continuously
dequeues `QuorumDriverTask`s and executes them.
2. `QuorumDriverTask` is enqueued by the producer, e.g.
TransactionOrchestrator (TO), or other clients where QD is embedded.
3. QD has a notifier (`NotifyRead`). When submitting a transaction (`fn
submit_transaction`), a ticket is returned to the call site, which can
await it. TO works slightly differently: it has direct access to the
notifier, so it registers the ticket before submitting the transaction
and then calls `fn submit_transaction_no_ticket` to avoid the overhead of
creating a new ticket.
4. When a client (e.g. a dapp) submits a transaction multiple times
simultaneously, only one `QuorumDriverTask` will be enqueued into QD,
although every request will be given a ticket for notification.
5. Transaction Failures: Failures are categorized into retryable
failures and non-retryable failures. Retryable failures are those caused
by transient errors (usually occurring between fullnode/QD and
validators), such as network partition or reconfiguration, and could
succeed after the transient errors go away. Non-retryable failures
usually mean the transaction is badly constructed, for example, using the
wrong object version, insufficient gas balance, etc.
6. Retry: For transient retryable failures, QD keeps retrying until the
transaction succeeds or the maximum number of retries is exceeded (it's
10 in TO, meaning the total number of attempts is 11 for one
transaction). Retries are executed with an exponential back-off interval
(`200ms * 2^retry_times`, so a transaction times out after ~3.5 minutes).
7. This PR does not thoroughly categorize every possible error in the
write path; we will do that in subsequent code changes.
8. Note this PR breaks the reconfigurability of load gen, because the
epoch transition error is now invisible to clients. In subsequent PRs,
we will add relevant things like `ReconfigObserver` to make it work
again.
  • Loading branch information
longbowlu authored Jan 5, 2023
1 parent b0dff03 commit c53c945
Show file tree
Hide file tree
Showing 15 changed files with 959 additions and 607 deletions.
30 changes: 4 additions & 26 deletions crates/sui-benchmark/src/drivers/bench_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use prometheus::GaugeVec;
use prometheus::HistogramVec;
use prometheus::IntCounterVec;
use prometheus::Registry;
use rand::Rng;
use tokio::sync::OnceCell;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

use crate::drivers::driver::Driver;
Expand Down Expand Up @@ -51,8 +49,6 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[
0.01, 0.05, 0.1, 0.25, 0.5, 1., 2.5, 5., 10., 20., 30., 60., 90.,
];

const RECONFIG_QUIESCENCE_TIME_SEC: u64 = 5;

impl BenchMetrics {
fn new(registry: &Registry) -> Self {
BenchMetrics {
Expand Down Expand Up @@ -327,7 +323,6 @@ impl Driver<BenchmarkStats> for BenchDriver {
let metrics_cloned = metrics_cloned.clone();
// TODO: clone committee for each request is not ideal.
let committee_cloned = Arc::new(proxy.clone_committee());
let proxy_clone = proxy.clone();
let start = Arc::new(Instant::now());
let res = proxy
.execute_transaction(b.0.clone().into())
Expand All @@ -352,16 +347,8 @@ impl Driver<BenchmarkStats> for BenchDriver {
))
}
Err(err) => {
if err.indicates_epoch_change() {
let mut rng = rand::rngs::OsRng;
let jitter = rng.gen_range(0..RECONFIG_QUIESCENCE_TIME_SEC);
sleep(Duration::from_secs(RECONFIG_QUIESCENCE_TIME_SEC + jitter)).await;

proxy_clone.reconfig().await;
} else {
error!("{}", err);
metrics_cloned.num_error.with_label_values(&[&b.1.get_workload_type().to_string()]).inc();
}
error!("{}", err);
metrics_cloned.num_error.with_label_values(&[&b.1.get_workload_type().to_string()]).inc();
NextOp::Retry(b)
}
}
Expand All @@ -382,7 +369,6 @@ impl Driver<BenchmarkStats> for BenchDriver {
let tx = payload.make_transaction();
let start = Arc::new(Instant::now());
let metrics_cloned = metrics_cloned.clone();
let proxy_clone = proxy.clone();
// TODO: clone committee for each request is not ideal.
let committee_cloned = Arc::new(proxy.clone_committee());
let res = proxy
Expand All @@ -405,16 +391,8 @@ impl Driver<BenchmarkStats> for BenchDriver {
)))
}
Err(err) => {
if err.indicates_epoch_change() {
let mut rng = rand::rngs::OsRng;
let jitter = rng.gen_range(0..RECONFIG_QUIESCENCE_TIME_SEC);
sleep(Duration::from_secs(RECONFIG_QUIESCENCE_TIME_SEC + jitter)).await;

proxy_clone.reconfig().await;
} else {
error!("Retry due to error: {}", err);
metrics_cloned.num_error.with_label_values(&[&payload.get_workload_type().to_string()]).inc();
}
error!("Retry due to error: {}", err);
metrics_cloned.num_error.with_label_values(&[&payload.get_workload_type().to_string()]).inc();
NextOp::Retry(Box::new((tx, payload)))
}
}
Expand Down
41 changes: 17 additions & 24 deletions crates/sui-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ use sui_sdk::SuiClient;
use sui_types::{
base_types::ObjectID,
committee::{Committee, EpochId},
error::SuiError,
messages::{CertifiedTransactionEffects, QuorumDriverResponse, Transaction},
object::{Object, ObjectRead},
};
use sui_types::{
base_types::ObjectRef,
crypto::AuthorityStrongQuorumSignInfo,
messages::{ExecuteTransactionRequestType, QuorumDriverRequest},
object::Owner,
base_types::ObjectRef, crypto::AuthorityStrongQuorumSignInfo,
messages::ExecuteTransactionRequestType, object::Owner,
};
use tracing::{error, info};

Expand Down Expand Up @@ -97,7 +94,7 @@ pub trait ValidatorProxy {
async fn execute_transaction(
&self,
tx: Transaction,
) -> Result<(SuiCertifiedTransaction, ExecutionEffects), SuiError>;
) -> anyhow::Result<(SuiCertifiedTransaction, ExecutionEffects)>;

async fn reconfig(&self);

Expand All @@ -115,7 +112,8 @@ pub struct LocalValidatorAggregatorProxy {

impl LocalValidatorAggregatorProxy {
pub fn from_auth_agg(agg: Arc<AuthorityAggregator<NetworkAuthorityClient>>) -> Self {
let qd_handler = QuorumDriverHandler::new(agg, QuorumDriverMetrics::new_for_tests());
let qd_handler =
QuorumDriverHandler::new(agg, Arc::new(QuorumDriverMetrics::new_for_tests()));
let qd = qd_handler.clone_quorum_driver();
Self {
_qd_handler: qd_handler,
Expand All @@ -137,17 +135,16 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
async fn execute_transaction(
&self,
tx: Transaction,
) -> Result<(SuiCertifiedTransaction, ExecutionEffects), SuiError> {
let QuorumDriverResponse::EffectsCert(result) = self
.qd
.execute_transaction(QuorumDriverRequest {
transaction: tx.verify()?,
})
.await?;
let (tx_cert, effects_cert) = *result;
let tx_cert: SuiCertifiedTransaction = tx_cert.try_into().unwrap();
let effects = ExecutionEffects::CertifiedTransactionEffects(effects_cert.into());
Ok((tx_cert, effects))
) -> anyhow::Result<(SuiCertifiedTransaction, ExecutionEffects)> {
let ticket = self.qd.submit_transaction(tx.verify()?).await?;
let QuorumDriverResponse {
tx_cert,
effects_cert,
} = ticket.await?;
Ok((
tx_cert.try_into().unwrap(),
ExecutionEffects::CertifiedTransactionEffects(effects_cert.into()),
))
}

async fn reconfig(&self) {
Expand Down Expand Up @@ -247,7 +244,7 @@ impl ValidatorProxy for FullNodeProxy {
async fn execute_transaction(
&self,
tx: Transaction,
) -> Result<(SuiCertifiedTransaction, ExecutionEffects), SuiError> {
) -> anyhow::Result<(SuiCertifiedTransaction, ExecutionEffects)> {
let result = self
.sui_client
.quorum_driver()
Expand All @@ -256,11 +253,7 @@ impl ValidatorProxy for FullNodeProxy {
tx.verify()?,
Some(ExecuteTransactionRequestType::WaitForLocalExecution),
)
.await
// TODO make sure RpcExecuteTransactionError covers epoch change identified on FN
.map_err(|e| SuiError::RpcExecuteTransactionError {
error: e.to_string(),
})?;
.await?;
let tx_cert = result.tx_cert.unwrap();
let effects = ExecutionEffects::SuiTransactionEffects(result.effects.unwrap());
Ok((tx_cert, effects))
Expand Down
42 changes: 22 additions & 20 deletions crates/sui-core/src/quorum_driver/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
// SPDX-License-Identifier: Apache-2.0

use prometheus::{
register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge,
Registry,
};

#[derive(Clone, Debug)]
pub struct QuorumDriverMetrics {
pub(crate) total_requests_wait_for_effects_cert: IntCounter,
pub(crate) total_ok_responses_wait_for_effects_cert: IntCounter,

pub(crate) latency_sec_wait_for_effects_cert: Histogram,
pub(crate) total_requests: IntCounter,
pub(crate) total_enqueued: IntCounter,
pub(crate) total_ok_responses: IntCounter,
pub(crate) total_err_responses: IntCounter,

// TODO: add histogram of attempt that tx succeeds
pub(crate) current_requests_in_flight: IntGauge,

pub(crate) total_err_process_tx_responses_with_nonzero_conflicting_transactions: IntCounter,
Expand All @@ -24,29 +25,30 @@ pub struct QuorumDriverMetrics {
pub(crate) total_equivocation_detected: IntCounter,
}

const LATENCY_SEC_BUCKETS: &[f64] = &[
0.01, 0.05, 0.1, 0.25, 0.5, 1., 2., 4., 6., 8., 10., 20., 30., 60., 90.,
];

impl QuorumDriverMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
total_requests_wait_for_effects_cert: register_int_counter_with_registry!(
"quorum_driver_total_requests_wait_for_effects_cert",
"Total number of wait_for_effects_cert requests received",
total_requests: register_int_counter_with_registry!(
"quorum_driver_total_requests",
"Total number of requests received",
registry,
)
.unwrap(),
total_enqueued: register_int_counter_with_registry!(
"quorum_driver_total_enqueued",
"Total number of requests enqueued",
registry,
)
.unwrap(),
total_ok_responses_wait_for_effects_cert: register_int_counter_with_registry!(
"quorum_driver_total_ok_responses_wait_for_effects_cert",
"Total number of wait_for_effects_cert requests processed with Ok responses",
total_ok_responses: register_int_counter_with_registry!(
"quorum_driver_total_ok_responses",
"Total number of requests processed with Ok responses",
registry,
)
.unwrap(),
latency_sec_wait_for_effects_cert: register_histogram_with_registry!(
"quorum_driver_latency_sec_wait_for_effects_cert",
"Latency of processing an wait_for_effects_cert execution request, in sec",
LATENCY_SEC_BUCKETS.to_vec(),
total_err_responses: register_int_counter_with_registry!(
"quorum_driver_total_err_responses",
"Total number of requests processed with Err responses",
registry,
)
.unwrap(),
Expand Down
Loading

0 comments on commit c53c945

Please sign in to comment.