Skip to content

Commit

Permalink
add metrics, do not exit post processing loop when lagged (MystenLabs…
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu authored Oct 11, 2022
1 parent 16b6110 commit 03a0c21
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 26 deletions.
115 changes: 92 additions & 23 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ pub struct AuthorityMetrics {
total_certs: IntCounter,
total_cert_attempts: IntCounter,
total_effects: IntCounter,
total_events: IntCounter,
signature_errors: IntCounter,
pub shared_obj_tx: IntCounter,
tx_already_processed: IntCounter,
Expand Down Expand Up @@ -158,6 +157,18 @@ pub struct AuthorityMetrics {
pub gossip_sync_count: IntCounter,
pub gossip_task_success_count: IntCounter,
pub gossip_task_error_count: IntCounter,

/// Post processing metrics
post_processing_total_events_emitted: IntCounter,
post_processing_total_tx_indexed: IntCounter,
post_processing_total_tx_added_to_streamer: IntCounter,
post_processing_total_tx_had_event_processed: IntCounter,
post_processing_total_tx_sent_to_post_processing: IntCounter,
post_processing_latest_seq_seen: IntGauge,

/// Batch service metrics
pub(crate) batch_service_total_tx_broadcasted: IntCounter,
pub(crate) batch_service_latest_seq_broadcasted: IntGauge,
}

// Override default Prom buckets for positive numbers in 0-50k range
Expand Down Expand Up @@ -200,12 +211,7 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
total_events: register_int_counter_with_registry!(
"total_events",
"Total number of events produced",
registry,
)
.unwrap(),

signature_errors: register_int_counter_with_registry!(
"total_signature_errors",
"Number of transaction signature errors",
Expand Down Expand Up @@ -359,6 +365,54 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
post_processing_total_events_emitted: register_int_counter_with_registry!(
"post_processing_total_events_emitted",
"Total number of events emitted in post processing",
registry,
)
.unwrap(),
post_processing_total_tx_indexed: register_int_counter_with_registry!(
"post_processing_total_tx_indexed",
"Total number of txes indexed in post processing",
registry,
)
.unwrap(),
post_processing_total_tx_added_to_streamer: register_int_counter_with_registry!(
"post_processing_total_tx_added_to_streamer",
"Total number of txes added to tx streamer in post processing",
registry,
)
.unwrap(),
post_processing_total_tx_had_event_processed: register_int_counter_with_registry!(
"post_processing_total_tx_had_event_processed",
"Total number of txes finished event processing in post processing",
registry,
)
.unwrap(),
post_processing_total_tx_sent_to_post_processing: register_int_counter_with_registry!(
"post_processing_total_tx_sent_to_post_processing",
"Total number of txes sent to post processing",
registry,
)
.unwrap(),
post_processing_latest_seq_seen: register_int_gauge_with_registry!(
"post_processing_latest_seq_seen",
"The latest seq number of tx that is seen in post processing",
registry,
)
.unwrap(),
batch_service_total_tx_broadcasted: register_int_counter_with_registry!(
"batch_service_total_tx_broadcasted",
"Total number of txes broadcasted in batch service",
registry,
)
.unwrap(),
batch_service_latest_seq_broadcasted: register_int_gauge_with_registry!(
"batch_service_latest_seq_broadcasted",
"The latest seq number of tx that is broadcasted in batch service",
registry,
)
.unwrap(),
}
}
}
Expand Down Expand Up @@ -891,6 +945,7 @@ impl AuthorityState {
}
}

#[instrument(level = "debug", skip_all, fields(seq = ?seq, tx_digest =? digest), err)]
fn index_tx(
&self,
indexes: &IndexStore,
Expand Down Expand Up @@ -922,7 +977,12 @@ impl AuthorityState {
)
}

