Skip to content

Commit

Permalink
move metrics, use serialized tx for hash
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed Dec 6, 2022
1 parent bad51ad commit fd72e10
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 30 deletions.
16 changes: 15 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ pub struct AuthorityMetrics {
pub(crate) batch_svc_is_running: IntCounter,

pending_notify_read: IntGauge,

/// Consensus handler metrics
pub consensus_handler_processed_batches: IntCounter,
pub consensus_handler_processed_bytes: IntCounter,
}

// Override default Prom buckets for positive numbers in 0-50k range
Expand Down Expand Up @@ -472,6 +476,16 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
consensus_handler_processed_batches: register_int_counter_with_registry!(
"consensus_handler_processed_batches",
"Number of batches processed by consensus_handler",
registry
).unwrap(),
consensus_handler_processed_bytes: register_int_counter_with_registry!(
"consensus_handler_processed_bytes",
"Number of bytes processed by consensus_handler",
registry
).unwrap(),
}
}
}
Expand Down Expand Up @@ -2171,7 +2185,7 @@ impl AuthorityState {
})?;
}
ConsensusTransactionKind::EndOfPublish(authority) => {
if &AuthorityName::from(&transaction.certificate.origin()) != authority {
if &transaction.sender_authority() != authority {
warn!(
"EndOfPublish authority {} does not match narwhal certificate source {}",
authority,
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
next_versions,
)
}

/// Returns transaction digests from consensus_message_order table in the "checkpoint range".
///
/// Checkpoint range is defined from the last seen checkpoint(excluded) to the provided
Expand Down
15 changes: 12 additions & 3 deletions crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl ConsensusHandler {
fn update_hash(
last_seen: &Mutex<ExecutionIndicesWithHash>,
index: ExecutionIndices,
v: &[u8; 8],
v: &[u8],
) -> Option<ExecutionIndicesWithHash> {
let mut last_seen_guard = last_seen
.try_lock()
Expand Down Expand Up @@ -74,12 +74,16 @@ impl ExecutionState for ConsensusHandler {
let mut sequenced_transactions = Vec::new();
let mut seq = 0;

let mut bytes = 0usize;
let round = consensus_output.sub_dag.round();
for (cert, batches) in consensus_output.batches {
let author = cert.header.author.clone();
let output_cert = Arc::new(cert);
for batch in batches {
self.state.metrics.consensus_handler_processed_batches.inc();
for serialized_transaction in batch.transactions {
bytes += serialized_transaction.len();

let transaction = match bincode::deserialize::<ConsensusTransaction>(
&serialized_transaction,
) {
Expand All @@ -99,7 +103,7 @@ impl ExecutionState for ConsensusHandler {
};

let index_with_hash =
match Self::update_hash(&self.last_seen, index, &transaction.tracking_id) {
match Self::update_hash(&self.last_seen, index, &serialized_transaction) {
Some(i) => i,
None => {
debug!(
Expand All @@ -120,6 +124,11 @@ impl ExecutionState for ConsensusHandler {
}
}

self.state
.metrics
.consensus_handler_processed_bytes
.inc_by(bytes as u64);

for sequenced_transaction in sequenced_transactions {
let verified_transaction = match self
.state
Expand Down Expand Up @@ -207,7 +216,7 @@ pub fn test_update_hash() {
};

let last_seen = Mutex::new(last_seen);
let tx = &[0, 0, 0, 0, 0, 0, 0, 0];
let tx = &[0];
assert!(ConsensusHandler::update_hash(&last_seen, index0, tx).is_none());
assert!(ConsensusHandler::update_hash(&last_seen, index1, tx).is_none());
assert!(ConsensusHandler::update_hash(&last_seen, index2, tx).is_some());
Expand Down
7 changes: 3 additions & 4 deletions narwhal/consensus/src/bullshark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,14 @@ impl ConsensusProtocol for Bullshark {
sub_dag_index: next_sub_dag_index,
};

// Increase the global consensus index.
state.latest_sub_dag_index = next_sub_dag_index;

// Persist the update.
// TODO [issue #116]: Ensure this is not a performance bottleneck.
self.store
.write_consensus_state(&state.last_committed, &sub_dag)?;
debug!("Store commit index:{},", &next_sub_dag_index,);

// Increase the global consensus index.
state.latest_sub_dag_index = next_sub_dag_index;

committed_sub_dags.push(sub_dag);
}

Expand Down
7 changes: 3 additions & 4 deletions narwhal/consensus/src/tusk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,13 @@ impl ConsensusProtocol for Tusk {
sub_dag_index: next_sub_dag_index,
};

// Increase the global consensus index.
state.latest_sub_dag_index = next_sub_dag_index;

// Persist the update.
// TODO [issue #116]: Ensure this is not a performance bottleneck.
self.store
.write_consensus_state(&state.last_committed, &sub_dag)?;

// Increase the global consensus index.
state.latest_sub_dag_index = next_sub_dag_index;

committed_sub_dags.push(sub_dag);
}

Expand Down
14 changes: 0 additions & 14 deletions narwhal/executor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ pub struct ExecutorMetrics {
pub subscriber_remote_fetch_latency: Histogram,
/// Number of times certificate was found locally
pub subscriber_local_hit: IntCounter,
/// Number of batches processed by notifier
pub notifier_processed_batches: IntCounter,
/// Number of bytes processed by notifier
pub notifier_processed_bytes: IntCounter,
/// Number of certificates processed by subscriber
pub subscriber_processed_certificates: IntCounter,
/// Round of last certificate seen by subscriber
Expand Down Expand Up @@ -77,16 +73,6 @@ impl ExecutorMetrics {
"Number of times certificate was found locally",
registry
).unwrap(),
notifier_processed_batches: register_int_counter_with_registry!(
"notifier_processed_batches",
"Number of batches processed by notifier",
registry
).unwrap(),
notifier_processed_bytes: register_int_counter_with_registry!(
"notifier_processed_bytes",
"Number of bytes processed by notifier",
registry
).unwrap(),
subscriber_processed_certificates: register_int_counter_with_registry!(
"subscriber_processed_certificates",
"Number of certificates processed by subscriber",
Expand Down
2 changes: 1 addition & 1 deletion narwhal/node/tests/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl ExecutionState for SimpleExecutionState {

impl SimpleExecutionState {
async fn process_transaction(&self, transaction: Transaction, change_epoch: bool) {
let transaction: u64 = bincode::deserialize(&transaction).unwrap();
let _transaction: u64 = bincode::deserialize(&transaction).unwrap();
// Change epoch every few certificates. Note that empty certificates are not provided to
// this function (they are immediately skipped).
let mut epoch = self.committee.lock().unwrap().epoch();
Expand Down
3 changes: 0 additions & 3 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,6 @@ impl Primary {
}
}

// TODO (Laura): if we are restarting and not advancing, for the headers in the header
// TODO (Laura): store that do not have a matching certificate, re-create and send a vote
// The `Core` receives and handles headers, votes, and certificates from the other primaries.
let core_primary_network = P2pNetwork::new(network.clone());
let core_handle = Core::spawn(
name.clone(),
Expand Down

0 comments on commit fd72e10

Please sign in to comment.