From 265c8cb43256553f091e425199f246b274f8e57c Mon Sep 17 00:00:00 2001
From: Zekun Li
Date: Thu, 6 Jul 2023 12:58:36 -0700
Subject: [PATCH] [dag] commit rule draft 2/n
---
consensus/src/dag/commit_rule.rs | 102 ------------------
consensus/src/dag/dag_store.rs | 108 ++++++++++---------
consensus/src/dag/mod.rs | 2 +-
consensus/src/dag/order_rule.rs | 137 ++++++++++++++++++++++++
consensus/src/dag/reliable_broadcast.rs | 2 +-
5 files changed, 197 insertions(+), 154 deletions(-)
delete mode 100644 consensus/src/dag/commit_rule.rs
create mode 100644 consensus/src/dag/order_rule.rs
diff --git a/consensus/src/dag/commit_rule.rs b/consensus/src/dag/commit_rule.rs
deleted file mode 100644
index 978f52291a1b9..0000000000000
--- a/consensus/src/dag/commit_rule.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright © Aptos Foundation
-// SPDX-License-Identifier: Apache-2.0
-
-use crate::{
- dag::{anchor_election::AnchorElection, dag_store::Dag, CertifiedNode},
- experimental::buffer_manager::OrderedBlocks,
-};
-use aptos_consensus_types::common::Round;
-use aptos_crypto::HashValue;
-use aptos_infallible::RwLock;
-use aptos_types::{epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures};
-use futures_channel::mpsc::UnboundedSender;
-use std::sync::Arc;
-
-pub struct CommitRule {
- epoch_state: Arc,
- ordered_block_id: HashValue,
- lowest_unordered_round: Round,
- dag: Arc>,
- anchor_election: Box,
- commit_sender: UnboundedSender,
-}
-
-impl CommitRule {
- pub fn new(
- epoch_state: Arc,
- latest_ledger_info: LedgerInfoWithSignatures,
- dag: Arc>,
- anchor_election: Box,
- commit_sender: UnboundedSender,
- ) -> Self {
- // TODO: we need to initialize the anchor election based on the dag
- Self {
- epoch_state,
- ordered_block_id: latest_ledger_info.commit_info().id(),
- lowest_unordered_round: latest_ledger_info.commit_info().round() + 1,
- dag,
- anchor_election,
- commit_sender,
- }
- }
-
- pub fn new_node(&mut self, node: &CertifiedNode) {
- let round = node.round();
- while self.lowest_unordered_round <= round {
- if let Some(direct_anchor) = self.find_first_anchor_with_enough_votes(round) {
- let commit_anchor = self.find_first_anchor_to_commit(direct_anchor);
- self.finalize_order(commit_anchor);
- } else {
- break;
- }
- }
- }
-
- pub fn find_first_anchor_with_enough_votes(
- &self,
- target_round: Round,
- ) -> Option> {
- let dag_reader = self.dag.read();
- let mut current_round = self.lowest_unordered_round;
- while current_round < target_round {
- let anchor_author = self.anchor_election.get_anchor(current_round);
- // I "think" it's impossible to get ordered/committed node here but to double check
- if let Some(anchor_node) =
- dag_reader.get_node_by_round_author(current_round, &anchor_author)
- {
- // f+1 or 2f+1?
- if dag_reader
- .check_votes_for_node(anchor_node.metadata(), &self.epoch_state.verifier)
- {
- return Some(anchor_node);
- }
- }
- current_round += 2;
- }
- None
- }
-
- pub fn find_first_anchor_to_commit(
- &mut self,
- current_anchor: Arc,
- ) -> Arc {
- let dag_reader = self.dag.read();
- let anchor_round = current_anchor.round();
- let first_anchor = dag_reader
- .reachable(¤t_anchor)
- .filter(|node| node.round() >= self.lowest_unordered_round)
- // the same parity of the current anchor round
- .filter(|node| (node.round() ^ anchor_round) & 1 == 0)
- // anchor node, we can cache the election result per round
- .filter(|node| *node.author() == self.anchor_election.get_anchor(node.round()))
- .last()
- .unwrap();
- self.lowest_unordered_round = first_anchor.round() + 1;
- first_anchor.clone()
- }
-
- pub fn finalize_order(&mut self, anchor: Arc) {
- let dag_writer = self.dag.write();
- let _commit_nodes: Vec<_> = dag_writer.reachable(&anchor).collect();
- }
-}
diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs
index 67761101d7b3b..4fa8a20f12e73 100644
--- a/consensus/src/dag/dag_store.rs
+++ b/consensus/src/dag/dag_store.rs
@@ -11,7 +11,7 @@ use aptos_crypto::HashValue;
use aptos_logger::error;
use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier};
use std::{
- collections::{BTreeMap, HashMap},
+ collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};
@@ -30,6 +30,11 @@ impl NodeStatus {
| NodeStatus::Committed(node) => node,
}
}
+
+ pub fn mark_as_ordered(&mut self) {
+ assert!(matches!(self, NodeStatus::Unordered(_)));
+ *self = NodeStatus::Ordered(self.as_node().clone());
+ }
}
/// Data structure that stores the DAG representation, it maintains round based index.
@@ -149,9 +154,9 @@ impl Dag {
&self,
round: Round,
author: &Author,
- ) -> Option> {
+ ) -> Option<&Arc> {
self.get_node_ref(round, author)
- .map(|node_status| node_status.as_node().clone())
+ .map(|node_status| node_status.as_node())
}
// TODO: I think we can cache votes in the NodeStatus::Unordered
@@ -161,7 +166,7 @@ impl Dag {
validator_verifier: &ValidatorVerifier,
) -> bool {
self.get_round_iter(metadata.round() + 1)
- .and_then(|next_round_iter| {
+ .map(|next_round_iter| {
let votes = next_round_iter
.filter(|node_status| {
node_status
@@ -171,16 +176,59 @@ impl Dag {
.any(|cert| cert.metadata() == metadata)
})
.map(|node_status| node_status.as_node().author());
- Some(validator_verifier.check_voting_power(votes).is_ok())
+ validator_verifier.check_voting_power(votes).is_ok()
})
.unwrap_or(false)
}
- pub fn reachable<'a>(
- &'a self,
- from: &'a Arc,
- ) -> impl Iterator- > {
- ReachableIterator::new(from, self)
+ fn reachable_filter(start: HashValue) -> impl FnMut(&Arc) -> bool {
+ let mut reachable = HashSet::from([start]);
+ move |node| {
+ if reachable.contains(&node.digest()) {
+ for parent in node.parents() {
+ reachable.insert(*parent.metadata().digest());
+ }
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ pub fn reachable_mut(
+ &mut self,
+ from: &Arc,
+ until: Option,
+ ) -> impl Iterator
- {
+ let until = until.unwrap_or(self.lowest_round());
+ let mut reachable_filter = Self::reachable_filter(from.digest());
+ self.nodes_by_round
+ .range_mut(until..=from.round())
+ .rev()
+ .flat_map(|(_, round_ref)| round_ref.iter_mut())
+ .flatten()
+ .filter(move |node_status| {
+ matches!(node_status, NodeStatus::Unordered(_))
+ && reachable_filter(node_status.as_node())
+ })
+ }
+
+ pub fn reachable(
+ &self,
+ from: &Arc,
+ until: Option,
+ ) -> impl Iterator
- {
+ let until = until.unwrap_or(self.lowest_round());
+ let mut reachable_filter = Self::reachable_filter(from.digest());
+ self.nodes_by_round
+ .range(until..=from.round())
+ .rev()
+ .flat_map(|(_, round_ref)| round_ref.iter())
+ .flatten()
+ .filter(move |node_status| {
+ matches!(node_status, NodeStatus::Unordered(_))
+ && reachable_filter(node_status.as_node())
+ })
}
pub fn get_strong_links_for_round(
@@ -210,43 +258,3 @@ impl Dag {
todo!();
}
}
-
-pub struct ReachableIterator<'a> {
- current: Vec<&'a Arc>,
- next: HashMap,
- dag: &'a Dag,
-}
-
-impl<'a> ReachableIterator<'a> {
- fn new(from: &'a Arc, dag: &'a Dag) -> Self {
- Self {
- current: vec![from],
- next: HashMap::new(),
- dag,
- }
- }
-}
-
-impl<'a> Iterator for ReachableIterator<'a> {
- type Item = &'a Arc;
-
- fn next(&mut self) -> Option {
- if self.current.is_empty() {
- for (_, metadata) in self.next.drain() {
- match self.dag.get_node_ref_by_metadata(&metadata) {
- Some(NodeStatus::Unordered(node)) => self.current.push(node),
- _ => (),
- }
- }
- }
- if let Some(node) = self.current.pop() {
- for parent in node.parents() {
- self.next
- .insert(*parent.metadata().digest(), parent.metadata().clone());
- }
- Some(node)
- } else {
- None
- }
- }
-}
diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs
index cc0cd8bc58a99..fbfda90fbc350 100644
--- a/consensus/src/dag/mod.rs
+++ b/consensus/src/dag/mod.rs
@@ -3,12 +3,12 @@
#![allow(dead_code)]
mod anchor_election;
-mod commit_rule;
mod dag_driver;
mod dag_fetcher;
mod dag_handler;
mod dag_network;
mod dag_store;
+mod order_rule;
mod reliable_broadcast;
mod storage;
#[cfg(test)]
diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs
new file mode 100644
index 0000000000000..1585c6cf7370d
--- /dev/null
+++ b/consensus/src/dag/order_rule.rs
@@ -0,0 +1,137 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{
+ dag::{anchor_election::AnchorElection, dag_store::Dag, types::NodeMetadata, CertifiedNode},
+ experimental::buffer_manager::OrderedBlocks,
+};
+use aptos_consensus_types::common::Round;
+use aptos_crypto::HashValue;
+use aptos_infallible::RwLock;
+use aptos_types::{epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures};
+use futures_channel::mpsc::UnboundedSender;
+use std::sync::Arc;
+
+pub struct OrderRule {
+ epoch_state: Arc,
+ ordered_block_id: HashValue,
+ lowest_unordered_anchor_round: Round,
+ dag: Arc>,
+ anchor_election: Box,
+ ordered_blocks_sender: UnboundedSender,
+}
+
+impl OrderRule {
+ pub fn new(
+ epoch_state: Arc,
+ latest_ledger_info: LedgerInfoWithSignatures,
+ dag: Arc>,
+ anchor_election: Box,
+ ordered_blocks_sender: UnboundedSender,
+ ) -> Self {
+ // TODO: we need to initialize the anchor election based on the dag
+ Self {
+ epoch_state,
+ ordered_block_id: latest_ledger_info.commit_info().id(),
+ lowest_unordered_anchor_round: latest_ledger_info.commit_info().round() + 1,
+ dag,
+ anchor_election,
+ ordered_blocks_sender,
+ }
+ }
+
+ /// Check if two rounds have the same parity
+ fn check_parity(r1: Round, r2: Round) -> bool {
+ (r1 ^ r2) & 1 == 0
+ }
+
+ pub fn process_new_node(&mut self, node: &CertifiedNode) {
+ let round = node.round();
+ // If the node comes from the proposal round in the current instance, it can't trigger any ordering
+ if Self::check_parity(round, self.lowest_unordered_anchor_round) {
+ return;
+ }
+ // This node's votes can trigger an anchor from previous round to be ordered.
+ let mut start_round = round - 1;
+ while start_round <= round {
+ if let Some(direct_anchor) =
+ self.find_first_anchor_with_enough_votes(start_round, round)
+ {
+ let ordered_anchor = self.find_first_anchor_to_order(direct_anchor);
+ self.finalize_order(ordered_anchor);
+ // if there's any anchor being ordered, the loop continues to check if new anchor can be ordered as well.
+ start_round = self.lowest_unordered_anchor_round;
+ } else {
+ break;
+ }
+ }
+ }
+
+ /// From the start round until the target_round, try to find if there's any anchor has enough votes to trigger ordering
+ pub fn find_first_anchor_with_enough_votes(
+ &self,
+ mut start_round: Round,
+ target_round: Round,
+ ) -> Option> {
+ let dag_reader = self.dag.read();
+ while start_round < target_round {
+ let anchor_author = self.anchor_election.get_anchor(start_round);
+ // I "think" it's impossible to get ordered/committed node here but to double check
+ if let Some(anchor_node) =
+ dag_reader.get_node_by_round_author(start_round, &anchor_author)
+ {
+ // f+1 or 2f+1?
+ if dag_reader
+ .check_votes_for_node(anchor_node.metadata(), &self.epoch_state.verifier)
+ {
+ return Some(anchor_node.clone());
+ }
+ }
+ start_round += 2;
+ }
+ None
+ }
+
+ /// Follow an anchor with enough votes to find the first anchor that's recursively reachable by its suffix anchor
+ pub fn find_first_anchor_to_order(
+ &self,
+ mut current_anchor: Arc,
+ ) -> Arc {
+ let dag_reader = self.dag.read();
+ let anchor_round = current_anchor.round();
+ let is_anchor = |metadata: &NodeMetadata| -> bool {
+ Self::check_parity(metadata.round(), anchor_round)
+ && *metadata.author() == self.anchor_election.get_anchor(metadata.round())
+ };
+ while let Some(prev_anchor) = dag_reader
+ .reachable(¤t_anchor, Some(self.lowest_unordered_anchor_round))
+ .map(|node_status| node_status.as_node())
+ .find(|node| is_anchor(node.metadata()))
+ {
+ current_anchor = prev_anchor.clone();
+ }
+ current_anchor
+ }
+
+ /// Finalize the ordering with the given anchor node, update anchor election and construct blocks for execution.
+ pub fn finalize_order(&mut self, anchor: Arc) {
+ let _failed_anchors: Vec<_> = (self.lowest_unordered_anchor_round..anchor.round())
+ .step_by(2)
+ .map(|failed_round| self.anchor_election.get_anchor(failed_round))
+ .collect();
+ assert!(Self::check_parity(
+ self.lowest_unordered_anchor_round,
+ anchor.round(),
+ ));
+ self.lowest_unordered_anchor_round = anchor.round() + 1;
+
+ let mut dag_writer = self.dag.write();
+ let _ordered_nodes: Vec<_> = dag_writer
+ .reachable_mut(&anchor, None)
+ .map(|node_status| {
+ node_status.mark_as_ordered();
+ node_status.as_node()
+ })
+ .collect();
+ }
+}
diff --git a/consensus/src/dag/reliable_broadcast.rs b/consensus/src/dag/reliable_broadcast.rs
index 9b56c857dc946..9f23994bab42a 100644
--- a/consensus/src/dag/reliable_broadcast.rs
+++ b/consensus/src/dag/reliable_broadcast.rs
@@ -211,7 +211,7 @@ impl RpcHandler for CertifiedNodeHandler {
let epoch = node.metadata().epoch();
{
let dag_reader = self.dag.read();
- if dag_reader.exists(&node.metadata()) {
+ if dag_reader.exists(node.metadata()) {
return Ok(CertifiedAck::new(node.metadata().epoch()));
}