Skip to content

Commit

Permalink
state-sync: add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill committed Nov 28, 2022
1 parent 3910efc commit a6ff1d1
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ multiaddr = "0.16.0"
tap = "1.0.1"
rand = "0.8.5"
anyhow = "1.0.65"
prometheus = "0.13.3"

[build-dependencies]
anemo-build.workspace = true
Expand Down
23 changes: 20 additions & 3 deletions crates/sui-network/src/state_sync/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ use tokio::{
};

use super::{
server::Server, Handle, PeerHeights, StateSync, StateSyncEventLoop, StateSyncMessage,
StateSyncServer,
metrics::Metrics, server::Server, Handle, PeerHeights, StateSync, StateSyncEventLoop,
StateSyncMessage, StateSyncServer,
};
use sui_types::storage::WriteStore;

/// Builder for the state-sync subsystem.
///
/// Each field is optional while the builder is being configured; the
/// constructor on `Builder<()>` starts with all of them set to `None`.
pub struct Builder<S> {
/// Backing store. `build_internal` unwraps this, so it must have been
/// supplied before building.
store: Option<S>,
/// Optional configuration. `build_internal` substitutes the `Default`
/// value when this is `None`.
config: Option<StateSyncConfig>,
/// Optional metrics handle. `build_internal` substitutes
/// `Metrics::disabled()` when this is `None`.
metrics: Option<Metrics>,
}

impl Builder<()> {
Expand All @@ -30,6 +31,7 @@ impl Builder<()> {
Self {
store: None,
config: None,
metrics: None,
}
}
}
Expand All @@ -39,13 +41,19 @@ impl<S> Builder<S> {
Builder {
store: Some(store),
config: self.config,
metrics: self.metrics,
}
}

pub fn config(mut self, config: StateSyncConfig) -> Self {
self.config = Some(config);
self
}

pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
self.metrics = Some(Metrics::enabled(registry));
self
}
}

impl<S> Builder<S>
Expand All @@ -59,9 +67,14 @@ where
}

pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
let Builder { store, config } = self;
let Builder {
store,
config,
metrics,
} = self;
let store = store.unwrap();
let config = config.unwrap_or_default();
let metrics = metrics.unwrap_or_else(Metrics::disabled);

let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
let (checkpoint_event_sender, _reciever) =
Expand Down Expand Up @@ -93,6 +106,7 @@ where
store,
peer_heights,
checkpoint_event_sender,
metrics,
},
server,
)
Expand All @@ -106,6 +120,7 @@ pub struct UnstartedStateSync<S> {
pub(super) store: S,
pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
pub(super) metrics: Metrics,
}

impl<S> UnstartedStateSync<S>
Expand All @@ -121,6 +136,7 @@ where
store,
peer_heights,
checkpoint_event_sender,
metrics,
} = self;

(
Expand All @@ -135,6 +151,7 @@ where
peer_heights,
checkpoint_event_sender,
network,
metrics,
},
handle,
)
Expand Down
75 changes: 75 additions & 0 deletions crates/sui-network/src/state_sync/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};
use std::sync::Arc;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::Pipe;

/// Handle used by state-sync code to report checkpoint progress.
///
/// Wraps `Option<Arc<Inner>>`: `Some` holds the registered gauges, while
/// `None` represents disabled metrics, in which case every setter is a no-op.
/// Clones share the same `Inner` through the `Arc`.
#[derive(Clone, Debug)]
pub(super) struct Metrics(Option<Arc<Inner>>);

impl Metrics {
pub fn enabled(registry: &Registry) -> Self {
Metrics(Some(Inner::new(registry)))
}

pub fn disabled() -> Self {
Metrics(None)
}

pub fn set_highest_known_checkpoint(&self, sequence_number: CheckpointSequenceNumber) {
if let Some(inner) = &self.0 {
inner.highest_known_checkpoint.set(sequence_number as i64);
}
}

pub fn set_highest_verified_checkpoint(&self, sequence_number: CheckpointSequenceNumber) {
if let Some(inner) = &self.0 {
inner
.highest_verified_checkpoint
.set(sequence_number as i64);
}
}

pub fn set_highest_synced_checkpoint(&self, sequence_number: CheckpointSequenceNumber) {
if let Some(inner) = &self.0 {
inner.highest_synced_checkpoint.set(sequence_number as i64);
}
}
}

