Skip to content

Commit

Permalink
[refactor] always include the zero-score authorities as well (MystenL…
Browse files Browse the repository at this point in the history
…abs#8886)

## Description 

This PR is modifying the current `ReputationScores` implementation so it
always includes all the authorities - even the zero score ones.

## Test Plan 

Unit tests

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [x] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
akichidis authored Mar 7, 2023
1 parent 95a5a4f commit c733745
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 31 deletions.
2 changes: 1 addition & 1 deletion narwhal/consensus/benches/process_certificates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn process_certificates(c: &mut Criterion) {
let store = make_consensus_store(&store_path);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));

let mut state = ConsensusState::new(metrics.clone());
let mut state = ConsensusState::new(metrics.clone(), &committee);

let data_size: usize = certificates
.iter()
Expand Down
9 changes: 8 additions & 1 deletion narwhal/consensus/src/bullshark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl Bullshark {
// TODO: when schedule change is implemented we should probably change a little bit
// this logic here.
if sub_dag_index % self.num_sub_dags_per_schedule == 0 {
state.last_consensus_reputation_score = ReputationScores::default()
state.last_consensus_reputation_score = ReputationScores::new(&self.committee)
}

// update the score for the previous leader. If no previous leader exists,
Expand All @@ -369,6 +369,13 @@ impl Bullshark {
state.last_consensus_reputation_score.final_of_schedule =
(sub_dag_index + 1) % self.num_sub_dags_per_schedule == 0;

// Always ensure that all the authorities are present in the reputation scores - even
// when score is zero.
assert_eq!(
state.last_consensus_reputation_score.total_authorities() as usize,
self.committee.authorities.len()
);

state.last_consensus_reputation_score.clone()
}
}
8 changes: 5 additions & 3 deletions narwhal/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ pub struct ConsensusState {
}

