Skip to content

Commit

Permalink
[Consensus] improve logs and panic messages (MystenLabs#17261)
Browse files Browse the repository at this point in the history
## Description 

Improve panic message when local timestamp < accepted block timestamp.

Add more details to logs for block creation and processing.

Remove logic that is no longer necessary now that DagState is used for
proposing and for keeping track of the last proposed ancestors.

Ensure timestamp_utc_ms() returns monotonic values. Ensure that after Core
recovers, local time > max ancestor timestamp.

## Test plan 

CI
Simulation

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
mwtian authored Apr 22, 2024
1 parent 1ad8b4f commit a887e82
Show file tree
Hide file tree
Showing 14 changed files with 207 additions and 112 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dashmap.workspace = true
enum_dispatch.workspace = true
fastcrypto.workspace = true
futures.workspace = true
itertools.workspace = true
quinn-proto.workspace = true
mockall.workspace = true
mysten-metrics.workspace = true
Expand Down
46 changes: 34 additions & 12 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::{ready, stream, task, Stream, StreamExt};
use parking_lot::RwLock;
use tokio::{sync::broadcast, time::sleep};
use tokio_util::sync::ReusableBoxFuture;
use tracing::{info, warn};
use tracing::{debug, info, warn};

use crate::{
block::{timestamp_utc_ms, BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
Expand Down Expand Up @@ -83,7 +83,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.metrics
.node_metrics
.invalid_blocks
.with_label_values(&[peer_hostname, "send_block"])
.with_label_values(&[peer_hostname, "handle_send_block"])
.inc();
let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
info!("Block with wrong authority from {}: {}", peer, e);
Expand All @@ -97,29 +97,37 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.metrics
.node_metrics
.invalid_blocks
.with_label_values(&[peer_hostname])
.with_label_values(&[peer_hostname, "handle_send_block"])
.inc();
info!("Invalid block from {}: {}", peer, e);
return Err(e);
}
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

// Reject block with timestamp too far in the future.
let forward_time_drift = Duration::from_millis(
verified_block
.timestamp_ms()
.saturating_sub(timestamp_utc_ms()),
);
let now = timestamp_utc_ms();
let forward_time_drift =
Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
if forward_time_drift > self.context.parameters.max_forward_time_drift {
self.context
.metrics
.node_metrics
.rejected_future_blocks
.with_label_values(&[&peer_hostname])
.inc();
return Err(ConsensusError::BlockTooFarInFuture {
block_timestamp: verified_block.timestamp_ms(),
forward_time_drift,
debug!(
"Block {:?} timestamp ({} > {}) is too far in the future, rejected.",
verified_block.reference(),
verified_block.timestamp_ms(),
now,
);
return Err(ConsensusError::BlockRejected {
block_ref: verified_block.reference(),
reason: format!(
"Block timestamp is too far in the future: {} > {}",
verified_block.timestamp_ms(),
now
),
});
}

Expand All @@ -131,6 +139,13 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.block_timestamp_drift_wait_ms
.with_label_values(&[peer_hostname, &"handle_send_block"])
.inc_by(forward_time_drift.as_millis() as u64);
debug!(
"Block {:?} timestamp ({} > {}) is in the future, waiting for {}ms",
verified_block.reference(),
verified_block.timestamp_ms(),
now,
forward_time_drift.as_millis(),
);
sleep(forward_time_drift).await;
}

Expand Down Expand Up @@ -160,9 +175,16 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.rejected_blocks
.with_label_values(&[&"commit_lagging"])
.inc();
debug!(
"Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
verified_block.reference(),
last_commit_index,
quorum_commit_index,
);
return Err(ConsensusError::BlockRejected {
block_ref: verified_block.reference(),
reason: format!(
"Local commit index {} is too far behind the quorum commit index {}",
"Last commit index is lagging quorum commit index too much ({} < {})",
last_commit_index, quorum_commit_index,
),
});
Expand Down
34 changes: 23 additions & 11 deletions consensus/core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{
fmt,
hash::{Hash, Hasher},
ops::Deref,
sync::Arc,
time::SystemTime,
sync::{Arc, OnceLock},
time::{Instant, SystemTime},
};

use bytes::Bytes;
Expand All @@ -32,11 +32,22 @@ pub(crate) const GENESIS_ROUND: Round = 0;
pub type BlockTimestampMs = u64;

// Returns the current time expressed as UNIX timestamp in milliseconds.
pub fn timestamp_utc_ms() -> BlockTimestampMs {
match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_millis() as BlockTimestampMs,
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
}
/// Returns the current time as a UNIX timestamp in milliseconds.
///
/// The wall clock (`SystemTime`) is sampled only once, on the first call, together with a
/// monotonic `Instant`. Every result is that initial wall-clock reading plus the monotonic
/// time elapsed since then, so returned values do not go backwards within a process even if
/// the system clock is adjusted afterwards.
///
/// # Panics
///
/// Panics on the first call if the system clock reports a time before the UNIX epoch.
pub(crate) fn timestamp_utc_ms() -> BlockTimestampMs {
    // (monotonic anchor, wall-clock time since the UNIX epoch sampled at that anchor).
    //
    // The anchor is stored as-is instead of being shifted back to an "epoch Instant":
    // `Instant::checked_sub` returns `None` when the result cannot be represented by the
    // platform clock (e.g. where `Instant` counts from boot), and unwrapping that would
    // turn the very first call into a panic.
    static ANCHOR: OnceLock<(Instant, std::time::Duration)> = OnceLock::new();
    let (anchor, since_epoch_at_anchor) = ANCHOR.get_or_init(|| {
        let anchor = Instant::now();
        let since_epoch = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
            Ok(d) => d,
            Err(e) => panic!("SystemTime before UNIX EPOCH! {e}"),
        };
        (anchor, since_epoch)
    });
    // Saturating: an `Instant` read before the anchor yields zero instead of panicking.
    let elapsed = Instant::now().saturating_duration_since(*anchor);
    (*since_epoch_at_anchor + elapsed).as_millis() as BlockTimestampMs
}

/// Sui transaction in serialised bytes
Expand Down Expand Up @@ -195,13 +206,13 @@ impl BlockRef {
// TODO: re-evaluate formats for production debugging.
impl fmt::Display for BlockRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "{}{}({})", self.author, self.round, self.digest)
write!(f, "B{}({},{})", self.round, self.author, self.digest)
}
}

