Skip to content

Commit

Permalink
[CheckpointExecution] add more metrics and logs (MystenLabs#10884)
Browse files Browse the repository at this point in the history
## Description 

1. Add metrics for inflight checkpoints being executed, latency to
prepare checkpoint for execution, and actual execution latency.
2. Clean up logging of checkpoint sequence number a bit, by relying on
`instrument` when possible.
3. For checkpoint execution timeout (30s), only emit warnings for the
checkpoint next to last executed. This reduces noises from the logs. We
can consider reducing the timeout back to 10s.

## Test Plan 

Built fullnode and viewed logs emitted.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
mwtian authored Apr 14, 2023
1 parent d1773e6 commit 57451ad
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 22 deletions.
19 changes: 19 additions & 0 deletions crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub struct CheckpointExecutorMetrics {
pub last_executed_checkpoint: IntGauge,
pub checkpoint_exec_errors: IntCounter,
pub checkpoint_exec_epoch: IntGauge,
pub checkpoint_exec_inflight: IntGauge,
pub checkpoint_exec_latency_us: Histogram,
pub checkpoint_prepare_latency_us: Histogram,
pub checkpoint_transaction_count: Histogram,
pub checkpoint_contents_age_ms: Histogram,
pub accumulator_inconsistent_state: IntGauge,
Expand Down Expand Up @@ -45,6 +48,22 @@ impl CheckpointExecutorMetrics {
registry
)
.unwrap(),
checkpoint_exec_inflight: register_int_gauge_with_registry!(
"checkpoint_exec_inflight",
"Current number of inflight checkpoints being executed",
registry
)
.unwrap(),
checkpoint_exec_latency_us: Histogram::new_in_registry(
"checkpoint_exec_latency_us",
"Latency of executing a checkpoint from enqueue to all effects available, in microseconds",
registry,
),
checkpoint_prepare_latency_us: Histogram::new_in_registry(
"checkpoint_prepare_latency_us",
"Latency of preparing a checkpoint to enqueue for execution, in microseconds",
registry,
),
checkpoint_transaction_count: Histogram::new_in_registry(
"checkpoint_transaction_count",
"Number of transactions in the checkpoint",
Expand Down
102 changes: 80 additions & 22 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub(crate) mod tests;

type CheckpointExecutionBuffer = FuturesOrdered<JoinHandle<VerifiedCheckpoint>>;

/// The interval to log checkpoint progress, in # of checkpoints processed.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

pub struct CheckpointExecutor {
mailbox: broadcast::Receiver<VerifiedCheckpoint>,
checkpoint_store: Arc<CheckpointStore>,
Expand Down Expand Up @@ -168,6 +171,9 @@ impl CheckpointExecutor {
epoch_store.clone(),
)
.await;
self.metrics
.checkpoint_exec_inflight
.set(pending.len() as i64);
tokio::select! {
// Check for completed workers and ratchet the highest_checkpoint_executed
// watermark accordingly. Note that given that checkpoints are guaranteed to
Expand Down Expand Up @@ -239,7 +245,7 @@ impl CheckpointExecutor {
assert_eq!(seq, 0);
}
debug!("Bumping highest_executed_checkpoint watermark to {:?}", seq);
if seq % 10000 == 0 {
if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
info!("Finished syncing and executing checkpoint {}", seq);
}

Expand Down Expand Up @@ -290,13 +296,14 @@ impl CheckpointExecutor {
}
}

#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn schedule_checkpoint(
&self,
checkpoint: VerifiedCheckpoint,
pending: &mut CheckpointExecutionBuffer,
epoch_store: Arc<AuthorityPerEpochStore>,
) {
debug!("Executing checkpoint {:?}", checkpoint.sequence_number());
debug!("Scheduling checkpoint for execution");

// Mismatch between node epoch and checkpoint epoch after startup
// crash recovery is invalid
Expand Down Expand Up @@ -325,18 +332,16 @@ impl CheckpointExecutor {
}
}

#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
// Logs within the function are annotated with the checkpoint sequence number and epoch,
// from schedule_checkpoint().
async fn execute_checkpoint(
&self,
checkpoint: VerifiedCheckpoint,
epoch_store: Arc<AuthorityPerEpochStore>,
pending: &mut CheckpointExecutionBuffer,
) -> SuiResult {
let checkpoint_sequence = *checkpoint.sequence_number();
debug!(
"Scheduling checkpoint {:?} for execution",
checkpoint_sequence,
);
debug!("Preparing checkpoint for execution",);
let prepare_start = Instant::now();

// this function must guarantee that all transactions in the checkpoint are executed before it
// returns. This invariant is enforced in two phases:
Expand All @@ -352,12 +357,7 @@ impl CheckpointExecutor {
);

let tx_count = execution_digests.len();
debug!(
epoch=?epoch_store.epoch(),
checkpoint_sequence=?checkpoint.sequence_number(),
"Number of transactions in the checkpoint: {:?}",
tx_count
);
debug!("Number of transactions in the checkpoint: {:?}", tx_count);
self.metrics
.checkpoint_transaction_count
.report(tx_count as u64);
Expand All @@ -369,11 +369,14 @@ impl CheckpointExecutor {
epoch_store.clone(),
checkpoint,
pending,
prepare_start,
)
.await?;
Ok(())
}

// Logs within the function are annotated with the checkpoint sequence number and epoch,
// from schedule_checkpoint().
async fn execute_transactions(
&self,
execution_digests: Vec<ExecutionDigests>,
Expand All @@ -382,6 +385,7 @@ impl CheckpointExecutor {
epoch_store: Arc<AuthorityPerEpochStore>,
checkpoint: VerifiedCheckpoint,
pending: &mut CheckpointExecutionBuffer,
prepare_start: Instant,
) -> SuiResult {
let effects_digests = execution_digests.iter().map(|digest| digest.effects);

Expand Down Expand Up @@ -412,26 +416,47 @@ impl CheckpointExecutor {
}
}

let exec_start = Instant::now();
let prepare_elapsed = exec_start - prepare_start;
self.metrics
.checkpoint_prepare_latency_us
.report(prepare_elapsed.as_micros() as u64);
if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
info!(
"Checkpoint preparation for execution took {:?}",
prepare_elapsed
);
}

self.tx_manager
.enqueue(executable_txns.clone(), &epoch_store)?;

let local_execution_timeout_sec = self.config.local_execution_timeout_sec;
let checkpoint_store = self.checkpoint_store.clone();
let authority_store = self.authority_store.clone();
let tx_manager = self.tx_manager.clone();
let accumulator = self.accumulator.clone();
let metrics = self.metrics.clone();
pending.push_back(spawn_monitored_task!(async move {
let epoch_store = epoch_store.clone();
handle_execution_effects(
execution_digests,
all_tx_digests,
checkpoint.clone(),
authority_store.clone(),
epoch_store.clone(),
tx_manager.clone(),
accumulator.clone(),
checkpoint_store,
authority_store,
epoch_store,
tx_manager,
accumulator,
local_execution_timeout_sec,
)
.await;
let exec_elapsed = exec_start.elapsed();
metrics
.checkpoint_exec_latency_us
.report(exec_elapsed.as_micros() as u64);
if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
info!(seq = ?checkpoint.sequence_number(), "Checkpoint execution took {:?}", exec_elapsed);
}
checkpoint
}));

Expand Down Expand Up @@ -472,6 +497,7 @@ impl CheckpointExecutor {
vec![execution_digests],
vec![change_epoch_tx_digest],
checkpoint.clone(),
self.checkpoint_store.clone(),
self.authority_store.clone(),
epoch_store.clone(),
self.tx_manager.clone(),
Expand Down Expand Up @@ -566,10 +592,12 @@ impl CheckpointExecutor {
}
}

#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn handle_execution_effects(
execution_digests: Vec<ExecutionDigests>,
all_tx_digests: Vec<TransactionDigest>,
checkpoint: VerifiedCheckpoint,
checkpoint_store: Arc<CheckpointStore>,
authority_store: Arc<AuthorityStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
transaction_manager: Arc<TransactionManager>,
Expand All @@ -579,12 +607,44 @@ async fn handle_execution_effects(
// Once synced_txns have been awaited, all txns should have effects committed.
let mut periods = 1;
let log_timeout_sec = Duration::from_secs(local_execution_timeout_sec);
let seq_num = checkpoint.sequence_number;
// Whether the checkpoint is next to execute and blocking additional executions.
let mut blocking_execution = false;
loop {
let effects_future = authority_store.notify_read_executed_effects(all_tx_digests.clone());

match timeout(log_timeout_sec, effects_future).await {
Err(_elapsed) => {
// Reading this value every timeout should be ok.
let highest_seq = checkpoint_store
.get_highest_executed_checkpoint_seq_number()
.unwrap()
.unwrap_or_default();
if checkpoint.sequence_number <= highest_seq {
error!(
"Re-executing checkpoint {} after higher checkpoint {} has executed!",
checkpoint.sequence_number, highest_seq
);
continue;
}
if checkpoint.sequence_number > highest_seq + 1 {
trace!(
"Checkpoint {} is still executing. Highest executed = {}",
checkpoint.sequence_number,
highest_seq
);
continue;
}
if !blocking_execution {
trace!(
"Checkpoint {} is next to execute.",
checkpoint.sequence_number
);
blocking_execution = true;
continue;
}

// Only log details when the checkpoint is next to execute, but has not finished
// execution within log_timeout_sec.
let missing_digests: Vec<TransactionDigest> = authority_store
.multi_get_executed_effects(&all_tx_digests)
.expect("multi_get_executed_effects cannot fail")
Expand All @@ -607,7 +667,6 @@ async fn handle_execution_effects(
}

warn!(
checkpoint_seq_num = seq_num,
"Transaction effects for checkpoint tx digests {:?} not present within {:?}. ",
missing_digests,
log_timeout_sec * periods,
Expand All @@ -618,7 +677,6 @@ async fn handle_execution_effects(
let pending_digest = missing_digests.first().unwrap();
if let Some(missing_input) = transaction_manager.get_missing_input(pending_digest) {
warn!(
checkpoint_seq_num = seq_num,
"Transaction {pending_digest:?} has missing input objects {missing_input:?}",
);
}
Expand Down

0 comments on commit 57451ad

Please sign in to comment.