Skip to content

Commit

Permalink
Improve sampling of consensus submission latency (MystenLabs#17961)
Browse files Browse the repository at this point in the history
## Description 

1. Do not report latency from sampler until enough samples are taken.
This avoids issues with wildly off latency estimates at the start of an
epoch.
2. Reduce max submission delay step to 10s, which is still significantly
larger than consensus latency.
3. Do not report latency from system transactions that do not require
quorum to have formed. These transactions can distort the latency
estimate because their submission attempts start before a consensus
quorum has formed.
4. Increase expected latency samples to 128.

## Test plan 

CI
Private testnet

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
mwtian authored May 30, 2024
1 parent 078903c commit ff78dd0
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 28 deletions.
55 changes: 35 additions & 20 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ pub mod consensus_tests;

const SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS: &[f64] = &[
0.1, 0.25, 0.5, 0.75, 1., 1.25, 1.5, 1.75, 2., 2.25, 2.5, 2.75, 3., 4., 5., 6., 7., 10., 15.,
20., 25., 30., 60.,
20., 25., 30., 60., 90., 120., 150., 180., 210., 240., 270., 300.,
];

const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[0., 1., 2., 3., 5., 10.];
const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[
0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200.,
];

pub struct ConsensusAdapterMetrics {
// Certificate sequencing metrics
Expand Down Expand Up @@ -416,14 +418,16 @@ impl ConsensusAdapter {
let (position, positions_moved, preceding_disconnected) =
self.submission_position(committee, tx_digest);

const MAX_LATENCY: Duration = Duration::from_secs(5 * 60);
const DEFAULT_LATENCY: Duration = Duration::from_secs(3); // > p50 consensus latency with global deployment
const MIN_LATENCY: Duration = Duration::from_millis(150);
const MAX_LATENCY: Duration = Duration::from_secs(10);

let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY);
self.metrics
.sequencing_estimated_latency
.set(latency.as_millis() as i64);

let latency = std::cmp::max(latency, DEFAULT_LATENCY);
let latency = std::cmp::max(latency, MIN_LATENCY);
let latency = std::cmp::min(latency, MAX_LATENCY);
let latency = latency * 2;
let latency = self.override_by_throughput_profiler(position, latency);
Expand Down Expand Up @@ -719,7 +723,7 @@ impl ConsensusAdapter {
let (await_submit, position, positions_moved, preceding_disconnected) =
self.await_submit_delay(epoch_store.committee(), &transactions[..]);

let mut guard = InflightDropGuard::acquire(&self, tx_type.to_string());
let mut guard = InflightDropGuard::acquire(&self, tx_type);
let processed_waiter = tokio::select! {
// We need to wait for some delay until we submit transaction to the consensus
_ = await_submit => Some(processed_waiter),
Expand Down Expand Up @@ -1022,24 +1026,24 @@ struct InflightDropGuard<'a> {
position: Option<usize>,
positions_moved: Option<usize>,
preceding_disconnected: Option<usize>,
tx_type: String,
tx_type: &'static str,
}

impl<'a> InflightDropGuard<'a> {
pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: String) -> Self {
let inflight = adapter
pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self {
adapter
.num_inflight_transactions
.fetch_add(1, Ordering::SeqCst);
adapter
.metrics
.sequencing_certificate_attempt
.sequencing_certificate_inflight
.with_label_values(&[&tx_type])
.inc();
adapter
.metrics
.sequencing_certificate_inflight
.sequencing_certificate_attempt
.with_label_values(&[&tx_type])
.set(inflight as i64);
.inc();
Self {
adapter,
start: Instant::now(),
Expand All @@ -1053,16 +1057,14 @@ impl<'a> InflightDropGuard<'a> {

impl<'a> Drop for InflightDropGuard<'a> {
fn drop(&mut self) {
let inflight = self
.adapter
self.adapter
.num_inflight_transactions
.fetch_sub(1, Ordering::SeqCst);
// Store the latest latency
self.adapter
.metrics
.sequencing_certificate_inflight
.with_label_values(&[&self.tx_type])
.set(inflight as i64);
.with_label_values(&[self.tx_type])
.dec();

let position = if let Some(position) = self.position {
self.adapter
Expand All @@ -1089,14 +1091,27 @@ impl<'a> Drop for InflightDropGuard<'a> {
};

let latency = self.start.elapsed();
if self.position == Some(0) {
self.adapter.latency_observer.report(latency);
}
self.adapter
.metrics
.sequencing_certificate_latency
.with_label_values(&[&position, &self.tx_type])
.with_label_values(&[&position, self.tx_type])
.observe(latency.as_secs_f64());

// Only sample latency after consensus quorum is up. Otherwise, the wait for consensus
// quorum at the beginning of an epoch can distort the sampled latencies.
// Technically there are more system transaction types that can be included in samples
// after the first consensus commit, but this set of types should be enough.
if self.position == Some(0) {
// Transaction types below require that a quorum has already existed in the current epoch.
// TODO: refactor tx_type to enum.
let sampled = matches!(
self.tx_type,
"shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle"
);
if sampled {
self.adapter.latency_observer.report(latency);
}
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions crates/sui-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ impl LatencyObserver {
}

pub fn report(&self, latency: Duration) {
const MAX_SAMPLES: usize = 64;
const EXPECTED_SAMPLES: usize = 128;
let mut data = self.data.lock();
data.points.push_back(latency);
data.sum += latency;
if data.points.len() >= MAX_SAMPLES {
if data.points.len() < EXPECTED_SAMPLES {
// Do not initialize average latency until there are enough samples.
return;
}
while data.points.len() > EXPECTED_SAMPLES {
let pop = data.points.pop_front().expect("data vector is not empty");
data.sum -= pop; // This does not overflow because of how running sum is calculated
data.sum -= pop; // This does not underflow because of how running sum is calculated
}
let latency = data.sum.as_millis() as u64 / data.points.len() as u64;
self.latency_ms.store(latency, Ordering::Relaxed);
Expand All @@ -44,7 +48,7 @@ impl LatencyObserver {
pub fn latency(&self) -> Option<Duration> {
let latency = self.latency_ms.load(Ordering::Relaxed);
if latency == u64::MAX {
// Not initialized yet (0 data points)
// Not initialized yet (not enough data points)
None
} else {
Some(Duration::from_millis(latency))
Expand Down
12 changes: 8 additions & 4 deletions crates/sui-core/src/overload_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,16 @@ mod tests {
.build()
.await;

// Initialize latency reporter.
for _ in 0..1000 {
state
.metrics
.execution_queueing_latency
.report(Duration::from_secs(20));
}

// Creates a simple case to see if authority state overload_info can be updated
// correctly by check_authority_overload.
state
.metrics
.execution_queueing_latency
.report(Duration::from_secs(20));
let authority = Arc::downgrade(&state);
assert!(check_authority_overload(&authority, &config));
assert!(state.overload_info.is_overload.load(Ordering::Relaxed));
Expand Down

0 comments on commit ff78dd0

Please sign in to comment.