async fn process_one_tx(&self, seq: TxSequenceNumber, digest: &TransactionDigest) -> SuiResult {
#[instrument(level = "debug", skip_all, fields(seq=?seq, tx_digest=?digest), err)]
async fn post_process_one_tx(
&self,
seq: TxSequenceNumber,
digest: &TransactionDigest,
) -> SuiResult {
// Load cert and effects.
let info = self.make_transaction_info(digest).await?;
let (cert, effects) = match info {
Expand All @@ -942,26 +1002,32 @@ impl AuthorityState {

// Index tx
if let Some(indexes) = &self.indexes {
if let Err(e) =
self.index_tx(indexes.as_ref(), seq, digest, &cert, &effects, timestamp_ms)
{
warn!(?digest, "Couldn't index tx: {}", e);
}
let _ = self
.index_tx(indexes.as_ref(), seq, digest, &cert, &effects, timestamp_ms)
.tap_ok(|_| self.metrics.post_processing_total_tx_indexed.inc())
.tap_err(
|e| warn!(tx_digest=?digest, "Post processing - Couldn't index tx: {}", e),
);
}

// Stream transaction
if let Some(transaction_streamer) = &self.transaction_streamer {
transaction_streamer.enqueue((cert, effects.clone())).await;
self.metrics
.post_processing_total_tx_added_to_streamer
.inc();
}

// Emit events
if let Some(event_handler) = &self.event_handler {
event_handler
.process_events(&effects.effects, timestamp_ms, seq)
.await?;
.await
.tap_ok(|_| self.metrics.post_processing_total_tx_had_event_processed.inc())
.tap_err(|e| warn!(tx_digest=?digest, "Post processing - Couldn't process events for tx: {}", e))?;

self.metrics
.total_events
.post_processing_total_events_emitted
.inc_by(effects.effects.events.len() as u64);
}

Expand All @@ -984,26 +1050,29 @@ impl AuthorityState {
},
)) = item
{
if let Err(e) = self.process_one_tx(seq, &digest).await {
self.metrics.post_processing_latest_seq_seen.set(seq as i64);
self.metrics
.post_processing_total_tx_sent_to_post_processing
.inc();
if let Err(e) = self.post_process_one_tx(seq, &digest).await {
warn!(?digest, "Couldn't process tx: {}", e);
}
}
}

// For both the error cases, we exit the loop which ends this task.
// TODO: Automatically restart the task, which in combination with the todo above,
// will process any skipped txes and then begin listening for new ones.
Err(RecvError::Closed) => {
// The service closed the channel.
error!("run_tx_post_processing_process receiver channel closed");
// This shall not happen.
error!("run_tx_post_processing_process receiver channel closed. This shall not happen.");
break;
}
// Today if post processing is too slow we will skip indexing some txes.
// TODO: https://github.com/MystenLabs/sui/issues/5025
// Automatically restart the task, which in combination with the todo above,
// will process any skipped txes and then begin listening for new ones.
Err(RecvError::Lagged(number_skipped)) => {
error!(
"run_tx_post_processing_process too slow, skipped {} txes",
number_skipped
);
break;
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions crates/sui-core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,19 @@ impl crate::authority::AuthorityState {
// transactions we may have received out of order.
let mut current_batch: Vec<(TxSequenceNumber, ExecutionDigests)> = Vec::new();

while !exit {
loop {
if exit {
error!("batch service exited!");
break;
}
// Reset the flags.
make_batch = false;

// check if we should make a new block
tokio::select! {
_ = interval.tick() => {
// Every so often we check if we should make a batch
// but it should never be empty. But never empty.
// but it should never be empty.
make_batch = true;
},
item_option = transaction_stream.next() => {
Expand All @@ -183,6 +187,9 @@ impl crate::authority::AuthorityState {
current_batch.push((seq, tx_digest));
let _ = self.batch_channels.send(UpdateItem::Transaction((seq, tx_digest)));

self.metrics.batch_service_total_tx_broadcasted.inc();
self.metrics.batch_service_latest_seq_broadcasted.set(seq as i64);

if current_batch.len() as TxSequenceNumber >= min_batch_size {
make_batch = true;
}
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-core/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use move_bytecode_utils::module_cache::SyncModuleCache;
use sui_json_rpc_types::SuiMoveStruct;
use tokio_stream::Stream;
use tracing::{debug, error, trace};
use tracing::{debug, error, instrument, trace};

use sui_storage::event_store::{EventStore, EventStoreType};
use sui_types::base_types::TransactionDigest;
Expand Down Expand Up @@ -42,6 +42,7 @@ impl EventHandler {
}
}

#[instrument(level = "debug", skip_all, fields(seq=?seq_num, tx_digest=?effects.transaction_digest), err)]
pub async fn process_events(
&self,
effects: &TransactionEffects,
Expand Down

0 comments on commit 03a0c21

Please sign in to comment.