Skip to content

Commit

Permalink
[dag] commit rule content 1/n
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 committed Jul 11, 2023
1 parent 4eb3870 commit 98d5aca
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 23 deletions.
49 changes: 37 additions & 12 deletions consensus/src/dag/commit_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;
pub struct CommitRule {
epoch_state: Arc<EpochState>,
ordered_block_id: HashValue,
ordered_round: Round,
lowest_unordered_round: Round,
dag: Arc<RwLock<Dag>>,
anchor_election: Box<dyn AnchorElection>,
commit_sender: UnboundedSender<OrderedBlocks>,
Expand All @@ -33,7 +33,7 @@ impl CommitRule {
Self {
epoch_state,
ordered_block_id: latest_ledger_info.commit_info().id(),
ordered_round: latest_ledger_info.commit_info().round(),
lowest_unordered_round: latest_ledger_info.commit_info().round() + 1,
dag,
anchor_election,
commit_sender,
Expand All @@ -42,11 +42,10 @@ impl CommitRule {

pub fn new_node(&mut self, node: &CertifiedNode) {
let round = node.round();
while self.ordered_round <= round {
while self.lowest_unordered_round <= round {
if let Some(direct_anchor) = self.find_first_anchor_with_enough_votes(round) {
let commit_anchor = self.find_first_anchor_to_commit(direct_anchor);
let commit_nodes = self.order_anchor(commit_anchor);
self.push_to_execution(commit_nodes);
self.finalize_order(commit_anchor);
} else {
break;
}
Expand All @@ -58,20 +57,46 @@ impl CommitRule {
target_round: Round,
) -> Option<Arc<CertifiedNode>> {
let dag_reader = self.dag.read();
let mut current_round = self.ordered_round;
let mut current_round = self.lowest_unordered_round;
while current_round < target_round {
let anchor_author = self.anchor_election.get_anchor(current_round);
// I "think" it's impossible to get ordered/committed node here but to double check
if let Some(anchor_node) =
dag_reader.get_node_by_round_author(current_round, &anchor_author)
{
// f+1 or 2f+1?
if dag_reader
.check_votes_for_node(anchor_node.metadata(), &self.epoch_state.verifier)
{
return Some(anchor_node);
}
}
current_round += 2;
}
None
}

pub fn find_first_anchor_to_commit(&mut self, node: Arc<CertifiedNode>) -> Arc<CertifiedNode> {
node
pub fn find_first_anchor_to_commit(
&mut self,
current_anchor: Arc<CertifiedNode>,
) -> Arc<CertifiedNode> {
let dag_reader = self.dag.read();
let anchor_round = current_anchor.round();
let first_anchor = dag_reader
.reachable(&current_anchor)
.filter(|node| node.round() >= self.lowest_unordered_round)
// the same parity of the current anchor round
.filter(|node| (node.round() ^ anchor_round) & 1 == 0)
// anchor node, we can cache the election result per round
.filter(|node| *node.author() == self.anchor_election.get_anchor(node.round()))
.last()
.unwrap();
self.lowest_unordered_round = first_anchor.round() + 1;
first_anchor.clone()
}

pub fn order_anchor(&mut self, anchor: Arc<CertifiedNode>) -> Vec<Arc<CertifiedNode>> {
vec![anchor]
pub fn finalize_order(&mut self, anchor: Arc<CertifiedNode>) {
let dag_writer = self.dag.write();
let _commit_nodes: Vec<_> = dag_writer.reachable(&anchor).collect();
}

pub fn push_to_execution(&mut self, nodes: Vec<Arc<CertifiedNode>>) {}
}
109 changes: 98 additions & 11 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::dag::{
};
use anyhow::{anyhow, ensure};
use aptos_consensus_types::common::{Author, Round};
use aptos_crypto::HashValue;
use aptos_logger::error;
use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier};
use std::{
Expand All @@ -15,14 +16,14 @@ use std::{
};

#[derive(Clone)]
enum NodeStatus {
pub enum NodeStatus {
Unordered(Arc<CertifiedNode>),
Ordered(Arc<CertifiedNode>),
Committed(Arc<CertifiedNode>),
}

impl NodeStatus {
fn as_node(&self) -> &Arc<CertifiedNode> {
pub fn as_node(&self) -> &Arc<CertifiedNode> {
match self {
NodeStatus::Unordered(node)
| NodeStatus::Ordered(node)
Expand Down Expand Up @@ -114,7 +115,7 @@ impl Dag {
}

pub fn exists(&self, metadata: &NodeMetadata) -> bool {
self.get_node_ref(metadata).is_some()
self.get_node_ref_by_metadata(metadata).is_some()
}

pub fn all_exists(&self, nodes: &[NodeCertificate]) -> bool {
Expand All @@ -123,33 +124,79 @@ impl Dag {
.all(|certificate| self.exists(certificate.metadata()))
}

fn get_node_ref(&self, metadata: &NodeMetadata) -> Option<&NodeStatus> {
let index = self.author_to_index.get(metadata.author())?;
let round_ref = self.nodes_by_round.get(&metadata.round())?;
fn get_node_ref_by_metadata(&self, metadata: &NodeMetadata) -> Option<&NodeStatus> {
self.get_node_ref(metadata.round(), metadata.author())
}

fn get_node_ref(&self, round: Round, author: &Author) -> Option<&NodeStatus> {
let index = self.author_to_index.get(author)?;
let round_ref = self.nodes_by_round.get(&round)?;
round_ref[*index].as_ref()
}

fn get_round_iter(&self, round: Round) -> Option<impl Iterator<Item = &NodeStatus>> {
self.nodes_by_round
.get(&round)
.map(|round_ref| round_ref.iter().flatten())
}

pub fn get_node(&self, metadata: &NodeMetadata) -> Option<Arc<CertifiedNode>> {
self.get_node_ref(metadata)
self.get_node_ref_by_metadata(metadata)
.map(|node_status| node_status.as_node().clone())
}

pub fn get_node_by_round_author(
&self,
round: Round,
author: &Author,
) -> Option<Arc<CertifiedNode>> {
self.get_node_ref(round, author)
.map(|node_status| node_status.as_node().clone())
}

// TODO: I think we can cache votes in the NodeStatus::Unordered
pub fn check_votes_for_node(
&self,
metadata: &NodeMetadata,
validator_verifier: &ValidatorVerifier,
) -> bool {
self.get_round_iter(metadata.round() + 1)
.and_then(|next_round_iter| {
let votes = next_round_iter
.filter(|node_status| {
node_status
.as_node()
.parents()
.iter()
.any(|cert| cert.metadata() == metadata)
})
.map(|node_status| node_status.as_node().author());
Some(validator_verifier.check_voting_power(votes).is_ok())
})
.unwrap_or(false)
}

pub fn reachable<'a>(
&'a self,
from: &'a Arc<CertifiedNode>,
) -> impl Iterator<Item = &Arc<CertifiedNode>> {
ReachableIterator::new(from, self)
}

pub fn get_strong_links_for_round(
&self,
round: Round,
validator_verifier: &ValidatorVerifier,
) -> Option<Vec<NodeCertificate>> {
let all_nodes_in_round = self.nodes_by_round.get(&round)?.iter().flatten();
if validator_verifier
.check_voting_power(
all_nodes_in_round
.clone()
self.get_round_iter(round)?
.map(|node_status| node_status.as_node().metadata().author()),
)
.is_ok()
{
Some(
all_nodes_in_round
self.get_round_iter(round)?
.map(|node_status| node_status.as_node().certificate())
.collect(),
)
Expand All @@ -163,3 +210,43 @@ impl Dag {
todo!();
}
}

pub struct ReachableIterator<'a> {
current: Vec<&'a Arc<CertifiedNode>>,
next: HashMap<HashValue, NodeMetadata>,
dag: &'a Dag,
}

impl<'a> ReachableIterator<'a> {
fn new(from: &'a Arc<CertifiedNode>, dag: &'a Dag) -> Self {
Self {
current: vec![from],
next: HashMap::new(),
dag,
}
}
}

impl<'a> Iterator for ReachableIterator<'a> {
type Item = &'a Arc<CertifiedNode>;

fn next(&mut self) -> Option<Self::Item> {
if self.current.is_empty() {
for (_, metadata) in self.next.drain() {
match self.dag.get_node_ref_by_metadata(&metadata) {
Some(NodeStatus::Unordered(node)) => self.current.push(node),
_ => (),
}
}
}
if let Some(node) = self.current.pop() {
for parent in node.parents() {
self.next
.insert(*parent.metadata().digest(), parent.metadata().clone());
}
Some(node)
} else {
None
}
}
}

0 comments on commit 98d5aca

Please sign in to comment.