Skip to content

Commit

Permalink
feat: have consensus persist DAG state in DB
Browse files Browse the repository at this point in the history
  • Loading branch information
huitseeker committed Dec 10, 2021
1 parent da1e08c commit 853681f
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 87 deletions.
2 changes: 1 addition & 1 deletion narwhal/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
.DS_Store
.storage_*
.*storage*
.db_test*
.venv/
.idea/*
rust/.idea/*
.consensus_db-*
.db-*

target/
Expand Down
2 changes: 2 additions & 0 deletions narwhal/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ rand = { version = "0.7.3", optional = true}
crypto = { path = "../crypto" }
config = { path = "../config" }
primary = { path = "../primary" }
store = { path = "../store" }

[dev-dependencies]
bincode = "1.3"
criterion = "0.3"
pprof = { version = "0.6", features = ["criterion", "flamegraph"] }
tempfile = "3.2"

[features]
default = ["rand"]
Expand Down
5 changes: 4 additions & 1 deletion narwhal/consensus/benches/process_certificates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ pub fn process_certificates(c: &mut Criterion) {
let (certificates, _next_parents) = make_optimal_certificates(1, rounds, &genesis, &keys);
let committee = mock_committee(&keys);

let mut state = consensus::State::new(Certificate::genesis(&mock_committee(&keys[..])));
let temp_dir = tempfile::tempdir().expect("Failed to open temporary directory");
let mut state =
consensus::State::new(Certificate::genesis(&mock_committee(&keys[..])), temp_dir)
.unwrap();

let data_size: usize = certificates
.iter()
Expand Down
192 changes: 107 additions & 85 deletions narwhal/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ use primary::{Certificate, Round};
use std::{
cmp::max,
collections::{HashMap, HashSet},
path::Path,
};
use store::{rocks::DBMap, Map};
use tokio::sync::mpsc::{Receiver, Sender};

#[cfg(any(test, feature = "benchmark"))]
#[path = "tests/consensus_tests.rs"]
pub mod consensus_tests;

/// The representation of the DAG in memory.
type Dag = HashMap<Round, HashMap<PublicKey, (Digest, Certificate)>>;
type Dag = DBMap<Round, HashMap<PublicKey, (Digest, Certificate)>>;

/// The state that needs to be persisted for crash-recovery.
pub struct State {
Expand All @@ -36,16 +38,19 @@ pub struct State {
}

impl State {
pub fn new(genesis: Vec<Certificate>) -> Self {
pub fn new<P: AsRef<Path>>(genesis: Vec<Certificate>, db_path: P) -> Self {
let genesis = genesis
.into_iter()
.map(|x| (x.origin(), (x.digest(), x)))
.collect::<HashMap<_, _>>();

let db = Dag::open(db_path, None).unwrap();
db.insert(&0, &genesis).unwrap();

Self {
last_committed_round: 0,
last_committed: genesis.iter().map(|(x, (_, y))| (*x, y.round())).collect(),
dag: [(0, genesis)].iter().cloned().collect(),
dag: db,
}
}

Expand All @@ -60,14 +65,30 @@ impl State {
self.last_committed_round = last_committed_round;

for (name, round) in &self.last_committed {
self.dag.retain(|r, authorities| {
// We purge certificates for `name` prior to its latest commit
if r < round {
// We purge certificates for `name` prior to its latest commit
let new_authorities = self.dag.iter().flat_map(|(ref r, mut authorities)| {
if r + gc_depth >= last_committed_round && r < round {
authorities.retain(|n, _| n != name);
Some((*r, authorities))
} else {
None
}
// We purge all certificates past the gc depth
!authorities.is_empty() && r + gc_depth >= last_committed_round
});
let (empties, refreshed): (Vec<_>, Vec<_>) =
new_authorities.partition(|(_r, a)| a.is_empty());
let remove_empties = self
.dag
.batch()
.delete_batch(empties.into_iter().map(|(k, _)| k))
.unwrap();
let update_refreshed = remove_empties.insert_batch(refreshed.into_iter()).unwrap();

// We purge all certificates past the gc depth
let bound = max(self.last_committed_round, gc_depth + 1);
let gc = update_refreshed
.delete_range(&0, &(bound - gc_depth))
.unwrap();
let _ = gc.write().unwrap();
}
}
}
Expand All @@ -77,6 +98,8 @@ pub struct Consensus {
committee: Committee,
/// The depth of the garbage collector.
gc_depth: Round,
/// The path of the consensus database
store_path: String,

/// Receives new certificates from the primary. The primary should send us new certificates only
/// if it already sent us its whole history.
Expand All @@ -94,6 +117,7 @@ impl Consensus {
pub fn spawn(
committee: Committee,
gc_depth: Round,
store_path: String,
rx_primary: Receiver<Certificate>,
tx_primary: Sender<Certificate>,
tx_output: Sender<Certificate>,
Expand All @@ -102,6 +126,7 @@ impl Consensus {
Self {
committee: committee.clone(),
gc_depth,
store_path,
rx_primary,
tx_primary,
tx_output,
Expand All @@ -114,7 +139,7 @@ impl Consensus {

async fn run(&mut self) {
// The consensus state (everything else is immutable).
let mut state = State::new(self.genesis.clone());
let mut state = State::new(self.genesis.clone(), &self.store_path);

// Listen to incoming certificates.
while let Some(certificate) = self.rx_primary.recv().await {
Expand Down Expand Up @@ -158,11 +183,9 @@ impl Consensus {
let round = certificate.round();

// Add the new certificate to the local storage.
state
.dag
.entry(round)
.or_insert_with(HashMap::new)
.insert(certificate.origin(), (certificate.digest(), certificate));
let mut map = state.dag.get_or_insert(&round, HashMap::new).unwrap();
map.insert(certificate.origin(), (certificate.digest(), certificate));
state.dag.insert(&round, &map).unwrap();

// Try to order the dag to commit. Start from the highest round for which we have at least
// 2f+1 certificates. This is because we need them to reveal the common coin.
Expand All @@ -187,10 +210,10 @@ impl Consensus {
// Check if the leader has f+1 support from its children (ie. round r-1).
let stake: Stake = state
.dag
.get(&(r - 1))
.get(&(r - 1))?
.expect("We should have the whole history by now")
.values()
.filter(|(_, x)| x.header.parents.contains(leader_digest))
.filter(|(_, x)| x.header.parents.contains(&leader_digest))
.map(|(_, x)| committee.stake(&x.origin()))
.sum();

Expand All @@ -205,7 +228,7 @@ impl Consensus {
// Get an ordered list of past leaders that are linked to the current leader.
debug!("Leader {:?} has enough support", leader);
let mut sequence = Vec::new();
for leader in Consensus::order_leaders(committee, leader, state)
for leader in Consensus::order_leaders(committee, &leader, state)
.iter()
.rev()
{
Expand All @@ -231,11 +254,7 @@ impl Consensus {

/// Returns the certificate (and the certificate's digest) originated by the leader of the
/// specified round (if any).
fn leader<'a>(
committee: &Committee,
round: Round,
dag: &'a Dag,
) -> Option<&'a (Digest, Certificate)> {
fn leader(committee: &Committee, round: Round, dag: &Dag) -> Option<(Digest, Certificate)> {
// TODO: We should elect the leader of round r-2 using the common coin revealed at round r.
// At this stage, we are guaranteed to have 2f+1 certificates from round r (which is enough to
// compute the coin). We currently just use round-robin.
Expand All @@ -250,7 +269,9 @@ impl Consensus {
let leader = keys[coin as usize % committee.size()];

// Return its certificate and the certificate's digest.
dag.get(&round).and_then(|x| x.get(&leader))
dag.get(&round)
.expect("Leader from known round has no certificate, argument error")
.and_then(|x| x.get(&leader).cloned())
}

/// Order the past leaders that we didn't already commit.
Expand All @@ -260,7 +281,7 @@ impl Consensus {
state: &State,
) -> Vec<Certificate> {
let mut to_commit = vec![leader.clone()];
let mut leader = leader;
let mut leader = leader.clone();
for r in (state.last_committed_round + 2..leader.round())
.rev()
.step_by(2)
Expand All @@ -272,7 +293,7 @@ impl Consensus {
};

// Check whether there is a path between the last two leaders.
if Consensus::linked(leader, prev_leader, &state.dag) {
if Consensus::linked(&leader, &prev_leader, &state.dag) {
to_commit.push(prev_leader.clone());
leader = prev_leader;
}
Expand All @@ -282,14 +303,16 @@ impl Consensus {

/// Checks if there is a path between two leaders.
fn linked(leader: &Certificate, prev_leader: &Certificate, dag: &Dag) -> bool {
let mut parents = vec![leader];
let mut parents = vec![leader.clone()];
for r in (prev_leader.round()..leader.round()).rev() {
parents = dag
.get(&(r))
.unwrap()
.expect("We should have the whole history by now")
.values()
.filter(|(digest, _)| parents.iter().any(|x| x.header.parents.contains(digest)))
.map(|(_, certificate)| certificate)
.cloned()
.collect();
}
parents.contains(&prev_leader)
Expand All @@ -302,15 +325,16 @@ impl Consensus {
let mut ordered = Vec::new();
let mut already_ordered = HashSet::new();

let mut buffer = vec![leader];
let mut buffer = vec![leader.clone()];
while let Some(x) = buffer.pop() {
debug!("Sequencing {:?}", x);
ordered.push(x.clone());
for parent in &x.header.parents {
let (digest, certificate) = match state
.dag
.get(&(x.round() - 1))
.and_then(|x| x.values().find(|(x, _)| x == parent))
.unwrap()
.and_then(|x| x.values().find(|(x, _)| x == parent).cloned())
{
Some(x) => x,
None => continue, // We already ordered or GC up to here.
Expand Down Expand Up @@ -349,70 +373,68 @@ mod tests {

use super::consensus_tests::*;

fn temp_dir() -> std::path::PathBuf {
tempfile::tempdir()
.expect("Failed to open temporary directory")
.into_path()
}

#[test]
fn state_limits_test() {
let repeats = 10;
for _repeat in 1..repeats {
let gc_depth = 12;
let rounds: Round = rand::thread_rng().gen_range(10, 1000);

// process certificates for rounds, check we don't grow the dag too much
let keys: Vec<_> = keys().into_iter().map(|(x, _)| x).collect();
let genesis = Certificate::genesis(&mock_committee(&keys[..]))
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
let (certificates, _next_parents) =
make_optimal_certificates(1, rounds, &genesis, &keys);
let committee = mock_committee(&keys);

let mut state = State::new(Certificate::genesis(&mock_committee(&keys[..])));
for certificate in certificates {
Consensus::process_certificate(&committee, gc_depth, &mut state, certificate);
}
// with "optimal" certificates (see `make_optimal_certificates`), and a round-robin between leaders,
// we need at most 6 rounds lookbehind: we elect a leader at most at round r-2, and its round is
// preceded by one round of history for each prior leader, which contains their latest commit at least.
//
// -- L1's latest
// -- L2's latest
// -- L3's latest
// -- L4's latest
// -- support level 1 (for L4)
// -- support level 2 (for L4)
//
assert!(state.dag.len() <= 6, "DAG size: {}", state.dag.len());
let gc_depth = 12;
let rounds: Round = rand::thread_rng().gen_range(10, 100);

// process certificates for rounds, check we don't grow the dag too much
let keys: Vec<_> = keys().into_iter().map(|(x, _)| x).collect();

let genesis = Certificate::genesis(&mock_committee(&keys[..]))
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
let (certificates, _next_parents) = make_optimal_certificates(1, rounds, &genesis, &keys);
let committee = mock_committee(&keys);

let mut state = State::new(Certificate::genesis(&mock_committee(&keys[..])), temp_dir());
for certificate in certificates {
Consensus::process_certificate(&committee, gc_depth, &mut state, certificate);
}
// with "optimal" certificates (see `make_optimal_certificates`), and a round-robin between leaders,
// we need at most 6 rounds lookbehind: we elect a leader at most at round r-2, and its round is
// preceded by one round of history for each prior leader, which contains their latest commit at least.
//
// -- L1's latest
// -- L2's latest
// -- L3's latest
// -- L4's latest
// -- support level 1 (for L4)
// -- support level 2 (for L4)
//
let n = state.dag.keys().count();
assert!(n <= 6, "DAG size: {}", n);
}

#[test]
fn imperfect_state_limits_test() {
let repeats = 10;
for _repeat in 1..repeats {
let gc_depth = 12;
let rounds: Round = rand::thread_rng().gen_range(10, 1000);

// process certificates for rounds, check we don't grow the dag too much
let keys: Vec<_> = keys().into_iter().map(|(x, _)| x).collect();
let genesis = Certificate::genesis(&mock_committee(&keys[..]))
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
// TODO: evidence that this test fails when `failure_probability` parameter >= 1/3
let (certificates, _next_parents) =
make_certificates(1, rounds, &genesis, &keys, 0.333);
let committee = mock_committee(&keys);

let mut state = State::new(Certificate::genesis(&mock_committee(&keys[..])));
for certificate in certificates {
Consensus::process_certificate(&committee, gc_depth, &mut state, certificate);
}
// with "less optimal" certificates (see `make_certificates`), we should keep at most gc_depth rounds lookbehind
assert!(
state.dag.len() <= gc_depth as usize,
"DAG size: {}",
state.dag.len()
);
let gc_depth = 12;
let rounds: Round = rand::thread_rng().gen_range(10, 100);

// process certificates for rounds, check we don't grow the dag too much
let keys: Vec<_> = keys().into_iter().map(|(x, _)| x).collect();

let genesis = Certificate::genesis(&mock_committee(&keys[..]))
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
// TODO: evidence that this test fails when `failure_probability` parameter >= 1/3
let (certificates, _next_parents) = make_certificates(1, rounds, &genesis, &keys, 0.333);
let committee = mock_committee(&keys);

let mut state = State::new(Certificate::genesis(&mock_committee(&keys[..])), temp_dir());
for certificate in certificates {
Consensus::process_certificate(&committee, gc_depth, &mut state, certificate);
}
// with "less optimal" certificates (see `make_certificates`), we should keep at most gc_depth rounds lookbehind
let n = state.dag.keys().count();
assert!(n <= gc_depth as usize, "DAG size: {}", n);
}
}
Loading

0 comments on commit 853681f

Please sign in to comment.