[narwhal] restore the LeaderSchedule from storage & commit recursively correctly (MystenLabs#12540)

## Description 

This PR does the following two things:
* It restores the `LeaderSchedule` from storage on crash/recovery, to make sure
that the right `LeaderSwapTable` (the last "final" committed one) is used, so
the correct schedule is applied from that point on.
* It modifies the recursive commit path to ensure that when a commit in the
middle of the recursion changes the schedule, all subsequent commits use the
updated schedule and the newly elected leaders are considered (see the sketch
below).
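
A minimal, self-contained sketch of the two behaviours above is shown below. The
names used here (`LeaderSchedule::restore`, `Outcome`, the `commit_leader`
callback) are simplified stand-ins for illustration only, not the actual Narwhal
types or signatures; the real wiring is in the `narwhal/consensus` diff further
down.

```rust
// Illustrative sketch only: all types and functions here are hypothetical stand-ins.

#[derive(Debug, PartialEq)]
enum Outcome {
    Commit,
    ScheduleChanged,
}

#[derive(Default)]
struct LeaderSchedule {
    // In the real code this would be a LeaderSwapTable built from reputation scores.
    bad_nodes: Vec<String>,
}

impl LeaderSchedule {
    // On crash/recovery, rebuild the schedule from the last "final" reputation scores
    // persisted in storage, so the same swap table is in effect as before the crash.
    fn restore(last_final_scores: Option<Vec<(String, u64)>>) -> Self {
        match last_final_scores {
            Some(mut scores) => {
                // Swap out the lowest-scoring authority (the real code swaps a configured fraction).
                scores.sort_by_key(|(_, score)| *score);
                let bad_nodes = scores.iter().take(1).map(|(name, _)| name.clone()).collect();
                LeaderSchedule { bad_nodes }
            }
            // No final scores persisted yet: fall back to the default schedule.
            None => LeaderSchedule::default(),
        }
    }
}

// Commit leaders up to `leader_round`, retrying whenever a commit in the middle of the
// recursion changes the schedule, so the remaining leaders are elected with the new table.
fn commit_until_round(
    leader_round: u64,
    mut commit_leader: impl FnMut(u64) -> (Outcome, Vec<u64>),
) -> (Outcome, Vec<u64>) {
    let mut committed = Vec::new();
    let outcome = loop {
        let (outcome, sub_dags) = commit_leader(leader_round);
        // Keep whatever was committed before any schedule change.
        committed.extend(sub_dags);
        if outcome != Outcome::ScheduleChanged {
            break outcome;
        }
        // Schedule changed mid-recursion: loop again so the uncommitted leaders are
        // re-evaluated under the updated schedule.
    };
    (outcome, committed)
}

fn main() {
    let schedule = LeaderSchedule::restore(Some(vec![("a".into(), 10), ("b".into(), 3)]));
    println!("bad nodes after restore: {:?}", schedule.bad_nodes);

    // Simulate a commit that changes the schedule once, then completes.
    let mut calls = 0;
    let (outcome, committed) = commit_until_round(10, |_round| {
        calls += 1;
        if calls == 1 {
            (Outcome::ScheduleChanged, vec![4, 6])
        } else {
            (Outcome::Commit, vec![8, 10])
        }
    });
    println!("outcome: {:?}, committed leader rounds: {:?}", outcome, committed);
}
```

The key property the loop captures is that sub dags committed before a schedule
change are kept, and only the remaining leaders are re-elected under the new
swap table.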

TODO:
- [ ] add more comprehensive testing for the new leader schedule
algorithm in consensus to cover happy and unhappy scenarios
- [ ] polish things a bit more
- [ ] run stress tests in dev environments

## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
akichidis authored Aug 4, 2023
1 parent 63513d2 commit 52a1867
Showing 11 changed files with 850 additions and 114 deletions.
5 changes: 5 additions & 0 deletions crates/sui-protocol-config/src/lib.rs
@@ -1403,9 +1403,11 @@ impl ProtocolConfig {
pub fn set_zklogin_auth(&mut self, val: bool) {
self.feature_flags.zklogin_auth = val
}

pub fn set_max_tx_gas_for_testing(&mut self, max_tx_gas: u64) {
self.max_tx_gas = Some(max_tx_gas)
}

pub fn set_execution_version_for_testing(&mut self, version: u64) {
self.execution_version = Some(version)
}
@@ -1416,6 +1418,9 @@ impl ProtocolConfig {
pub fn set_simplified_unwrap_then_delete(&mut self, val: bool) {
self.feature_flags.simplified_unwrap_then_delete = val
}
pub fn set_narwhal_new_leader_election_schedule(&mut self, val: bool) {
self.feature_flags.narwhal_new_leader_election_schedule = val;
}
}

type OverrideFn = dyn Fn(ProtocolVersion, ProtocolConfig) -> ProtocolConfig + Send;
146 changes: 99 additions & 47 deletions narwhal/consensus/src/bullshark.rs
@@ -9,6 +9,7 @@ use crate::{
};
use config::{Committee, Stake};
use fastcrypto::hash::Hash;
use std::collections::VecDeque;
use std::sync::Arc;
use storage::ConsensusStore;
use sui_protocol_config::ProtocolConfig;
@@ -155,21 +156,90 @@ impl Bullshark {
return Ok((Outcome::LeaderBelowCommitRound, Vec::new()));
}

let mut committed_sub_dags = Vec::new();
let outcome = loop {
let (outcome, committed) = self.commit_leader(leader_round, state)?;

// always extend the returned sub dags
committed_sub_dags.extend(committed);

// break the loop and return the result as long as there is no schedule change.
// We want to retry if there is a schedule change.
if outcome != Outcome::ScheduleChanged {
break outcome;
}
};

// If we have no sub dag to commit then we simply return the outcome directly.
// Otherwise we let the rest of the method run.
if committed_sub_dags.is_empty() {
return Ok((outcome, committed_sub_dags));
}

// record the last time we got a successful leader election
let elapsed = self.last_successful_leader_election_timestamp.elapsed();

self.metrics
.commit_rounds_latency
.observe(elapsed.as_secs_f64());

self.last_successful_leader_election_timestamp = Instant::now();

// The total leader_commits are expected to grow the same amount on validators,
// but strong vs weak counts are not expected to be the same across validators.
self.metrics
.leader_commits
.with_label_values(&["strong"])
.inc();
self.metrics
.leader_commits
.with_label_values(&["weak"])
.inc_by(committed_sub_dags.len() as u64 - 1);

// Log the latest committed round of every authority (for debug).
// Performance note: if tracing at the debug log level is disabled, this is cheap, see
// https://github.com/tokio-rs/tracing/pull/326
for (name, round) in &state.last_committed {
debug!("Latest commit of {}: Round {}", name, round);
}

let total_committed_certificates: u64 = committed_sub_dags
.iter()
.map(|sub_dag| sub_dag.certificates.len() as u64)
.sum();

self.metrics
.committed_certificates
.report(total_committed_certificates);

Ok((Outcome::Commit, committed_sub_dags))
}

/// Commits the leader of round `leader_round`. It also recursively commits any earlier
/// leaders that haven't been committed yet, where possible.
/// If the schedule has changed due to a commit and there are more leaders left to commit,
/// this method returns `Outcome::ScheduleChanged` so the caller knows to retry the
/// uncommitted leaders with the updated schedule.
fn commit_leader(
&mut self,
leader_round: Round,
state: &mut ConsensusState,
) -> Result<(Outcome, Vec<CommittedSubDag>), ConsensusError> {
let leader = match self
.leader_schedule
.leader_certificate(leader_round, &state.dag)
{
(_leader_authority, Some(certificate)) => certificate,
(_leader_authority, None) => {
// leader has not been found - we don't have any certificate
return Ok((Outcome::LeaderNotFound, Vec::new()));
return Ok((Outcome::LeaderNotFound, vec![]));
}
};

// Check if the leader has f+1 support from its children (ie. round r+1).
// Check if the leader has f+1 support from its children (ie. leader_round+1).
let stake: Stake = state
.dag
.get(&round)
.get(&(leader_round + 1))
.expect("We should have the whole history by now")
.values()
.filter(|(_, x)| x.header().parents().contains(&leader.digest()))
@@ -181,15 +251,16 @@ impl Bullshark {
// a leader block means committing all its dependencies.
if stake < self.committee.validity_threshold() {
debug!("Leader {:?} does not have enough support", leader);
return Ok((Outcome::NotEnoughSupportForLeader, Vec::new()));
return Ok((Outcome::NotEnoughSupportForLeader, vec![]));
}

// Get an ordered list of past leaders that are linked to the current leader.
debug!("Leader {:?} has enough support", leader);

let mut committed_sub_dags = Vec::new();
let mut total_committed_certificates = 0;
let mut leaders_to_commit = self.order_leaders(leader, state);

for leader in self.order_leaders(leader, state).iter().rev() {
while let Some(leader) = leaders_to_commit.pop_front() {
let sub_dag_index = state.next_sub_dag_index();
let _span = error_span!("bullshark_process_sub_dag", sub_dag_index);

@@ -199,7 +270,7 @@ impl Bullshark {
let mut sequence = Vec::new();

// Starting from the oldest leader, flatten the sub-dag referenced by the leader.
for x in utils::order_dag(leader, state) {
for x in utils::order_dag(&leader, state) {
// Update and clean up internal state.
state.update(&x);

@@ -211,8 +282,6 @@ impl Bullshark {
}
debug!(min_round, "Subdag has {} certificates", sequence.len());

total_committed_certificates += sequence.len();

// We resolve the reputation score that should be stored alongside with this sub dag.
let reputation_score = self.resolve_reputation_score(state, &sequence, sub_dag_index);

@@ -234,50 +303,27 @@ impl Bullshark {
committed_sub_dags.push(sub_dag);

// If the leader schedule has been updated, then we'll need to recalculate any upcoming
// leaders for the rest of the recursive commits.
// leaders for the rest of the recursive commits. We do that by repeating the leader
// election for the round that triggered the original commit
if self.update_leader_schedule(leader.round(), &reputation_score) {
// TODO: a new schedule has been produced, which means that we need to recalculate leaders
// to ensure that the new ones are considered and update the commit path.
// Return that the schedule has changed only when there are more leaders to commit until
// the `leader_round`; otherwise we have committed everything we could and, practically,
// the leader of `leader_round` is the one that changed the schedule.
if !leaders_to_commit.is_empty() {
return Ok((Outcome::ScheduleChanged, committed_sub_dags));
}
}
}

// record the last time we got a successful leader election
let elapsed = self.last_successful_leader_election_timestamp.elapsed();

self.metrics
.commit_rounds_latency
.observe(elapsed.as_secs_f64());

self.last_successful_leader_election_timestamp = Instant::now();

// The total leader_commits are expected to grow the same amount on validators,
// but strong vs weak counts are not expected to be the same across validators.
self.metrics
.leader_commits
.with_label_values(&["strong"])
.inc();
self.metrics
.leader_commits
.with_label_values(&["weak"])
.inc_by(committed_sub_dags.len() as u64 - 1);

// Log the latest committed round of every authority (for debug).
// Performance note: if tracing at the debug log level is disabled, this is cheap, see
// https://github.com/tokio-rs/tracing/pull/326
for (name, round) in &state.last_committed {
debug!("Latest commit of {}: Round {}", name, round);
}

self.metrics
.committed_certificates
.report(total_committed_certificates as u64);

Ok((Outcome::Commit, committed_sub_dags))
}

/// Order the past leaders that we didn't already commit.
pub fn order_leaders(&self, leader: &Certificate, state: &ConsensusState) -> Vec<Certificate> {
let mut to_commit = vec![leader.clone()];
/// Order the past leaders that we didn't already commit. It orders the leaders from the
/// oldest (smallest) round to the newest round.
fn order_leaders(&self, leader: &Certificate, state: &ConsensusState) -> VecDeque<Certificate> {
let mut to_commit = VecDeque::new();
to_commit.push_front(leader.clone());

let mut leader = leader;
assert_eq!(leader.round() % 2, 0);
for r in (state.last_round.committed_round + 2..=leader.round() - 2)
@@ -300,7 +346,9 @@ impl Bullshark {

// Check whether there is a path between the last two leaders.
if self.linked(leader, prev_leader, &state.dag) {
to_commit.push(prev_leader.clone());
// always add on the front so in the end we create a list with the leaders ordered
// from the lowest to the highest round.
to_commit.push_front(prev_leader.clone());
leader = prev_leader;
} else {
self.metrics
@@ -365,6 +413,10 @@ impl Bullshark {
reputation_scores,
));

self.metrics
.num_of_bad_nodes
.set(self.leader_schedule.num_of_bad_nodes() as i64);

return true;
}
false