impl ConsensusState {
pub fn new(metrics: Arc<ConsensusMetrics>) -> Self {
pub fn new(metrics: Arc<ConsensusMetrics>, committee: &Committee) -> Self {
Self {
last_committed_round: 0,
last_committed: Default::default(),
latest_sub_dag_index: 0,
dag: Default::default(),
last_consensus_reputation_score: ReputationScores::default(),
last_consensus_reputation_score: ReputationScores::new(committee),
last_committed_leader: None,
metrics,
}
Expand All @@ -70,6 +70,7 @@ impl ConsensusState {
recovered_last_committed: HashMap<PublicKey, Round>,
latest_sub_dag: Option<CommittedSubDagShell>,
cert_store: CertificateStore,
committee: &Committee,
) -> Self {
let gc_round = last_committed_round.saturating_sub(gc_depth);
let dag =
Expand All @@ -80,7 +81,7 @@ impl ConsensusState {
let (latest_sub_dag_index, last_consensus_reputation_score, last_committed_leader) =
latest_sub_dag
.map(|s| (s.sub_dag_index, s.reputation_score, Some(s.leader)))
.unwrap_or_default();
.unwrap_or((0, ReputationScores::new(committee), None));

Self {
last_committed_round,
Expand Down Expand Up @@ -278,6 +279,7 @@ where
recovered_last_committed,
latest_sub_dag,
cert_store,
&committee,
);

tx_consensus_round_updates
Expand Down
65 changes: 42 additions & 23 deletions narwhal/consensus/src/tests/bullshark_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ async fn commit_one() {
assert_eq!(output.round(), 2);

// AND the reputation scores have not been updated
assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 0);
assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 4);
assert!(committed_sub_dag.reputation_score.all_zero());
}

// Run for 8 dag rounds with one dead node node (that is not a leader). We should commit the leaders of
Expand All @@ -110,7 +111,7 @@ async fn dead_node() {
let committee = fixture.committee();
let mut keys: Vec<_> = fixture.authorities().map(|a| a.public_key()).collect();
keys.sort(); // Ensure we don't remove one of the leaders.
let _ = keys.pop().unwrap();
let dead_node = keys.pop().unwrap();

let genesis = Certificate::genesis(&committee)
.iter()
Expand Down Expand Up @@ -182,14 +183,26 @@ async fn dead_node() {

// AND check that the consensus scores are the expected ones
for (index, sub_dag) in committed_sub_dags.iter().enumerate() {
// For the first commit we don't expect to have any score updates
assert_eq!(sub_dag.reputation_score.total_authorities(), 4);

// For the first commit we expect to have any only zero scores
if index == 0 {
assert_eq!(sub_dag.reputation_score.total_authorities(), 0);
sub_dag
.reputation_score
.scores_per_authority
.iter()
.for_each(|(_key, score)| {
assert_eq!(*score, 0_u64);
});
} else {
// For any other commit we expect to always have a +1 score for each authority, as everyone
// always votes for the leader
for score in sub_dag.reputation_score.scores_per_authority.values() {
assert_eq!(*score as usize, index);
for (key, score) in &sub_dag.reputation_score.scores_per_authority {
if *key == dead_node {
assert_eq!(*score as usize, 0);
} else {
assert_eq!(*score as usize, index);
}
}
}
}
Expand Down Expand Up @@ -315,8 +328,9 @@ async fn not_enough_support() {
let output = sequence.next().unwrap();
assert_eq!(output.round(), 2);

// AND no scores exist for leader 2 , as this is the first commit
assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 0);
// AND all scores are zero for leader 2 , as this is the first commit
assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 4);
assert!(committed_sub_dag.reputation_score.all_zero());

let committed_sub_dag: CommittedSubDag = rx_output.recv().await.unwrap();
let mut sequence = committed_sub_dag.certificates.into_iter();
Expand All @@ -332,17 +346,22 @@ async fn not_enough_support() {
assert_eq!(output.round(), 4);

// AND scores should be updated with everyone that has voted for leader of round 2.
// Only node 0 has voted for the leader of this round, so only 1 score should exist
// with value 1
assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 1);
// Only node 0 has voted for the leader of this round, so only their score should exist
// with value 1, and everything else should be zero.
assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 4);

let node_0_name = &keys[0];
let score = committed_sub_dag
let node_0_name: PublicKey = keys[0].clone();
committed_sub_dag
.reputation_score
.scores_per_authority
.get(node_0_name)
.unwrap();
assert_eq!(*score, 1);
.iter()
.for_each(|(key, score)| {
if *key == node_0_name {
assert_eq!(*score, 1_u64);
} else {
assert_eq!(*score, 0_u64);
}
});
}

// Run for 7 dag rounds. Node 0 (the leader of round 2) is missing for rounds 1 and 2,
Expand Down Expand Up @@ -437,8 +456,8 @@ async fn missing_leader() {
let output = sequence.next().unwrap();
assert_eq!(output.round(), 4);

// AND no scores exist since this is the first commit that has happened
assert_eq!(committed_sub_dag.reputation_score.total_authorities(), 0);
// AND all scores are zero since this is the first commit that has happened
assert!(committed_sub_dag.reputation_score.all_zero());
}

// Run for 11 dag rounds in ideal conditions (all nodes reference all other nodes).
Expand Down Expand Up @@ -560,7 +579,7 @@ async fn delayed_certificates_are_rejected() {
test_utils::make_certificates_with_epoch(&committee, 1..=5, epoch, &genesis, &keys);

let store = make_consensus_store(&test_utils::temp_dir());
let mut state = ConsensusState::new(metrics.clone());
let mut state = ConsensusState::new(metrics.clone(), &committee);
let mut bullshark = Bullshark::new(
committee,
store,
Expand Down Expand Up @@ -612,7 +631,7 @@ async fn submitting_equivocating_certificate_should_error() {
test_utils::make_certificates_with_epoch(&committee, 1..=1, epoch, &genesis, &keys);

let store = make_consensus_store(&test_utils::temp_dir());
let mut state = ConsensusState::new(metrics.clone());
let mut state = ConsensusState::new(metrics.clone(), &committee);
let mut bullshark = Bullshark::new(
committee.clone(),
store,
Expand Down Expand Up @@ -675,7 +694,7 @@ async fn reset_consensus_scores_on_every_schedule_change() {
test_utils::make_certificates_with_epoch(&committee, 1..=50, epoch, &genesis, &keys);

let store = make_consensus_store(&test_utils::temp_dir());
let mut state = ConsensusState::new(metrics.clone());
let mut state = ConsensusState::new(metrics.clone(), &committee);
let mut bullshark = Bullshark::new(
committee,
store,
Expand All @@ -696,9 +715,9 @@ async fn reset_consensus_scores_on_every_schedule_change() {
// ensure the leaders of rounds 2 and 4 have been committed
let mut current_score = 0;
for sub_dag in all_subdags {
// The first commit has no scores
// The first commit has all zero scores
if sub_dag.sub_dag_index == 1 {
assert_eq!(sub_dag.reputation_score.total_authorities(), 0);
assert!(sub_dag.reputation_score.all_zero());
} else if sub_dag.sub_dag_index % NUM_SUB_DAGS_PER_SCHEDULE == 0 {
// On every 5th commit we reset the scores and count from the beginning with
// scores updated to 1, as we expect now every node to have voted for the previous leader.
Expand Down
2 changes: 2 additions & 0 deletions narwhal/consensus/src/tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ async fn test_consensus_recovery_with_bullshark() {

'main: while let Some(sub_dag) = rx_output.recv().await {
score_with_crash = sub_dag.reputation_score.clone();
assert_eq!(score_with_crash.total_authorities(), 4);

for output in sub_dag.certificates {
assert!(output.round() >= 2);

Expand Down
6 changes: 3 additions & 3 deletions narwhal/consensus/src/tusk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ mod tests {

let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));

let mut state = ConsensusState::new(metrics);
let mut state = ConsensusState::new(metrics, &committee);
let mut tusk = Tusk::new(committee, store, gc_depth);
for certificate in certificates {
tusk.process_certificate(&mut state, certificate).unwrap();
Expand Down Expand Up @@ -237,14 +237,14 @@ mod tests {
// TODO: evidence that this test fails when `failure_probability` parameter >= 1/3
let (certificates, _next_parents) =
test_utils::make_certificates(&committee, 1..=rounds, &genesis, &keys, 0.333);
let arc_committee = Arc::new(ArcSwap::from_pointee(committee));
let arc_committee = Arc::new(ArcSwap::from_pointee(committee.clone()));

let store_path = test_utils::temp_dir();
let store = make_consensus_store(&store_path);

let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));

let mut state = ConsensusState::new(metrics);
let mut state = ConsensusState::new(metrics, &committee);
let mut tusk = Tusk::new((**arc_committee.load()).clone(), store, gc_depth);

for certificate in certificates {
Expand Down
18 changes: 18 additions & 0 deletions narwhal/types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![allow(clippy::mutable_key_type)]

use crate::{Batch, Certificate, CertificateDigest, Round};
use config::Committee;
use crypto::PublicKey;
use fastcrypto::hash::Hash;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -78,6 +79,19 @@ pub struct ReputationScores {
}

impl ReputationScores {
/// Creating a new ReputationScores instance pre-populating the authorities entries with
/// zero score value.
pub fn new(committee: &Committee) -> Self {
let scores_per_authority = committee
.authorities()
.map(|a| (a.0.clone(), 0_u64))
.collect();

Self {
scores_per_authority,
..Default::default()
}
}
/// Adds the provided `score` to the existing score for the provided `authority`
pub fn add_score(&mut self, authority: PublicKey, score: u64) {
self.scores_per_authority
Expand All @@ -89,6 +103,10 @@ impl ReputationScores {
pub fn total_authorities(&self) -> u64 {
self.scores_per_authority.len() as u64
}

pub fn all_zero(&self) -> bool {
!self.scores_per_authority.values().any(|e| *e > 0)
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down

0 comments on commit c733745

Please sign in to comment.