Skip to content

Commit

Permalink
[dag] commit rule draft 2/n
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 committed Jul 11, 2023
1 parent 98d5aca commit 265c8cb
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 154 deletions.
102 changes: 0 additions & 102 deletions consensus/src/dag/commit_rule.rs

This file was deleted.

108 changes: 58 additions & 50 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use aptos_crypto::HashValue;
use aptos_logger::error;
use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier};
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};

Expand All @@ -30,6 +30,11 @@ impl NodeStatus {
| NodeStatus::Committed(node) => node,
}
}

pub fn mark_as_ordered(&mut self) {
assert!(matches!(self, NodeStatus::Unordered(_)));
*self = NodeStatus::Ordered(self.as_node().clone());
}
}

/// Data structure that stores the DAG representation, it maintains round based index.
Expand Down Expand Up @@ -149,9 +154,9 @@ impl Dag {
&self,
round: Round,
author: &Author,
) -> Option<Arc<CertifiedNode>> {
) -> Option<&Arc<CertifiedNode>> {
self.get_node_ref(round, author)
.map(|node_status| node_status.as_node().clone())
.map(|node_status| node_status.as_node())
}

// TODO: I think we can cache votes in the NodeStatus::Unordered
Expand All @@ -161,7 +166,7 @@ impl Dag {
validator_verifier: &ValidatorVerifier,
) -> bool {
self.get_round_iter(metadata.round() + 1)
.and_then(|next_round_iter| {
.map(|next_round_iter| {
let votes = next_round_iter
.filter(|node_status| {
node_status
Expand All @@ -171,16 +176,59 @@ impl Dag {
.any(|cert| cert.metadata() == metadata)
})
.map(|node_status| node_status.as_node().author());
Some(validator_verifier.check_voting_power(votes).is_ok())
validator_verifier.check_voting_power(votes).is_ok()
})
.unwrap_or(false)
}

pub fn reachable<'a>(
&'a self,
from: &'a Arc<CertifiedNode>,
) -> impl Iterator<Item = &Arc<CertifiedNode>> {
ReachableIterator::new(from, self)
fn reachable_filter(start: HashValue) -> impl FnMut(&Arc<CertifiedNode>) -> bool {
let mut reachable = HashSet::from([start]);
move |node| {
if reachable.contains(&node.digest()) {
for parent in node.parents() {
reachable.insert(*parent.metadata().digest());
}
true
} else {
false
}
}
}

pub fn reachable_mut(
&mut self,
from: &Arc<CertifiedNode>,
until: Option<Round>,
) -> impl Iterator<Item = &mut NodeStatus> {
let until = until.unwrap_or(self.lowest_round());
let mut reachable_filter = Self::reachable_filter(from.digest());
self.nodes_by_round
.range_mut(until..=from.round())
.rev()
.flat_map(|(_, round_ref)| round_ref.iter_mut())
.flatten()
.filter(move |node_status| {
matches!(node_status, NodeStatus::Unordered(_))
&& reachable_filter(node_status.as_node())
})
}

pub fn reachable(
&self,
from: &Arc<CertifiedNode>,
until: Option<Round>,
) -> impl Iterator<Item = &NodeStatus> {
let until = until.unwrap_or(self.lowest_round());
let mut reachable_filter = Self::reachable_filter(from.digest());
self.nodes_by_round
.range(until..=from.round())
.rev()
.flat_map(|(_, round_ref)| round_ref.iter())
.flatten()
.filter(move |node_status| {
matches!(node_status, NodeStatus::Unordered(_))
&& reachable_filter(node_status.as_node())
})
}

pub fn get_strong_links_for_round(
Expand Down Expand Up @@ -210,43 +258,3 @@ impl Dag {
todo!();
}
}

pub struct ReachableIterator<'a> {
current: Vec<&'a Arc<CertifiedNode>>,
next: HashMap<HashValue, NodeMetadata>,
dag: &'a Dag,
}

impl<'a> ReachableIterator<'a> {
fn new(from: &'a Arc<CertifiedNode>, dag: &'a Dag) -> Self {
Self {
current: vec![from],
next: HashMap::new(),
dag,
}
}
}

impl<'a> Iterator for ReachableIterator<'a> {
type Item = &'a Arc<CertifiedNode>;

fn next(&mut self) -> Option<Self::Item> {
if self.current.is_empty() {
for (_, metadata) in self.next.drain() {
match self.dag.get_node_ref_by_metadata(&metadata) {
Some(NodeStatus::Unordered(node)) => self.current.push(node),
_ => (),
}
}
}
if let Some(node) = self.current.pop() {
for parent in node.parents() {
self.next
.insert(*parent.metadata().digest(), parent.metadata().clone());
}
Some(node)
} else {
None
}
}
}
2 changes: 1 addition & 1 deletion consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
#![allow(dead_code)]

mod anchor_election;
mod commit_rule;
mod dag_driver;
mod dag_fetcher;
mod dag_handler;
mod dag_network;
mod dag_store;
mod order_rule;
mod reliable_broadcast;
mod storage;
#[cfg(test)]
Expand Down
Loading

0 comments on commit 265c8cb

Please sign in to comment.