Skip to content

Commit

Permalink
update to latency metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed Aug 18, 2022
1 parent 15f3e2f commit 6b98051
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 139 deletions.
12 changes: 10 additions & 2 deletions crates/sui-benchmark/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ use crate::benchmark::{
use futures::{join, StreamExt};
use multiaddr::Multiaddr;
use rayon::{iter::ParallelIterator, prelude::*};
use std::sync::Arc;
use std::{panic, thread, thread::sleep, time::Duration};
use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient};
use sui_core::authority_client::{
AuthorityAPI, NetworkAuthorityClient, NetworkAuthorityClientMetrics,
};
use sui_types::{
batch::UpdateItem,
messages::{BatchInfoRequest, BatchInfoResponseItem, TransactionInfoRequest},
Expand Down Expand Up @@ -217,7 +220,12 @@ fn run_latency_microbench(

/// Connects a `NetworkAuthorityClient` to `address` and hands it to `follow`.
///
/// Panics if the connection attempt returns an error, because the result is
/// unwrapped rather than propagated.
async fn run_follower(address: Multiaddr) {
// We spawn a second client that listens to the batch interface
// NOTE(review): this view appears to show both sides of a diff hunk. The
// one-argument `connect` call directly below looks like the pre-change form;
// the two-argument call after it rebinds `authority_client`, and that later
// binding is the one passed to `follow`.
let authority_client = NetworkAuthorityClient::connect(&address).await.unwrap();
let authority_client = NetworkAuthorityClient::connect(
&address,
// Metrics handle built via `new_for_tests` — presumably not wired to a
// shared metrics registry; confirm against the constructor.
Arc::new(NetworkAuthorityClientMetrics::new_for_tests()),
)
.await
.unwrap();

// NOTE(review): the meaning of the `false` flag is not visible here; check
// the signature of `follow`.
follow(authority_client, false).await;
}
Expand Down
19 changes: 16 additions & 3 deletions crates/sui-benchmark/src/benchmark/load_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::{sync::Notify, time};
use tracing::{error, info};

use sui_config::NetworkConfig;
use sui_core::authority_client::NetworkAuthorityClientMetrics;
use sui_types::committee::StakeUnit;

pub fn check_transaction_response(reply_message: Result<TransactionInfoResponse, io::Error>) {
Expand All @@ -50,7 +51,11 @@ pub async fn send_tx_chunks(

let mut tasks = Vec::new();
for tx_chunks in tx_chunks.chunks(tx_chunks.len() / conn) {
let client = NetworkAuthorityClient::connect_lazy(&address).unwrap();
let client = NetworkAuthorityClient::connect_lazy(
&address,
Arc::new(NetworkAuthorityClientMetrics::new_for_tests()),
)
.unwrap();
let txns = tx_chunks.to_vec();

let task = tokio::spawn(async move {
Expand Down Expand Up @@ -93,7 +98,11 @@ pub async fn send_transactions(

let mut tasks = Vec::new();
for tx_chunks in tx_chunks.chunks(tx_chunks.len() / conn) {
let client = NetworkAuthorityClient::connect_lazy(&address).unwrap();
let client = NetworkAuthorityClient::connect_lazy(
&address,
Arc::new(NetworkAuthorityClientMetrics::new_for_tests()),
)
.unwrap();
let txns = tx_chunks.to_vec();

let task = tokio::spawn(async move {
Expand Down Expand Up @@ -131,7 +140,11 @@ pub async fn send_confs(

let mut tasks = Vec::new();
for tx_chunks in tx_chunks.chunks(tx_chunks.len() / conn) {
let client = NetworkAuthorityClient::connect_lazy(&address).unwrap();
let client = NetworkAuthorityClient::connect_lazy(
&address,
Arc::new(NetworkAuthorityClientMetrics::new_for_tests()),
)
.unwrap();
let txns = tx_chunks.to_vec();

let task = tokio::spawn(async move {
Expand Down
18 changes: 15 additions & 3 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct GossipMetrics {
pub wait_for_finality_latency_sec: Histogram,
pub total_attempts_cert_downloads: IntCounter,
pub total_successful_attempts_cert_downloads: IntCounter,
pub follower_stream_duration: Histogram,
}

impl GossipMetrics {
Expand Down Expand Up @@ -101,7 +102,7 @@ impl GossipMetrics {
.unwrap(),
wait_for_finality_latency_sec: register_histogram_with_registry!(
"gossip_wait_for_finality_latency_sec",
"Latency histogram for gossip/node sync process to wait for txs to become final, in sec",
"Latency histogram for gossip/node sync process to wait for txs to become final, in seconds",
registry,
)
.unwrap(),
Expand All @@ -117,6 +118,12 @@ impl GossipMetrics {
registry,
)
.unwrap(),
follower_stream_duration: register_histogram_with_registry!(
"follower_stream_duration",
"Latency histogram of the duration of the follower streams to peers, in seconds",
registry,
)
.unwrap(),
}
}

Expand Down Expand Up @@ -263,8 +270,8 @@ async fn wait_for_one_gossip_task_to_finish<A>(
) where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let (finished_name, _result) = gossip_tasks.select_next_some().await;
if let Err(err) = _result {
let (finished_name, result) = gossip_tasks.select_next_some().await;
if let Err(err) = result {
active_authority.set_failure_backoff(finished_name).await;
active_authority.state.metrics.gossip_task_error_count.inc();
error!(peer = ?finished_name, "Peer returned error: {:?}", err);
Expand Down Expand Up @@ -466,6 +473,8 @@ where
let mut last_seq_in_cur_batch: TxSequenceNumber = 0;
let mut streamx = Box::pin(self.client.handle_batch_stream(req).await?);
let metrics = handler.get_metrics();
let mut timer = metrics.follower_stream_duration.start_timer();

loop {
tokio::select! {
_ = &mut timeout => {
Expand Down Expand Up @@ -513,6 +522,8 @@ where

// The stream has closed, re-request:
None => {
timer.stop_and_record();
timer = metrics.follower_stream_duration.start_timer();
info!(peer = ?self.peer_name, ?latest_seq, "Gossip stream was closed. Restarting");
self.client.metrics_seq_number_to_handle_batch_stream.set(latest_seq.unwrap_or_default() as i64);
self.client.metrics_total_times_reconnect_follower_stream.inc();
Expand Down Expand Up @@ -545,6 +556,7 @@ where
}
};
}
timer.stop_and_record();
Ok(())
}
}
Loading

0 comments on commit 6b98051

Please sign in to comment.