Skip to content

Commit

Permalink
[narwhal] calculate consensus reputation scores (MystenLabs#8581)
Browse files Browse the repository at this point in the history
## Description 

Introduces the authority score calculation as [described
here](https://www.notion.so/mystenlabs/2023-04-Leader-Reputation-c9204dd9e91249d5a68175e7076f9e91?pvs=4#84e2b6aac24241a1bec8b8fd51ea51ab).
The `change_schedule_every_committed_sub_dags` has been introduced to
accommodate the score reset for every `K` commit rounds which will be
later needed once we implement the schedule change. Also, the crash
recovery path has been considered to ensure that we keep counting scores
from where we left off. Additional testing has been introduced.

## Test Plan 

Unit tests included under the bullshark_tests file.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [x] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
akichidis authored Mar 2, 2023
1 parent 4e63679 commit 2ddd42d
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 41 deletions.
59 changes: 58 additions & 1 deletion narwhal/consensus/src/bullshark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ use crate::{
};
use config::{Committee, Stake};
use crypto::PublicKey;
use fastcrypto::hash::Hash;
use fastcrypto::traits::EncodeDecodeBase64;
use std::{collections::BTreeSet, sync::Arc};
use tokio::time::Instant;
use tracing::{debug, trace};
use types::{Certificate, CertificateDigest, CommittedSubDag, ConsensusStore, Round};
use types::{
Certificate, CertificateDigest, CommittedSubDag, ConsensusStore, ReputationScores, Round,
};

#[cfg(test)]
#[path = "tests/bullshark_tests.rs"]
Expand Down Expand Up @@ -48,6 +51,9 @@ pub struct Bullshark {
pub last_leader_election: LastRound,
/// The most recent round of inserted certificate
pub max_inserted_certificate_round: Round,
/// The number of committed subdags that will trigger the schedule change and reputation
/// score reset.
pub num_sub_dags_per_schedule: u64,
}

impl ConsensusProtocol for Bullshark {
Expand Down Expand Up @@ -163,10 +169,16 @@ impl ConsensusProtocol for Bullshark {
}

let next_sub_dag_index = state.latest_sub_dag_index + 1;

// We update the reputation score stored in state
let reputation_score =
self.update_reputation_score(state, &sequence, next_sub_dag_index);

let sub_dag = CommittedSubDag {
certificates: sequence,
leader: leader.clone(),
sub_dag_index: next_sub_dag_index,
reputation_score,
};

// Persist the update.
Expand All @@ -176,6 +188,7 @@ impl ConsensusProtocol for Bullshark {

// Increase the global consensus index.
state.latest_sub_dag_index = next_sub_dag_index;
state.last_committed_leader = Some(sub_dag.leader.digest());

committed_sub_dags.push(sub_dag);
}
Expand Down Expand Up @@ -236,6 +249,7 @@ impl Bullshark {
store: Arc<ConsensusStore>,
gc_depth: Round,
metrics: Arc<ConsensusMetrics>,
num_sub_dags_per_schedule: u64,
) -> Self {
Self {
committee,
Expand All @@ -245,6 +259,7 @@ impl Bullshark {
last_leader_election: LastRound::default(),
max_inserted_certificate_round: 0,
metrics,
num_sub_dags_per_schedule,
}
}

Expand Down Expand Up @@ -314,4 +329,46 @@ impl Bullshark {
// Return its certificate and the certificate's digest.
dag.get(&round).and_then(|x| x.get(&leader))
}

/// Folds the commit identified by `sub_dag_index` into the reputation scores kept in `state`
/// and returns a snapshot (clone) of the updated scores.
///
/// Steps, in order:
/// 1. When `sub_dag_index` is a multiple of `num_sub_dags_per_schedule`, the stored scores
///    are replaced with `ReputationScores::default()`.
/// 2. When `state.last_committed_leader` is set, the origin of every certificate in
///    `committed_sequence` whose header lists that leader among its parents gets one point.
/// 3. `final_of_schedule` is set to whether `sub_dag_index + 1` is a multiple of
///    `num_sub_dags_per_schedule`.
///
/// # Panics
///
/// Panics if `num_sub_dags_per_schedule` is zero, because the remainder operation then
/// divides by zero.
fn update_reputation_score(
    &mut self,
    state: &mut ConsensusState,
    committed_sequence: &[Certificate],
    sub_dag_index: u64,
) -> ReputationScores {
    let schedule_window = self.num_sub_dags_per_schedule;

    // A new schedule window starts here, so scoring begins from a clean slate.
    // TODO: when schedule change is implemented we should probably change a little bit
    // this logic here.
    if sub_dag_index % schedule_window == 0 {
        state.last_consensus_reputation_score = ReputationScores::default();
    }

    // Reward everyone who voted for (i.e. has as a parent) the previously committed leader.
    // With no previous leader this is the very first commit and nothing is scored.
    if let Some(previous_leader) = state.last_committed_leader {
        // TODO: we could iterate only the certificates of the round above the previous leader's round
        let voters = committed_sequence.iter().filter(|certificate| {
            certificate
                .header
                .parents
                .iter()
                .any(|digest| *digest == previous_leader)
        });

        for voter in voters {
            state
                .last_consensus_reputation_score
                .add_score(voter.origin(), 1);
        }
    }

    // Flag the scores as the final ones of the current schedule when this is its last
    // sub dag, so any downstream user can know that no further scores will be calculated
    // for this schedule.
    let closes_schedule = (sub_dag_index + 1) % schedule_window == 0;
    state.last_consensus_reputation_score.final_of_schedule = closes_schedule;

    state.last_consensus_reputation_score.clone()
}
}
24 changes: 19 additions & 5 deletions narwhal/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use storage::CertificateStore;
use tokio::{sync::watch, task::JoinHandle};
use tracing::{debug, info, instrument};
use types::{
metered_channel, Certificate, CertificateDigest, CommittedSubDag, ConditionalBroadcastReceiver,
ConsensusStore, Round, Timestamp,
metered_channel, Certificate, CertificateDigest, CommittedSubDag, CommittedSubDagShell,
ConditionalBroadcastReceiver, ConsensusStore, ReputationScores, Round, Timestamp,
};

#[cfg(test)]
Expand All @@ -38,6 +38,11 @@ pub struct ConsensusState {
pub last_committed: HashMap<PublicKey, Round>,
/// Used to populate the index in the sub-dag construction.
pub latest_sub_dag_index: SequenceNumber,
/// The last calculated consensus reputation score
pub last_consensus_reputation_score: ReputationScores,
/// The last committed sub dag leader. This allows us to calculate the reputation score of the nodes
/// that vote for the last leader.
pub last_committed_leader: Option<CertificateDigest>,
/// Keeps the latest committed certificate (and its parents) for every authority. Anything older
/// must be regularly cleaned up through the function `update`.
pub dag: Dag,
Expand All @@ -52,14 +57,16 @@ impl ConsensusState {
last_committed: Default::default(),
latest_sub_dag_index: 0,
dag: Default::default(),
last_consensus_reputation_score: ReputationScores::default(),
last_committed_leader: None,
metrics,
}
}

pub fn new_from_store(
metrics: Arc<ConsensusMetrics>,
recover_last_committed: HashMap<PublicKey, Round>,
latest_sub_dag_index: SequenceNumber,
latest_sub_dag: Option<CommittedSubDagShell>,
cert_store: CertificateStore,
) -> Self {
let last_committed_round = *recover_last_committed
Expand All @@ -71,10 +78,17 @@ impl ConsensusState {
.expect("error when recovering DAG from store");
metrics.recovered_consensus_state.inc();

let (latest_sub_dag_index, last_consensus_reputation_score, last_committed_leader) =
latest_sub_dag
.map(|s| (s.sub_dag_index, s.reputation_score, Some(s.leader)))
.unwrap_or_default();

Self {
last_committed_round,
last_committed: recover_last_committed,
last_consensus_reputation_score,
latest_sub_dag_index,
last_committed_leader,
dag,
metrics,
}
Expand Down Expand Up @@ -246,11 +260,11 @@ where
) -> JoinHandle<()> {
// The consensus state (everything else is immutable).
let recovered_last_committed = store.read_last_committed();
let latest_sub_dag_index = store.get_latest_sub_dag_index();
let latest_sub_dag = store.get_latest_sub_dag();
let state = ConsensusState::new_from_store(
metrics.clone(),
recovered_last_committed,
latest_sub_dag_index,
latest_sub_dag,
cert_store,
);
tx_consensus_round_updates
Expand Down
Loading

0 comments on commit 2ddd42d

Please sign in to comment.