diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index b4e6f16c8c519..587d1521d289d 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -63,10 +63,12 @@ pub mod consensus_tests; const SEQUENCING_CERTIFICATE_LATENCY_SEC_BUCKETS: &[f64] = &[ 0.1, 0.25, 0.5, 0.75, 1., 1.25, 1.5, 1.75, 2., 2.25, 2.5, 2.75, 3., 4., 5., 6., 7., 10., 15., - 20., 25., 30., 60., + 20., 25., 30., 60., 90., 120., 150., 180., 210., 240., 270., 300., ]; -const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[0., 1., 2., 3., 5., 10.]; +const SEQUENCING_CERTIFICATE_POSITION_BUCKETS: &[f64] = &[ + 0., 1., 2., 3., 5., 10., 15., 20., 25., 30., 50., 100., 150., 200., +]; pub struct ConsensusAdapterMetrics { // Certificate sequencing metrics @@ -416,14 +418,16 @@ impl ConsensusAdapter { let (position, positions_moved, preceding_disconnected) = self.submission_position(committee, tx_digest); - const MAX_LATENCY: Duration = Duration::from_secs(5 * 60); const DEFAULT_LATENCY: Duration = Duration::from_secs(3); // > p50 consensus latency with global deployment + const MIN_LATENCY: Duration = Duration::from_millis(150); + const MAX_LATENCY: Duration = Duration::from_secs(10); + let latency = self.latency_observer.latency().unwrap_or(DEFAULT_LATENCY); self.metrics .sequencing_estimated_latency .set(latency.as_millis() as i64); - let latency = std::cmp::max(latency, DEFAULT_LATENCY); + let latency = std::cmp::max(latency, MIN_LATENCY); let latency = std::cmp::min(latency, MAX_LATENCY); let latency = latency * 2; let latency = self.override_by_throughput_profiler(position, latency); @@ -719,7 +723,7 @@ impl ConsensusAdapter { let (await_submit, position, positions_moved, preceding_disconnected) = self.await_submit_delay(epoch_store.committee(), &transactions[..]); - let mut guard = InflightDropGuard::acquire(&self, tx_type.to_string()); + let mut guard = InflightDropGuard::acquire(&self, tx_type); let 
processed_waiter = tokio::select! { // We need to wait for some delay until we submit transaction to the consensus _ = await_submit => Some(processed_waiter), @@ -1022,24 +1026,24 @@ struct InflightDropGuard<'a> { position: Option, positions_moved: Option, preceding_disconnected: Option, - tx_type: String, + tx_type: &'static str, } impl<'a> InflightDropGuard<'a> { - pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: String) -> Self { - let inflight = adapter + pub fn acquire(adapter: &'a ConsensusAdapter, tx_type: &'static str) -> Self { + adapter .num_inflight_transactions .fetch_add(1, Ordering::SeqCst); adapter .metrics - .sequencing_certificate_attempt + .sequencing_certificate_inflight .with_label_values(&[&tx_type]) .inc(); adapter .metrics - .sequencing_certificate_inflight + .sequencing_certificate_attempt .with_label_values(&[&tx_type]) - .set(inflight as i64); + .inc(); Self { adapter, start: Instant::now(), @@ -1053,16 +1057,14 @@ impl<'a> InflightDropGuard<'a> { impl<'a> Drop for InflightDropGuard<'a> { fn drop(&mut self) { - let inflight = self - .adapter + self.adapter .num_inflight_transactions .fetch_sub(1, Ordering::SeqCst); - // Store the latest latency self.adapter .metrics .sequencing_certificate_inflight - .with_label_values(&[&self.tx_type]) - .set(inflight as i64); + .with_label_values(&[self.tx_type]) + .dec(); let position = if let Some(position) = self.position { self.adapter @@ -1089,14 +1091,27 @@ impl<'a> Drop for InflightDropGuard<'a> { }; let latency = self.start.elapsed(); - if self.position == Some(0) { - self.adapter.latency_observer.report(latency); - } self.adapter .metrics .sequencing_certificate_latency - .with_label_values(&[&position, &self.tx_type]) + .with_label_values(&[&position, self.tx_type]) .observe(latency.as_secs_f64()); + + // Only sample latency after consensus quorum is up. Otherwise, the wait for consensus + // quorum at the beginning of an epoch can distort the sampled latencies. 
+ // Technically there are more system transaction types that can be included in samples + // after the first consensus commit, but this set of types should be enough. + if self.position == Some(0) { + // Transaction types below require quorum to have existed in the current epoch. + // TODO: refactor tx_type to enum. + let sampled = matches!( + self.tx_type, + "shared_certificate" | "owned_certificate" | "checkpoint_signature" | "soft_bundle" + ); + if sampled { + self.adapter.latency_observer.report(latency); + } + } } } diff --git a/crates/sui-core/src/metrics.rs b/crates/sui-core/src/metrics.rs index 54cc6fbf9de4a..d9093e32b9ae3 100644 --- a/crates/sui-core/src/metrics.rs +++ b/crates/sui-core/src/metrics.rs @@ -29,13 +29,17 @@ impl LatencyObserver { } pub fn report(&self, latency: Duration) { - const MAX_SAMPLES: usize = 64; + const EXPECTED_SAMPLES: usize = 128; let mut data = self.data.lock(); data.points.push_back(latency); data.sum += latency; - if data.points.len() >= MAX_SAMPLES { + if data.points.len() < EXPECTED_SAMPLES { + // Do not initialize average latency until there are enough samples. 
+ return; + } + while data.points.len() > EXPECTED_SAMPLES { let pop = data.points.pop_front().expect("data vector is not empty"); - data.sum -= pop; // This does not overflow because of how running sum is calculated + data.sum -= pop; // This does not underflow because of how running sum is calculated } let latency = data.sum.as_millis() as u64 / data.points.len() as u64; self.latency_ms.store(latency, Ordering::Relaxed); @@ -44,7 +48,7 @@ impl LatencyObserver { pub fn latency(&self) -> Option { let latency = self.latency_ms.load(Ordering::Relaxed); if latency == u64::MAX { - // Not initialized yet (0 data points) + // Not initialized yet (not enough data points) None } else { Some(Duration::from_millis(latency)) diff --git a/crates/sui-core/src/overload_monitor.rs b/crates/sui-core/src/overload_monitor.rs index f02df06ac54fa..b162880226844 100644 --- a/crates/sui-core/src/overload_monitor.rs +++ b/crates/sui-core/src/overload_monitor.rs @@ -424,12 +424,16 @@ mod tests { .build() .await; + // Initialize latency reporter. + for _ in 0..1000 { + state + .metrics + .execution_queueing_latency + .report(Duration::from_secs(20)); + } + // Creates a simple case to see if authority state overload_info can be updated // correctly by check_authority_overload. - state - .metrics - .execution_queueing_latency - .report(Duration::from_secs(20)); let authority = Arc::downgrade(&state); assert!(check_authority_overload(&authority, &config)); assert!(state.overload_info.is_overload.load(Ordering::Relaxed));