impl fmt::Debug for BlockRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "{}{}({:?})", self.author, self.round, self.digest)
write!(f, "B{}({},{:?})", self.round, self.author, self.digest)
}
}

Expand Down Expand Up @@ -513,11 +524,12 @@ impl fmt::Debug for VerifiedBlock {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(
f,
"{:?}({}ms;{:?};{};v)",
"{:?}({}ms;{:?};{}t;{}c)",
self.reference(),
self.timestamp_ms(),
self.ancestors(),
self.transactions().len()
self.transactions().len(),
self.commit_votes().len(),
)
}
}
Expand Down
7 changes: 6 additions & 1 deletion consensus/core/src/block_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use std::{
sync::Arc,
};

use itertools::Itertools as _;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
use tracing::warn;
use tracing::{debug, warn};

use crate::{
block::{BlockAPI, BlockRef, VerifiedBlock},
Expand Down Expand Up @@ -81,6 +82,10 @@ impl BlockManager {
let _s = monitored_scope("BlockManager::try_accept_blocks");

blocks.sort_by_key(|b| b.round());
debug!(
"Trying to accept blocks: {}",
blocks.iter().map(|b| b.reference().to_string()).join(",")
);

let mut accepted_blocks = vec![];
let missing_blocks_before = self.missing_blocks.clone();
Expand Down
12 changes: 12 additions & 0 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@ impl CommitRef {
}
}

impl fmt::Display for CommitRef {
    /// Writes the commit reference as `C<index>(<digest>)`, using the digest's
    /// `Display` form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (index, digest) = (&self.index, &self.digest);
        write!(f, "C{index}({digest})")
    }
}

impl fmt::Debug for CommitRef {
    /// Writes the commit reference as `C<index>(<digest>)`, using the digest's
    /// `Debug` form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (index, digest) = (&self.index, &self.digest);
        write!(f, "C{index}({digest:?})")
    }
}

// Represents a vote on a Commit.
pub type CommitVote = CommitRef;

Expand Down
22 changes: 15 additions & 7 deletions consensus/core/src/commit_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use std::{

use bytes::Bytes;
use consensus_config::AuthorityIndex;
use futures::{stream::FuturesOrdered, StreamExt};
use futures::{stream::FuturesOrdered, StreamExt as _};
use itertools::Itertools as _;
use mysten_metrics::spawn_logged_monitored_task;
use parking_lot::{Mutex, RwLock};
use rand::prelude::SliceRandom as _;
Expand All @@ -41,7 +42,7 @@ use tokio::{
task::{JoinHandle, JoinSet},
time::{sleep, Instant, MissedTickBehavior},
};
use tracing::{info, warn};
use tracing::{debug, info, warn};

use crate::{
block::{timestamp_utc_ms, BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
Expand Down Expand Up @@ -127,13 +128,13 @@ impl<C: NetworkClient> CommitSyncer<C> {
// Update synced_commit_index periodically to make sure it is not smaller than
// local commit index.
synced_commit_index = synced_commit_index.max(local_commit_index);
// TODO: pause commit sync when execution of commits is lagging behind.
// TODO: cleanup inflight fetches that are no longer needed.
let fetch_after_index = synced_commit_index.max(highest_scheduled_index.unwrap_or(0));
info!(
"Checking to schedule fetches: synced_commit_index={}, fetch_after_index={}, quorum_commit_index={}",
synced_commit_index, fetch_after_index, quorum_commit_index
"Checking to schedule fetches: synced_commit_index={}, highest_scheduled_index={}, quorum_commit_index={}",
synced_commit_index, highest_scheduled_index.unwrap_or(0), quorum_commit_index,
);
// TODO: pause commit sync when execution of commits is lagging behind, maybe through Core.
// TODO: cleanup inflight fetches that are no longer needed.
let fetch_after_index = synced_commit_index.max(highest_scheduled_index.unwrap_or(0));
// When the node is falling behind, schedule pending fetches which will be executed later.
'pending: for prev_end in (fetch_after_index..=quorum_commit_index).step_by(inner.context.parameters.commit_sync_batch_size as usize) {
// Create range with inclusive start and end.
Expand Down Expand Up @@ -196,6 +197,13 @@ impl<C: NetworkClient> CommitSyncer<C> {
if fetched_end <= synced_commit_index {
continue 'fetched;
}
debug!(
"Fetched certified blocks: {}",
blocks
.iter()
.map(|b| b.reference().to_string())
.join(","),
);
// If core thread cannot handle the incoming blocks, it is ok to block here.
// Also it is possible to have missing ancestors because an equivocating validator
// may produce blocks that are not included in commits but are ancestors to other blocks.
Expand Down
Loading

0 comments on commit a887e82

Please sign in to comment.