Wrap NW Cert in versioned enum (MystenLabs#9878)
## Test Plan 

Unit Test & Benchmark
arun-koshy authored Mar 27, 2023
1 parent b845251 commit 4882541
Showing 30 changed files with 317 additions and 144 deletions.
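For orientation, a minimal, self-contained sketch of the pattern this commit applies: the concrete certificate struct becomes `CertificateV1`, the public `Certificate` type becomes a one-variant enum wrapping it, and field access moves behind a `CertificateAPI` accessor trait (mirroring the existing `HeaderAPI`). The type and method names below are taken from the diffs; the simplified field types and exact trait surface are assumptions, not the real `narwhal/types` definitions.

```rust
// Sketch only -- simplified stand-ins, not the actual narwhal/types code.
#[derive(Clone, Debug, Default)]
pub struct Header; // stand-in for the real (already versioned) Header
#[derive(Clone, Debug, Default)]
pub struct Metadata; // stand-in for the real Metadata

#[derive(Clone, Debug)]
pub struct CertificateV1 {
    header: Header,
    aggregated_signature: Vec<u8>, // simplified
    signed_authorities: Vec<u8>,   // simplified
    metadata: Metadata,
}

/// Accessors callers use instead of reaching into fields; a future
/// CertificateV2 would then only touch these impls, not every call site.
pub trait CertificateAPI {
    fn header(&self) -> &Header;
    fn header_mut(&mut self) -> &mut Header;
    fn aggregated_signature(&self) -> &[u8];
    fn signed_authorities(&self) -> &[u8];
    fn metadata(&self) -> &Metadata;
}

#[derive(Clone, Debug)]
pub enum Certificate {
    V1(CertificateV1),
}

impl CertificateAPI for Certificate {
    fn header(&self) -> &Header {
        match self {
            Certificate::V1(c) => &c.header,
        }
    }
    fn header_mut(&mut self) -> &mut Header {
        match self {
            Certificate::V1(c) => &mut c.header,
        }
    }
    fn aggregated_signature(&self) -> &[u8] {
        match self {
            Certificate::V1(c) => &c.aggregated_signature,
        }
    }
    fn signed_authorities(&self) -> &[u8] {
        match self {
            Certificate::V1(c) => &c.signed_authorities,
        }
    }
    fn metadata(&self) -> &Metadata {
        match self {
            Certificate::V1(c) => &c.metadata,
        }
    }
}
```

The call-site consequence shows up throughout the diff: `cert.header` becomes `cert.header()`, `cert.metadata` becomes `cert.metadata()`, and tests mutate payloads through `header_mut()`.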
15 changes: 10 additions & 5 deletions crates/sui-core/src/consensus_handler.rs
@@ -16,7 +16,7 @@ use lru::LruCache;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use narwhal_config::Committee;
use narwhal_executor::{ExecutionIndices, ExecutionState};
-use narwhal_types::{ConsensusOutput, HeaderAPI};
+use narwhal_types::{CertificateAPI, ConsensusOutput, HeaderAPI};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
@@ -132,7 +132,7 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {
/* (serialized, transaction, output_cert) */
let mut transactions = vec![];
// Narwhal enforces some invariants on the header.created_at, so we can use it as a timestamp
-let timestamp = *consensus_output.sub_dag.leader.header.created_at();
+let timestamp = *consensus_output.sub_dag.leader.header().created_at();

let prologue_transaction = self.consensus_commit_prologue_transaction(round, timestamp);
transactions.push((
@@ -152,10 +152,15 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {

self.metrics
.consensus_committed_subdags
-.with_label_values(&[&consensus_output.sub_dag.leader.header.author().to_string()])
+.with_label_values(&[&consensus_output
+.sub_dag
+.leader
+.header()
+.author()
+.to_string()])
.inc();
for (cert, batches) in consensus_output.batches {
-let author = cert.header.author();
+let author = cert.header().author();
self.metrics
.consensus_committed_certificates
.with_label_values(&[&author.to_string()])
@@ -209,7 +214,7 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {

let certificate_author = AuthorityName::from_bytes(
self.committee
-.authority_safe(&output_cert.header.author())
+.authority_safe(&output_cert.header().author())
.protocol_key_bytes()
.0
.as_ref(),
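A recurring detail from this file, repeated in every file below: the `use` lists gain `CertificateAPI` because trait methods only resolve when the trait is in scope. A hedged sketch of the call-site migration, using only calls that appear in this diff and assuming a dependency on the narwhal types crate (imported as `narwhal_types`, as above):

```rust
// Assumes `narwhal_types` is on the dependency path; the accessor signatures
// are inferred from the call sites in this commit, not checked against the crate.
use narwhal_types::{Certificate, CertificateAPI, HeaderAPI};

fn author_label(cert: &Certificate) -> String {
    // Before this commit: cert.header.author().to_string()  (direct field access)
    // After: the accessor comes from CertificateAPI, so the trait import above
    // is required for `cert.header()` to compile.
    cert.header().author().to_string()
}
```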
8 changes: 4 additions & 4 deletions narwhal/consensus/src/bullshark.rs
@@ -12,8 +12,8 @@ use std::sync::Arc;
use tokio::time::Instant;
use tracing::{debug, error_span};
use types::{
-Certificate, CertificateDigest, CommittedSubDag, ConsensusStore, HeaderAPI, ReputationScores,
-Round,
+Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, ConsensusStore, HeaderAPI,
+ReputationScores, Round,
};

#[cfg(test)]
@@ -122,7 +122,7 @@ impl ConsensusProtocol for Bullshark {
.get(&round)
.expect("We should have the whole history by now")
.values()
-.filter(|(_, x)| x.header.parents().contains(leader_digest))
+.filter(|(_, x)| x.header().parents().contains(leader_digest))
.map(|(_, x)| self.committee.stake_by_id(x.origin()))
.sum();

@@ -319,7 +319,7 @@ impl Bullshark {
for certificate in committed_sequence {
// TODO: we could iterate only the certificates of the round above the previous leader's round
if certificate
-.header
+.header()
.parents()
.iter()
.any(|digest| *digest == previous_leader)
17 changes: 9 additions & 8 deletions narwhal/consensus/src/consensus.rs
@@ -18,8 +18,9 @@ use storage::CertificateStore;
use tokio::{sync::watch, task::JoinHandle};
use tracing::{debug, info, instrument};
use types::{
-metered_channel, Certificate, CertificateDigest, CommittedSubDag, CommittedSubDagShell,
-ConditionalBroadcastReceiver, ConsensusStore, HeaderAPI, ReputationScores, Round, Timestamp,
+metered_channel, Certificate, CertificateAPI, CertificateDigest, CommittedSubDag,
+CommittedSubDagShell, ConditionalBroadcastReceiver, ConsensusStore, HeaderAPI,
+ReputationScores, Round, Timestamp,
};

#[cfg(test)]
@@ -191,10 +192,10 @@ impl ConsensusState {
.last_committed_round
.with_label_values(&[])
.set(self.last_round.committed_round as i64);
-let elapsed = certificate.metadata.created_at.elapsed().as_secs_f64();
+let elapsed = certificate.metadata().created_at.elapsed().as_secs_f64();
self.metrics
.certificate_commit_latency
-.observe(certificate.metadata.created_at.elapsed().as_secs_f64());
+.observe(certificate.metadata().created_at.elapsed().as_secs_f64());

// NOTE: This log entry is used to compute performance.
tracing::debug!(
@@ -219,7 +220,7 @@ impl ConsensusState {
if let Some(round_table) = dag.get(&(round - 1)) {
let store_parents: BTreeSet<&CertificateDigest> =
round_table.iter().map(|(_, (digest, _))| digest).collect();
-for parent_digest in certificate.header.parents() {
+for parent_digest in certificate.header().parents() {
if !store_parents.contains(parent_digest) {
panic!("Parent digest {parent_digest:?} not found in DAG for {certificate:?}!");
}
@@ -418,13 +419,13 @@ where

if i % 5_000 == 0 {
#[cfg(not(feature = "benchmark"))]
tracing::debug!("Committed {}", certificate.header);
tracing::debug!("Committed {}", certificate.header());
}

#[cfg(feature = "benchmark")]
-for digest in certificate.header.payload().keys() {
+for digest in certificate.header().payload().keys() {
// NOTE: This log entry is used to compute performance.
tracing::info!("Committed {} -> {:?}", certificate.header, digest);
tracing::info!("Committed {} -> {:?}", certificate.header(), digest);
}

commited_certificates.push(certificate.clone());
4 changes: 2 additions & 2 deletions narwhal/consensus/src/tests/bullshark_tests.rs
@@ -19,7 +19,7 @@ use test_utils::CommitteeFixture;
use tokio::sync::mpsc::channel;
use tokio::sync::watch;
use tracing::info;
-use types::{HeaderAPI, PreSubscribedBroadcastSender};
+use types::{CertificateAPI, HeaderAPI, PreSubscribedBroadcastSender};

// Run for 4 dag rounds in ideal conditions (all nodes reference all other nodes). We should commit
// the leader of round 2.
@@ -877,7 +877,7 @@ async fn garbage_collection_basic() {
!sub_dag
.certificates
.iter()
-.any(|c| c.header.author() == slow_node),
+.any(|c| c.header().author() == slow_node),
"Slow authority shouldn't be amongst the committed ones"
);

10 changes: 6 additions & 4 deletions narwhal/consensus/src/tests/consensus_tests.rs
@@ -17,7 +17,9 @@ use crate::consensus::ConsensusRound;
use crate::metrics::ConsensusMetrics;
use crate::Consensus;
use crate::NUM_SHUTDOWN_RECEIVERS;
-use types::{Certificate, HeaderAPI, PreSubscribedBroadcastSender, ReputationScores};
+use types::{
+Certificate, CertificateAPI, HeaderAPI, PreSubscribedBroadcastSender, ReputationScores,
+};

/// This test is trying to compare the output of the Consensus algorithm when:
/// (1) running without any crash for certificates processed from round 1 to 5 (inclusive)
@@ -186,10 +188,10 @@ async fn test_consensus_recovery_with_bullshark() {
// We omit round 7 so we can feed those later after "crash" to trigger a new leader
// election round and commit.
for certificate in certificates.iter() {
-if certificate.header.round() <= 3 {
+if certificate.header().round() <= 3 {
tx_waiter.send(certificate.clone()).await.unwrap();
}
-if certificate.header.round() <= 6 {
+if certificate.header().round() <= 6 {
certificate_store.write(certificate.clone()).unwrap();
}
}
@@ -252,7 +254,7 @@ async fn test_consensus_recovery_with_bullshark() {
// WHEN send the certificates of round >= 5 to trigger a leader election for round 4
// and start committing.
for certificate in certificates.iter() {
-if certificate.header.round() >= 5 {
+if certificate.header().round() >= 5 {
tx_waiter.send(certificate.clone()).await.unwrap();
}
}
6 changes: 3 additions & 3 deletions narwhal/consensus/src/tests/dag_tests.rs
@@ -10,7 +10,7 @@ use indexmap::IndexMap;
use prometheus::Registry;
use std::{collections::BTreeSet, sync::Arc};
use test_utils::{make_optimal_certificates, CommitteeFixture};
-use types::{Certificate, HeaderAPI, PreSubscribedBroadcastSender};
+use types::{Certificate, CertificateAPI, HeaderAPI, PreSubscribedBroadcastSender};

#[tokio::test]
async fn inner_dag_insert_one() {
@@ -154,7 +154,7 @@ async fn test_dag_compresses_empty_blocks() {
make_optimal_certificates(&committee, 1..=1, &genesis.clone(), &ids);
// make those empty
for cert in certificates.iter_mut() {
-cert.header.update_payload(IndexMap::new());
+cert.header_mut().update_payload(IndexMap::new());
}

// Feed the certificates to the Dag
@@ -222,7 +222,7 @@ async fn test_dag_rounds_after_compression() {
make_optimal_certificates(&committee, 1..=1, &genesis.clone(), &ids);
// make those empty
for cert in certificates.iter_mut() {
-cert.header.update_payload(IndexMap::new());
+cert.header_mut().update_payload(IndexMap::new());
}

// Feed the certificates to the Dag
6 changes: 3 additions & 3 deletions narwhal/consensus/src/tusk.rs
@@ -10,8 +10,8 @@ use fastcrypto::hash::Hash;
use std::{collections::HashMap, sync::Arc};
use tracing::{debug, error_span};
use types::{
-Certificate, CertificateDigest, CommittedSubDag, ConsensusStore, HeaderAPI, ReputationScores,
-Round,
+Certificate, CertificateAPI, CertificateDigest, CommittedSubDag, ConsensusStore, HeaderAPI,
+ReputationScores, Round,
};

#[cfg(any(test))]
@@ -70,7 +70,7 @@ impl ConsensusProtocol for Tusk {
.get(&(r - 1))
.expect("We should have the whole history by now")
.values()
-.filter(|(_, x)| x.header.parents().contains(leader_digest))
+.filter(|(_, x)| x.header().parents().contains(leader_digest))
.map(|(_, x)| self.committee.stake_by_id(x.origin()))
.sum();

10 changes: 7 additions & 3 deletions narwhal/consensus/src/utils.rs
@@ -5,7 +5,7 @@ use crate::consensus::{ConsensusState, Dag};
use config::Committee;
use std::collections::HashSet;
use tracing::debug;
-use types::{Certificate, CertificateDigest, HeaderAPI, Round};
+use types::{Certificate, CertificateAPI, CertificateDigest, HeaderAPI, Round};

/// Order the past leaders that we didn't already commit.
pub fn order_leaders<'a, LeaderElector>(
@@ -47,7 +47,11 @@ fn linked(leader: &Certificate, prev_leader: &Certificate, dag: &Dag) -> bool {
.get(&r)
.expect("We should have the whole history by now")
.values()
-.filter(|(digest, _)| parents.iter().any(|x| x.header.parents().contains(digest)))
+.filter(|(digest, _)| {
+parents
+.iter()
+.any(|x| x.header().parents().contains(digest))
+})
.map(|(_, certificate)| certificate)
.collect();
}
@@ -72,7 +76,7 @@ pub fn order_dag(leader: &Certificate, state: &ConsensusState) -> Vec<Certificat
// Do not try to order parents of the certificate, since they have been GC'ed.
continue;
}
-for parent in x.header.parents() {
+for parent in x.header().parents() {
let (digest, certificate) = match state
.dag
.get(&(x.round() - 1))
12 changes: 6 additions & 6 deletions narwhal/executor/src/subscriber.rs
@@ -25,7 +25,7 @@ use tokio::{sync::oneshot, task::JoinHandle};
use tracing::{debug, error, trace, warn};
use tracing::{info, instrument};
use types::{
-metered_channel, Batch, BatchDigest, Certificate, CommittedSubDag,
+metered_channel, Batch, BatchDigest, Certificate, CertificateAPI, CommittedSubDag,
ConditionalBroadcastReceiver, ConsensusOutput, HeaderAPI, RequestBatchesResponse, Timestamp,
};

@@ -237,7 +237,7 @@ impl<Network: SubscriberNetwork> Fetcher<Network> {
> = HashMap::new();

for cert in &sub_dag.certificates {
-for (digest, (worker_id, _)) in cert.header.payload().iter() {
+for (digest, (worker_id, _)) in cert.header().payload().iter() {
let workers = self.network.workers_for_certificate(cert, worker_id);
batch_digests_and_workers
.entry(*worker_id)
@@ -270,7 +270,7 @@ impl<Network: SubscriberNetwork> Fetcher<Network> {
// Map all fetched batches to their respective certificates and submit as
// consensus output
for cert in &sub_dag.certificates {
-let mut output_batches = Vec::with_capacity(cert.header.payload().len());
+let mut output_batches = Vec::with_capacity(cert.header().payload().len());
let output_cert = cert.clone();

self.metrics
@@ -279,9 +279,9 @@ impl<Network: SubscriberNetwork> Fetcher<Network> {

self.metrics
.subscriber_certificate_latency
-.observe(cert.metadata.created_at.elapsed().as_secs_f64());
+.observe(cert.metadata().created_at.elapsed().as_secs_f64());

-for (digest, (_, _)) in cert.header.payload().iter() {
+for (digest, (_, _)) in cert.header().payload().iter() {
self.metrics.subscriber_processed_batches.inc();
let batch = fetched_batches
.get(digest)
@@ -949,7 +949,7 @@ mod tests {
certificate: &Certificate,
worker_id: &WorkerId,
) -> Vec<NetworkPublicKey> {
-let payload = certificate.header.payload().to_owned();
+let payload = certificate.header().payload().to_owned();
let digest = payload.keys().next().unwrap();
self.data
.get(worker_id)
16 changes: 11 additions & 5 deletions narwhal/node/tests/staged/narwhal.yaml
@@ -14,6 +14,17 @@ BatchDigest:
CONTENT: U8
SIZE: 32
Certificate:
+ENUM:
+0:
+V1:
+NEWTYPE:
+TYPENAME: CertificateV1
+CertificateDigest:
+NEWTYPESTRUCT:
+TUPLEARRAY:
+CONTENT: U8
+SIZE: 32
+CertificateV1:
STRUCT:
- header:
TYPENAME: Header
@@ -24,11 +35,6 @@ Certificate:
- signed_authorities: BYTES
- metadata:
TYPENAME: Metadata
-CertificateDigest:
-NEWTYPESTRUCT:
-TUPLEARRAY:
-CONTENT: U8
-SIZE: 32
Header:
ENUM:
0:
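The staged `narwhal.yaml` file above is the serde-reflection format registry, so switching the `Certificate` entry from `STRUCT` to a one-variant `ENUM` records a wire-format change: serialized certificates now begin with a variant index (0 for `V1`), and the old struct layout lives on under the new `CertificateV1` name. A toy illustration of that effect follows; it uses BCS purely as an example codec (whether BCS is used on every certificate path here is an assumption), and the one-field struct is not the real certificate.

```rust
// Toy illustration only (needs the `serde` and `bcs` crates): wrapping a struct
// in a newtype-variant enum prefixes the encoding with a variant index.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct CertificateV1 {
    round: u64, // simplified stand-in for the real fields
}

#[derive(Serialize, Deserialize)]
enum Certificate {
    V1(CertificateV1),
}

fn main() {
    let unversioned = bcs::to_bytes(&CertificateV1 { round: 7 }).unwrap();
    let versioned = bcs::to_bytes(&Certificate::V1(CertificateV1 { round: 7 })).unwrap();

    // The versioned bytes carry a leading ULEB128 variant tag (0 => V1), which is
    // exactly the kind of change this staged format file exists to catch in review.
    assert_eq!(versioned[0], 0);
    assert_eq!(&versioned[1..], &unversioned[..]);
}
```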
4 changes: 2 additions & 2 deletions narwhal/primary/src/aggregators.rs
@@ -15,7 +15,7 @@ use tracing::warn;
use types::{
ensure,
error::{DagError, DagResult},
-Certificate, Header, Vote,
+Certificate, CertificateAPI, Header, Vote,
};

/// Aggregates votes for a particular header into a certificate.
@@ -63,7 +63,7 @@ impl VotesAggregator {
let (_, pks) = cert.signed_by(committee);

let certificate_digest: Digest<{ crypto::DIGEST_LENGTH }> = Digest::from(cert.digest());
-match AggregateSignature::try_from(&cert.aggregated_signature)
+match AggregateSignature::try_from(cert.aggregated_signature())
.map_err(|_| DagError::InvalidSignature)?
.verify_secure(&to_intent_message(certificate_digest), &pks[..])
{
12 changes: 8 additions & 4 deletions narwhal/primary/src/block_remover.rs
@@ -14,8 +14,8 @@ use store::rocks::TypedStoreError;

use tracing::{debug, instrument, warn};
use types::{
-metered_channel::Sender, BatchDigest, Certificate, CertificateDigest, HeaderAPI, HeaderDigest,
-Round,
+metered_channel::Sender, BatchDigest, Certificate, CertificateAPI, CertificateDigest,
+HeaderAPI, HeaderDigest, Round,
};

#[cfg(test)]
@@ -138,7 +138,7 @@ impl BlockRemover {
batches_by_worker: HashMap<WorkerId, Vec<BatchDigest>>,
) -> Result<(), Either<TypedStoreError, ValidatorDagError>> {
let header_digests: Vec<HeaderDigest> =
-certificates.iter().map(|c| c.header.digest()).collect();
+certificates.iter().map(|c| c.header().digest()).collect();

self.header_store
.remove_all(header_digests)
@@ -173,7 +173,11 @@ impl BlockRemover {
if !certificates.is_empty() {
let all_certs = certificates.clone();
// Unwrap safe since list is not empty.
-let highest_round = certificates.iter().map(|c| c.header.round()).max().unwrap();
+let highest_round = certificates
+.iter()
+.map(|c| c.header().round())
+.max()
+.unwrap();

// We signal that these certificates must have been committed by the external consensus
self.tx_committed_certificates