Skip to content

Commit

Permalink
Add network metrics from anemo/quinn to ConnectionMonitor (MystenLabs…
Browse files Browse the repository at this point in the history
…#10198)

## Description 

Exported quinn metrics. 

Follow up PR to use
[ClosureMetric](https://github.com/MystenLabs/sui/blob/main/crates/prometheus-closure-metric/src/lib.rs)
instead as per @aschran suggestion. Would have required a little bit of
a refactor of the `ConnectionMonitor` so put it off for another PR.

## Test Plan 


[private-testnet](https://metrics.sui.io/d/k1e0Y8BVz/anemo-metrics?orgId=1&var-Environment=mysten-metrics-internal&var-network=private-testnet&from=1680232014920&to=1680236015994&var-validator=All)
  • Loading branch information
arun-koshy authored Apr 1, 2023
1 parent f3dbbf8 commit ab4c9ae
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 35 deletions.
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "c2f79b18
fastcrypto-zkp = { git = "https://github.com/MystenLabs/fastcrypto", rev = "c2f79b1807bff7d09517b631191b61f2614c641c", package = "fastcrypto-zkp" }

# anemo dependencies
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9" }
anemo-build = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9" }
anemo-cli = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9" }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9" }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e" }
anemo-build = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e" }
anemo-cli = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e" }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e" }

# Use the same workspace-hack across crates.
workspace-hack = { path = "crates/workspace-hack" }
Expand Down
1 change: 1 addition & 0 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ impl SuiNode {
p2p_network.downgrade(),
network_connection_metrics,
HashMap::new(),
None,
);

let connection_monitor_status = ConnectionMonitorStatus {
Expand Down
18 changes: 9 additions & 9 deletions crates/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ aho-corasick = { version = "0.7" }
aliasable = { version = "0.1" }
alloc-no-stdlib = { version = "2", default-features = false }
alloc-stdlib = { version = "0.2", default-features = false }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9", default-features = false }
anemo-cli = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9", default-features = false }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9", default-features = false }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e", default-features = false }
anemo-cli = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e", default-features = false }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e", default-features = false }
anes = { version = "0.1" }
ansi_term = { version = "0.12", default-features = false }
anyhow = { version = "1", features = ["backtrace"] }
Expand Down Expand Up @@ -427,7 +427,7 @@ quanta = { version = "0.9" }
quick-error-dff4ba8e3ae991db = { package = "quick-error", version = "1", default-features = false }
quick-error-f595c2ba2a3f28df = { package = "quick-error", version = "2", default-features = false }
quinn = { version = "0.9", default-features = false, features = ["futures-io", "runtime-tokio", "tls-rustls"] }
quinn-proto = { version = "0.9", default-features = false, features = ["tls-rustls"] }
quinn-proto = { version = "0.9" }
quinn-udp = { version = "0.3", default-features = false }
quote-dff4ba8e3ae991db = { package = "quote", version = "1" }
r2d2 = { version = "0.8", default-features = false }
Expand Down Expand Up @@ -637,10 +637,10 @@ aho-corasick = { version = "0.7" }
aliasable = { version = "0.1" }
alloc-no-stdlib = { version = "2", default-features = false }
alloc-stdlib = { version = "0.2", default-features = false }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9", default-features = false }
anemo-build = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9", default-features = false }
anemo-cli = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9", default-features = false }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "4ebf4a86952827ff0fcce6a2d8a80f42f34efed9", default-features = false }
anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e", default-features = false }
anemo-build = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e", default-features = false }
anemo-cli = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e", default-features = false }
anemo-tower = { git = "https://github.com/mystenlabs/anemo.git", rev = "82875616a86f94652b1c8d374b839917791d805e", default-features = false }
anes = { version = "0.1" }
ansi_term = { version = "0.12", default-features = false }
anyhow = { version = "1", features = ["backtrace"] }
Expand Down Expand Up @@ -1115,7 +1115,7 @@ quanta = { version = "0.9" }
quick-error-dff4ba8e3ae991db = { package = "quick-error", version = "1", default-features = false }
quick-error-f595c2ba2a3f28df = { package = "quick-error", version = "2", default-features = false }
quinn = { version = "0.9", default-features = false, features = ["futures-io", "runtime-tokio", "tls-rustls"] }
quinn-proto = { version = "0.9", default-features = false, features = ["tls-rustls"] }
quinn-proto = { version = "0.9" }
quinn-udp = { version = "0.3", default-features = false }
quote-3b31131e45eafb45 = { package = "quote", version = "0.6" }
quote-dff4ba8e3ae991db = { package = "quote", version = "1" }
Expand Down
1 change: 1 addition & 0 deletions narwhal/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ backoff = { version = "0.4.0", features = ["tokio"] }
bytes = "1.3.0"
futures = "0.3.24"
parking_lot = "0.12.1"
quinn-proto = "^0.9.2"
prometheus = "0.13.3"
rand = { version = "0.8.5", features = ["small_rng"] }
tokio = { workspace = true, features = ["rt", "net", "sync", "macros", "time"] }
Expand Down
156 changes: 141 additions & 15 deletions narwhal/network/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::NetworkConnectionMetrics;
use anemo::types::PeerEvent;
use anemo::PeerId;
use dashmap::DashMap;
use futures::future;
use mysten_metrics::spawn_logged_monitored_task;
use std::collections::HashMap;
use quinn_proto::ConnectionStats;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time;
use types::ConditionalBroadcastReceiver;