/// The gauges behind an enabled `Metrics` handle.
///
/// Each field is an `IntGauge` registered under a metric name equal to the
/// field name (see `Inner::new`).
#[derive(Debug)]
struct Inner {
// Written by `Metrics::set_highest_known_checkpoint`.
highest_known_checkpoint: IntGauge,
// Written by `Metrics::set_highest_verified_checkpoint`.
highest_verified_checkpoint: IntGauge,
// Written by `Metrics::set_highest_synced_checkpoint`.
highest_synced_checkpoint: IntGauge,
}

impl Inner {
pub fn new(registry: &Registry) -> Arc<Self> {
Self {
highest_known_checkpoint: register_int_gauge_with_registry!(
"highest_known_checkpoint",
"Highest known checkpoint",
registry
)
.unwrap(),

highest_verified_checkpoint: register_int_gauge_with_registry!(
"highest_verified_checkpoint",
"Highest verified checkpoint",
registry
)
.unwrap(),

highest_synced_checkpoint: register_int_gauge_with_registry!(
"highest_synced_checkpoint",
"Highest synced checkpoint",
registry
)
.unwrap(),
}
.pipe(Arc::new)
}
}
16 changes: 16 additions & 0 deletions crates/sui-network/src/state_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ mod generated {
include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
}
mod builder;
mod metrics;
mod server;
#[cfg(test)]
mod tests;
Expand All @@ -94,6 +95,8 @@ pub use generated::{
};
pub use server::GetCheckpointSummaryRequest;

use self::metrics::Metrics;

/// A handle to the StateSync subsystem.
///
/// This handle can be cloned and shared. Once all copies of a StateSync system's Handle have been
Expand Down Expand Up @@ -226,6 +229,7 @@ struct StateSyncEventLoop<S> {
peer_heights: Arc<RwLock<PeerHeights>>,
checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
network: anemo::Network,
metrics: Metrics,
}

impl<S> StateSyncEventLoop<S>
Expand Down Expand Up @@ -356,6 +360,10 @@ where
self.store
.update_highest_synced_checkpoint(&checkpoint)
.expect("store operation should not fail");
self.metrics
.set_highest_verified_checkpoint(checkpoint.sequence_number());
self.metrics
.set_highest_synced_checkpoint(checkpoint.sequence_number());

// We don't care if no one is listening as this is a broadcast channel
let _ = self.checkpoint_event_sender.send(checkpoint.clone());
Expand Down Expand Up @@ -440,6 +448,7 @@ where
self.network.clone(),
self.store.clone(),
self.peer_heights.clone(),
self.metrics.clone(),
self.config.checkpoint_header_download_concurrency(),
// The if condition should ensure that this is Some
highest_known_checkpoint.unwrap(),
Expand Down Expand Up @@ -483,6 +492,7 @@ where
self.peer_heights.clone(),
self.weak_sender.clone(),
self.checkpoint_event_sender.clone(),
self.metrics.clone(),
self.config.transaction_download_concurrency(),
// The if condition should ensure that this is Some
highest_verified_checkpoint.unwrap(),
Expand Down Expand Up @@ -612,13 +622,16 @@ async fn sync_to_checkpoint<S>(
network: anemo::Network,
store: S,
peer_heights: Arc<RwLock<PeerHeights>>,
metrics: Metrics,
checkpoint_header_download_concurrency: usize,
checkpoint: Checkpoint,
) -> Result<()>
where
S: WriteStore,
<S as ReadStore>::Error: std::error::Error,
{
metrics.set_highest_known_checkpoint(checkpoint.sequence_number());

let mut current = store
.get_highest_verified_checkpoint()
.expect("store operation should not fail");
Expand Down Expand Up @@ -748,6 +761,7 @@ where
store
.insert_checkpoint(checkpoint.clone())
.expect("store operation should not fail");
metrics.set_highest_verified_checkpoint(checkpoint.sequence_number());
}

peer_heights
Expand All @@ -764,6 +778,7 @@ async fn sync_checkpoint_contents<S>(
peer_heights: Arc<RwLock<PeerHeights>>,
sender: mpsc::WeakSender<StateSyncMessage>,
checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
metrics: Metrics,
transaction_download_concurrency: usize,
target_checkpoint: VerifiedCheckpoint,
) where
Expand Down Expand Up @@ -796,6 +811,7 @@ async fn sync_checkpoint_contents<S>(
store
.update_highest_synced_checkpoint(&checkpoint)
.expect("store operation should not fail");
metrics.set_highest_synced_checkpoint(checkpoint.sequence_number());
// We don't care if no one is listening as this is a broadcast channel
let _ = checkpoint_event_sender.send(checkpoint.clone());
highest_synced = Some(checkpoint);
Expand Down

0 comments on commit a6ff1d1

Please sign in to comment.