Skip to content

Commit

Permalink
Add a scoped utility to monitor cumulative duration and iterations (M…
Browse files Browse the repository at this point in the history
…ystenLabs#6501)

This is a generalization of TaskUtilizationExt that we use in a few places. There are a few select loops and sections guarded by mutex that may become the bottleneck of the system. It is useful to monitor how "loaded" they are, and if increased load comes from increased number of calls. In future we can also record the number of times where a call is particularly long (> 1s or a user specified threshold).
  • Loading branch information
mwtian authored Dec 1, 2022
1 parent fae9b0f commit 1c711c1
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 100 deletions.
58 changes: 56 additions & 2 deletions crates/mysten-metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Instant;

use once_cell::sync::OnceCell;
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry};
use tap::TapFallible;

use tracing::warn;

pub use scopeguard;
Expand All @@ -13,6 +14,8 @@ pub use scopeguard;
pub struct Metrics {
pub tasks: IntGaugeVec,
pub futures: IntGaugeVec,
pub scope_iterations: IntGaugeVec,
pub scope_duration_ns: IntGaugeVec,
}

impl Metrics {
Expand All @@ -32,6 +35,20 @@ impl Metrics {
registry,
)
.unwrap(),
scope_iterations: register_int_gauge_vec_with_registry!(
"monitored_scope_iterations",
"Total number of times where the monitored scope runs",
&["name"],
registry,
)
.unwrap(),
scope_duration_ns: register_int_gauge_vec_with_registry!(
"monitored_scope_duration_ns",
"Total duration in nanosecs where the monitored scope is running",
&["name"],
registry,
)
.unwrap(),
}
}
}
Expand Down Expand Up @@ -61,7 +78,7 @@ macro_rules! monitored_future {
async move {
let metrics = mysten_metrics::get_metrics();

let _guard = if let Some(m) = &metrics {
let _guard = if let Some(m) = metrics {
m.$metric.with_label_values(&[LOCATION]).inc();
Some(mysten_metrics::scopeguard::guard(m, |metrics| {
m.$metric.with_label_values(&[LOCATION]).dec();
Expand All @@ -81,3 +98,40 @@ macro_rules! spawn_monitored_task {
tokio::task::spawn(mysten_metrics::monitored_future!(tasks, $fut))
};
}

pub struct MonitoredScopeGuard {
metrics: &'static Metrics,
name: &'static str,
timer: Instant,
}

impl Drop for MonitoredScopeGuard {
fn drop(&mut self) {
self.metrics
.scope_duration_ns
.with_label_values(&[self.name])
.add(self.timer.elapsed().as_nanos() as i64);
}
}

/// This function creates a named scoped object, that keeps track of
/// - the total iterations where the scope is called in the `monitored_scope_iterations` metric.
/// - and the total duration of the scope in the `monitored_scope_duration_ns` metric.
///
/// The monitored scope should be single threaded, e.g. the scoped object encompass the lifetime of
/// a select loop or guarded by mutex.
/// Then the rate of `monitored_scope_duration_ns`, converted to the unit of sec / sec, would be
/// how full the single threaded scope is running.
pub fn monitored_scope(name: &'static str) -> Option<MonitoredScopeGuard> {
let metrics = get_metrics();
if let Some(m) = metrics {
m.scope_iterations.with_label_values(&[name]).inc();
Some(MonitoredScopeGuard {
metrics: m,
name,
timer: Instant::now(),
})
} else {
None
}
}
34 changes: 3 additions & 31 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use move_core_types::identifier::Identifier;
use move_core_types::parser::parse_struct_tag;
use move_core_types::{language_storage::ModuleId, resolver::ModuleResolver};
use move_vm_runtime::{move_vm::MoveVM, native_functions::NativeFunctionTable};
use mysten_metrics::monitored_scope;
use prometheus::{
exponential_buckets, register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
Expand Down Expand Up @@ -87,7 +88,6 @@ use crate::consensus_handler::{
};
use crate::epoch::committee_store::CommitteeStore;
use crate::epoch::reconfiguration::ReconfigState;
use crate::metrics::TaskUtilizationExt;
use crate::module_cache_gauge::ModuleCacheGauge;
use crate::scoped_counter;
use crate::{
Expand Down Expand Up @@ -166,10 +166,7 @@ pub struct AuthorityMetrics {
pub(crate) transaction_manager_num_pending_certificates: IntGauge,
pub(crate) transaction_manager_num_ready: IntGauge,

total_consensus_txns: IntCounter,
skipped_consensus_txns: IntCounter,
handle_consensus_duration_mcs: IntCounter,
verify_narwhal_transaction_duration_mcs: IntCounter,

pub follower_items_streamed: IntCounter,
pub follower_items_loaded: IntCounter,
Expand Down Expand Up @@ -332,30 +329,12 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
total_consensus_txns: register_int_counter_with_registry!(
"total_consensus_txns",
"Total number of consensus transactions received from narwhal",
registry,
)
.unwrap(),
skipped_consensus_txns: register_int_counter_with_registry!(
"skipped_consensus_txns",
"Total number of consensus transactions skipped",
registry,
)
.unwrap(),
handle_consensus_duration_mcs: register_int_counter_with_registry!(
"handle_consensus_duration_mcs",
"Total duration of handle_consensus_transaction",
registry,
)
.unwrap(),
verify_narwhal_transaction_duration_mcs: register_int_counter_with_registry!(
"verify_narwhal_transaction_duration_mcs",
"Total duration of verify_narwhal_transaction",
registry,
)
.unwrap(),
follower_items_streamed: register_int_counter_with_registry!(
"follower_items_streamed",
"Number of transactions/signed batches streamed to followers",
Expand Down Expand Up @@ -2159,10 +2138,7 @@ impl AuthorityState {
consensus_output: &ConsensusOutput,
transaction: SequencedConsensusTransaction,
) -> Result<VerifiedSequencedConsensusTransaction, ()> {
let _timer = self
.metrics
.verify_narwhal_transaction_duration_mcs
.utilization_timer();
let _scope = monitored_scope("VerifyConsensusTransaction");
if self
.database
.consensus_message_processed(&transaction.transaction.key())
Expand Down Expand Up @@ -2223,16 +2199,12 @@ impl AuthorityState {
transaction: VerifiedSequencedConsensusTransaction,
checkpoint_service: &Arc<C>,
) -> SuiResult {
let _scope = monitored_scope("HandleConsensusTransaction");
let VerifiedSequencedConsensusTransaction(SequencedConsensusTransaction {
consensus_output: _consensus_output,
consensus_index,
transaction,
}) = transaction;
self.metrics.total_consensus_txns.inc();
let _timer = self
.metrics
.handle_consensus_duration_mcs
.utilization_timer();
let tracking_id = transaction.get_tracking_id();
// TODO: Somewhere here we check if we have seen 2f+1 EndOfPublish message, and if so,
// we call self.get_reconfig_state_write_lock_guard to get a guard, and then call
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{sync::Arc, time::Duration};

use mysten_metrics::spawn_monitored_task;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use prometheus::{
register_int_counter_with_registry, register_int_gauge_with_registry, IntCounter, IntGauge,
Registry,
Expand Down Expand Up @@ -80,6 +80,8 @@ where

// Loop whenever there is a signal that a new transactions is ready to process.
loop {
let _scope = monitored_scope("ExecutionDriver");

let certificate = if let Some(cert) = ready_certificates_stream.recv().await {
cert
} else {
Expand Down
7 changes: 3 additions & 4 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ pub use crate::checkpoints::checkpoint_output::{
LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
};
pub use crate::checkpoints::metrics::CheckpointMetrics;
use crate::metrics::TaskUtilizationExt;
use crate::stake_aggregator::{InsertResult, StakeAggregator};
use fastcrypto::encoding::{Encoding, Hex};
use futures::future::{select, Either};
use futures::FutureExt;
use mysten_metrics::spawn_monitored_task;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -284,7 +283,7 @@ impl CheckpointBuilder {
roots: Vec<TransactionDigest>,
last_checkpoint_of_epoch: bool,
) -> anyhow::Result<()> {
let _timer = self.metrics.builder_utilization.utilization_timer();
let _scope = monitored_scope("MakeCheckpoint");
self.metrics
.checkpoint_roots_count
.inc_by(roots.len() as u64);
Expand Down Expand Up @@ -518,7 +517,7 @@ impl CheckpointAggregator {
}

async fn run_inner(&mut self) -> SuiResult {
let _timer = self.metrics.aggregator_utilization.utilization_timer();
let _scope = monitored_scope("CheckpointAggregator");
'outer: loop {
let current = if let Some(current) = &mut self.current {
current
Expand Down
30 changes: 1 addition & 29 deletions crates/sui-core/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::{Histogram, IntCounter};
use prometheus::Histogram;
use tokio::time::Instant;

/// Increment an IntGauge metric, and decrement it when the scope ends.
Expand All @@ -23,31 +23,3 @@ pub fn start_timer(metrics: Histogram) -> impl Drop {
metrics.observe(start_ts.elapsed().as_secs_f64());
})
}

pub struct TaskUtilizationGuard<'a> {
metric: &'a IntCounter,
start: Instant,
}

pub trait TaskUtilizationExt {
/// Measures amount of time spent until guard is dropped and increments the counter by duration in mcs
/// Primary usage for this counter is to measure 'utilization' of the single task
/// E.g. having rate(metric) / 1_000_000 can tell what portion of time this task is busy
/// For the tasks that are run in single thread this indicates how close is this task to a complete saturation
fn utilization_timer(&self) -> TaskUtilizationGuard;
}

impl TaskUtilizationExt for IntCounter {
fn utilization_timer(&self) -> TaskUtilizationGuard {
TaskUtilizationGuard {
start: Instant::now(),
metric: self,
}
}
}

impl<'a> Drop for TaskUtilizationGuard<'a> {
fn drop(&mut self) {
self.metric.inc_by(self.start.elapsed().as_micros() as u64);
}
}
3 changes: 3 additions & 0 deletions crates/sui-storage/src/lock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//! This allows reads to proceed without being blocked on writes.
use futures::channel::oneshot;
use mysten_metrics::monitored_scope;
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
Expand Down Expand Up @@ -500,6 +501,7 @@ impl LockServiceImpl {
debug!("LockService command processing loop started");
// NOTE: we use blocking_recv() as its faster than using regular async recv() with awaits in a loop
while let Some(msg) = receiver.blocking_recv() {
let _scope = monitored_scope("LockServiceCommandLoop");
match msg {
LockServiceCommands::Acquire {
epoch,
Expand Down Expand Up @@ -548,6 +550,7 @@ impl LockServiceImpl {
fn run_queries_loop(&self, mut receiver: Receiver<LockServiceQueries>) {
debug!("LockService queries processing loop started");
while let Some(msg) = receiver.blocking_recv() {
let _scope = monitored_scope("LockServiceQueryLoop");
match msg {
LockServiceQueries::GetLock {
object,
Expand Down
4 changes: 3 additions & 1 deletion narwhal/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{metrics::ConsensusMetrics, SequenceNumber};
use config::Committee;
use crypto::PublicKey;
use fastcrypto::hash::Hash;
use mysten_metrics::spawn_monitored_task;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use std::{
cmp::{max, Ordering},
collections::HashMap,
Expand Down Expand Up @@ -311,6 +311,8 @@ where
async fn run_inner(mut self) -> StoreResult<()> {
// Listen to incoming certificates.
loop {
let _scope = monitored_scope("NarwhalConsensus");

tokio::select! {
Some(certificate) = self.rx_new_certificates.recv() => {
// If the core already moved to the next epoch we should pull the next
Expand Down
13 changes: 2 additions & 11 deletions narwhal/primary/src/certificate_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::metrics::PrimaryMetrics;
use config::Committee;
use crypto::{NetworkPublicKey, PublicKey};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use mysten_metrics::{monitored_future, spawn_monitored_task};
use mysten_metrics::{monitored_future, monitored_scope, spawn_monitored_task};
use network::{P2pNetwork, PrimaryToPrimaryRpc};
use rand::{rngs::ThreadRng, seq::SliceRandom};
use std::{
Expand Down Expand Up @@ -272,16 +272,12 @@ impl CertificateWaiter {
);
self.fetch_certificates_task.push(
spawn_monitored_task!(async move {
let _scope = monitored_scope("CertificatesFetching");
state
.metrics
.certificate_waiter_inflight_fetch
.with_label_values(&[&committee.epoch.to_string()])
.inc();
state
.metrics
.certificate_waiter_fetch_attempts
.with_label_values(&[&committee.epoch.to_string()])
.inc();

let now = Instant::now();
match run_fetch_task(state.clone(), committee.clone(), gc_round, written_rounds)
Expand All @@ -298,11 +294,6 @@ impl CertificateWaiter {
}
};

state
.metrics
.certificate_waiter_op_latency
.with_label_values(&[&committee.epoch.to_string()])
.observe(now.elapsed().as_secs_f64());
state
.metrics
.certificate_waiter_inflight_fetch
Expand Down
4 changes: 3 additions & 1 deletion narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crypto::{NetworkPublicKey, PublicKey, Signature};
use fastcrypto::{hash::Hash as _, SignatureService};
use futures::StreamExt;
use futures::{future::OptionFuture, stream::FuturesUnordered};
use mysten_metrics::spawn_monitored_task;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use network::{anemo_ext::NetworkExt, CancelOnDropHandler, P2pNetwork, ReliableNetwork};
use std::time::Duration;
use std::{collections::HashMap, sync::Arc, time::Instant};
Expand Down Expand Up @@ -671,6 +671,8 @@ impl Core {
pub async fn run(mut self) {
info!("Core on node {} has started successfully.", self.name);
loop {
let _scope = monitored_scope("NarwhalCore");

let result = tokio::select! {
Some((certificate, notify)) = self.rx_certificates.recv() => {
match self.sanitize_certificate(&certificate).await {
Expand Down
Loading

0 comments on commit 1c711c1

Please sign in to comment.