#[derive(Eq, PartialEq, Clone, Debug)]
pub enum ConnectionStatus {
Expand All @@ -20,6 +26,7 @@ pub struct ConnectionMonitor {
connection_metrics: NetworkConnectionMetrics,
peer_id_types: HashMap<PeerId, String>,
connection_statuses: Arc<DashMap<PeerId, ConnectionStatus>>,
rx_shutdown: Option<ConditionalBroadcastReceiver>,
}

impl ConnectionMonitor {
Expand All @@ -28,6 +35,7 @@ impl ConnectionMonitor {
network: anemo::NetworkRef,
connection_metrics: NetworkConnectionMetrics,
peer_id_types: HashMap<PeerId, String>,
rx_shutdown: Option<ConditionalBroadcastReceiver>,
) -> (JoinHandle<()>, Arc<DashMap<PeerId, ConnectionStatus>>) {
let connection_statuses_outer = Arc::new(DashMap::new());
let connection_statuses = connection_statuses_outer.clone();
Expand All @@ -38,6 +46,7 @@ impl ConnectionMonitor {
connection_metrics,
peer_id_types,
connection_statuses,
rx_shutdown
}
.run(),
"ConnectionMonitor"
Expand All @@ -46,14 +55,17 @@ impl ConnectionMonitor {
)
}

async fn run(self) {
async fn run(mut self) {
let (mut subscriber, connected_peers) = {
if let Some(network) = self.network.upgrade() {
let Ok((subscriber, connected_peers)) = network.subscribe() else {
let Ok((subscriber, known_peers)) = network.subscribe() else {
return;
};

(subscriber, connected_peers)
(
subscriber,
known_peers.into_iter().collect::<HashSet<PeerId>>(),
)
} else {
return;
}
Expand All @@ -69,20 +81,53 @@ impl ConnectionMonitor {
}

// now report the connected peers
for peer_id in connected_peers {
self.handle_peer_status_change(peer_id, ConnectionStatus::Connected)
for peer_id in connected_peers.iter() {
self.handle_peer_status_change(*peer_id, ConnectionStatus::Connected)
.await;
}

while let Ok(event) = subscriber.recv().await {
match event {
anemo::types::PeerEvent::NewPeer(peer_id) => {
self.handle_peer_status_change(peer_id, ConnectionStatus::Connected)
.await;
let mut connection_stat_collection_interval = time::interval(Duration::from_secs(60));

async fn wait_for_shutdown(
rx_shutdown: &mut Option<ConditionalBroadcastReceiver>,
) -> Result<(), tokio::sync::broadcast::error::RecvError> {
if let Some(rx) = rx_shutdown.as_mut() {
rx.receiver.recv().await
} else {
// If no shutdown receiver is provided, wait forever.
let future = future::pending();
#[allow(clippy::let_unit_value)]
let () = future.await;
Ok(())
}
}

loop {
tokio::select! {
_ = connection_stat_collection_interval.tick() => {
if let Some(network) = self.network.upgrade() {
for peer_id in connected_peers.iter() {
if let Some(connection) = network.peer(*peer_id) {
let stats = connection.connection_stats();
self.update_quinn_metrics_for_peer(&format!("{peer_id}"), &stats);
}
}
} else {
continue;
}
}
anemo::types::PeerEvent::LostPeer(peer_id, _) => {
self.handle_peer_status_change(peer_id, ConnectionStatus::Disconnected)
.await;
Ok(event) = subscriber.recv() => {
match event {
PeerEvent::NewPeer(peer_id) => {
self.handle_peer_status_change(peer_id, ConnectionStatus::Connected).await;
}
PeerEvent::LostPeer(peer_id, _) => {
self.handle_peer_status_change(peer_id, ConnectionStatus::Disconnected).await;
}
}
}
_ = wait_for_shutdown(&mut self.rx_shutdown) => {
return;
}
}
}
Expand Down Expand Up @@ -115,6 +160,87 @@ impl ConnectionMonitor {

self.connection_statuses.insert(peer_id, connection_status);
}

// TODO: Replace this with ClosureMetric
fn update_quinn_metrics_for_peer(&self, peer_id: &str, stats: &ConnectionStats) {
// Update PathStats
self.connection_metrics
.network_peer_rtt
.with_label_values(&[peer_id])
.set(stats.path.rtt.as_millis() as i64);
self.connection_metrics
.network_peer_lost_packets
.with_label_values(&[peer_id])
.set(stats.path.lost_packets as i64);
self.connection_metrics
.network_peer_lost_bytes
.with_label_values(&[peer_id])
.set(stats.path.lost_bytes as i64);
self.connection_metrics
.network_peer_sent_packets
.with_label_values(&[peer_id])
.set(stats.path.sent_packets as i64);
self.connection_metrics
.network_peer_congestion_events
.with_label_values(&[peer_id])
.set(stats.path.congestion_events as i64);
self.connection_metrics
.network_peer_congestion_window
.with_label_values(&[peer_id])
.set(stats.path.cwnd as i64);

// Update FrameStats
self.connection_metrics
.network_peer_max_data
.with_label_values(&[peer_id, "transmitted"])
.set(stats.frame_tx.max_data as i64);
self.connection_metrics
.network_peer_max_data
.with_label_values(&[peer_id, "received"])
.set(stats.frame_rx.max_data as i64);
self.connection_metrics
.network_peer_closed_connections
.with_label_values(&[peer_id, "transmitted"])
.set(stats.frame_tx.connection_close as i64);
self.connection_metrics
.network_peer_closed_connections
.with_label_values(&[peer_id, "received"])
.set(stats.frame_rx.connection_close as i64);
self.connection_metrics
.network_peer_data_blocked
.with_label_values(&[peer_id, "transmitted"])
.set(stats.frame_tx.data_blocked as i64);
self.connection_metrics
.network_peer_data_blocked
.with_label_values(&[peer_id, "received"])
.set(stats.frame_rx.data_blocked as i64);

// Update UDPStats
self.connection_metrics
.network_peer_udp_datagrams
.with_label_values(&[peer_id, "transmitted"])
.set(stats.udp_tx.datagrams as i64);
self.connection_metrics
.network_peer_udp_datagrams
.with_label_values(&[peer_id, "received"])
.set(stats.udp_rx.datagrams as i64);
self.connection_metrics
.network_peer_udp_bytes
.with_label_values(&[peer_id, "transmitted"])
.set(stats.udp_tx.bytes as i64);
self.connection_metrics
.network_peer_udp_bytes
.with_label_values(&[peer_id, "received"])
.set(stats.udp_rx.bytes as i64);
self.connection_metrics
.network_peer_udp_transmits
.with_label_values(&[peer_id, "transmitted"])
.set(stats.udp_tx.transmits as i64);
self.connection_metrics
.network_peer_udp_transmits
.with_label_values(&[peer_id, "received"])
.set(stats.udp_rx.transmits as i64);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -145,7 +271,7 @@ mod tests {

// WHEN bring up the monitor
let (_h, statuses) =
ConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), HashMap::new());
ConnectionMonitor::spawn(network_1.downgrade(), metrics.clone(), HashMap::new(), None);

// THEN peer 2 should be already connected
assert_network_peers(metrics.clone(), 1).await;
Expand Down
Loading

0 comments on commit ab4c9ae

Please sign in to comment.