Skip to content

Commit

Permalink
[Consensus] Garbage Collection - 1 (#19315)
Browse files Browse the repository at this point in the history
## Description 

This is the fist part that implements Garbage Collection for consensus.

Compared to Narwhal/Bullshark where the GC round was calculated and
driven by the consensus component it self, here the single threaded
nature of the system and the fact that we do have DagState - which is
shared amongst all the components - allow us to base the gc round
calculations purely on the latest committed round as derived from
DagState.

On this PR:
* `Linearizer` has been refactored to respect the `gc_round`. When
linearizer tries to commit leader `R` , it attempts to commit everything
up to the `gc_round` that has been set from leader `R-1` (as already
discussed this is the desired approach).
* `DagState` is calculating the current `gc_round` based on the latest
commit.
* `gc_depth` has been added as a protocol config variable
* Basic testing has been added to `Linearizer`

Next steps:

- [ ] BlockManager to respect the `gc_round` when accepting blocks and
trigger clean ups for new gc rounds
- [ ] Skip blocks that are received which are `<= gc_round`
- [ ] Not propose ancestors that are `<= gc_round`
- [ ] Subscriber to ask for blocks from `gc_round` when
`last_fetched_round < gc_round` for a peer to prevent us from fetching
unnecessary blocks
- [ ] Implement new timestamp approach so ancestor verification is not
needed
- [ ] Re-propose GC'ed transactions (probably not all of them)
- [ ] Harden testing for GC & edge cases

## Test plan 

CI/PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
akichidis authored Sep 20, 2024
1 parent 35dacad commit 42aa935
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 0 deletions.
19 changes: 19 additions & 0 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,25 @@ impl DagState {
self.last_committed_rounds.clone()
}

/// The GC round is the highest round that blocks of equal or lower round are considered obsolete and no longer possible to be committed.
/// There is no meaning accepting any blocks with round <= gc_round. The Garbage Collection (GC) round is calculated based on the latest
/// committed leader round. When GC is disabled that will return the genesis round.
pub(crate) fn gc_round(&self) -> Round {
let gc_depth = self.context.protocol_config.gc_depth();
if gc_depth > 0 {
// GC is enabled, only then calculate the diff
self.last_commit_round().saturating_sub(gc_depth)
} else {
// Otherwise just return genesis round. That also acts as a safety mechanism so we never attempt to truncate anything
// even accidentally.
GENESIS_ROUND
}
}

pub(crate) fn gc_enabled(&self) -> bool {
self.context.protocol_config.gc_depth() > 0
}

/// After each flush, DagState becomes persisted in storage and it expected to recover
/// all internal states from storage after restarts.
pub(crate) fn flush(&mut self) {
Expand Down
133 changes: 133 additions & 0 deletions consensus/core/src/linearizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
commit::{sort_sub_dag_blocks, Commit, CommittedSubDag, TrustedCommit},
dag_state::DagState,
leader_schedule::LeaderSchedule,
Round,
};

/// Expand a committed sequence of leader into a sequence of sub-dags.
Expand Down Expand Up @@ -45,6 +46,12 @@ impl Linearizer {
let last_commit_digest = dag_state.last_commit_digest();
let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms();
let last_committed_rounds = dag_state.last_committed_rounds();
let gc_enabled = dag_state.gc_enabled();
// The GC round here is calculated based on the last committed round of the leader block. The algorithm will attempt to
// commit blocks up to this GC round. Once this commit has been processed and written to DagState, then gc round will update
// and on the processing of the next commit we'll have it already updated, so no need to do any gc_round recalculations here.
// We just use whatever is currently in DagState.
let gc_round: Round = dag_state.gc_round();

let mut to_commit = Vec::new();
let mut committed = HashSet::new();
Expand All @@ -65,9 +72,16 @@ impl Linearizer {
.filter(|ancestor| {
// We skip the block if we already committed it or we reached a
// round that we already committed.
// TODO: for Fast Path we need to ammend the recursion rule here and allow us to commit blocks all the way up to the `gc_round`.
// Some additional work will be needed to make sure that we keep the uncommitted blocks up to the `gc_round` across commits.
!committed.contains(ancestor)
&& last_committed_rounds[ancestor.author] < ancestor.round
})
.filter(|ancestor| {
// Keep the block if GC is not enabled or it is enabled and the block is above the gc_round. We do this
// to stop the recursion early and avoid going to deep when it's unnecessary.
!gc_enabled || ancestor.round > gc_round
})
.collect::<Vec<_>>(),
)
.into_iter()
Expand All @@ -84,6 +98,12 @@ impl Linearizer {

drop(dag_state);

// The above code should have not yielded any blocks that are <= gc_round, but just to make sure that we'll never
// commit anything that should be garbage collected we attempt to prune here as well.
if gc_enabled {
assert!(to_commit.iter().all(|block| block.round() > gc_round), "No blocks <= {gc_round} should be committed. Commit index {}, leader round {}, blocks {to_commit:?}.", last_commit_index, leader_block_ref);
}

// Sort the blocks of the sub-dag blocks
sort_sub_dag_blocks(&mut to_commit);

Expand Down Expand Up @@ -168,13 +188,16 @@ impl Linearizer {

#[cfg(test)]
mod tests {
use rstest::rstest;

use super::*;
use crate::{
commit::{CommitAPI as _, CommitDigest, DEFAULT_WAVE_LENGTH},
context::Context,
leader_schedule::{LeaderSchedule, LeaderSwapTable},
storage::mem_store::MemStore,
test_dag_builder::DagBuilder,
test_dag_parser::parse_dag,
CommitIndex,
};

Expand Down Expand Up @@ -467,4 +490,114 @@ mod tests {
assert!(block.round() <= expected_second_commit.leader().round);
}
}

/// This test will run the linearizer with GC disabled (gc_depth = 0) and gc enabled (gc_depth = 3) and make
/// sure that for the exact same DAG the linearizer will commit different blocks according to the rules.
#[rstest]
#[tokio::test]
async fn test_handle_commit_with_gc_simple(#[values(0, 3)] gc_depth: u32) {
telemetry_subscribers::init_for_testing();

let num_authorities = 4;
let (mut context, _keys) = Context::new_for_test(num_authorities);
context.protocol_config.set_gc_depth_for_testing(gc_depth);
let context = Arc::new(context);
let dag_state = Arc::new(RwLock::new(DagState::new(
context.clone(),
Arc::new(MemStore::new()),
)));
let leader_schedule = Arc::new(LeaderSchedule::new(
context.clone(),
LeaderSwapTable::default(),
));
let mut linearizer = Linearizer::new(dag_state.clone(), leader_schedule);

// Authorities of index 0->2 will always creates blocks that see each other, but until round 5 they won't see the blocks of authority 3.
// For authority 3 we create blocks that connect to all the other authorities.
// On round 5 we finally make the other authorities see the blocks of authority 3.
// Practically we "simulate" here a long chain created by authority 3 that is visible in round 5, but due to GC blocks of only round >=2 will
// be committed, when GC is enabled. When GC is disabled all blocks will be committed for rounds >= 1.
let dag_str = "DAG {
Round 0 : { 4 },
Round 1 : { * },
Round 2 : {
A -> [-D1],
B -> [-D1],
C -> [-D1],
D -> [*],
},
Round 3 : {
A -> [-D2],
B -> [-D2],
C -> [-D2],
},
Round 4 : {
A -> [-D3],
B -> [-D3],
C -> [-D3],
D -> [A3, B3, C3, D2],
},
Round 5 : { * },
}";

let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
dag_builder.print();
dag_builder.persist_all_blocks(dag_state.clone());

let leaders = dag_builder
.leader_blocks(1..=6)
.into_iter()
.flatten()
.collect::<Vec<_>>();

let commits = linearizer.handle_commit(leaders.clone());
for (idx, subdag) in commits.into_iter().enumerate() {
tracing::info!("{subdag:?}");
assert_eq!(subdag.leader, leaders[idx].reference());
assert_eq!(subdag.timestamp_ms, leaders[idx].timestamp_ms());
if idx == 0 {
// First subdag includes the leader block only
assert_eq!(subdag.blocks.len(), 1);
} else if idx == 1 {
assert_eq!(subdag.blocks.len(), 3);
} else if idx == 2 {
// We commit:
// * 1 block on round 4, the leader block
// * 3 blocks on round 3, as no commit happened on round 3 since the leader was missing
// * 2 blocks on round 2, again as no commit happened on round 3, we commit the "sub dag" of leader of round 3, which will be another 2 blocks
assert_eq!(subdag.blocks.len(), 6);
} else {
// GC is enabled, so we expect to see only blocks of round >= 2
if gc_depth > 0 {
// Now it's going to be the first time that a leader will see the blocks of authority 3 and will attempt to commit
// the long chain. However, due to GC it will only commit blocks of round > 1. That's because it will commit blocks
// up to previous leader's round (round = 4) minus the gc_depth = 3, so that will be gc_round = 4 - 3 = 1. So we expect
// to see on the sub dag committed blocks of round >= 2.
assert_eq!(subdag.blocks.len(), 5);

assert!(
subdag.blocks.iter().all(|block| block.round() >= 2),
"Found blocks that are of round < 2."
);

// Also ensure that gc_round has advanced with the latest committed leader
assert_eq!(dag_state.read().gc_round(), subdag.leader.round - gc_depth);
} else {
// GC is disabled, so we expect to see all blocks of round >= 1
assert_eq!(subdag.blocks.len(), 6);
assert!(
subdag.blocks.iter().all(|block| block.round() >= 1),
"Found blocks that are of round < 1."
);

// GC round should never have moved
assert_eq!(dag_state.read().gc_round(), 0);
}
}
for block in subdag.blocks.iter() {
assert!(block.round() <= leaders[idx].round());
}
assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
}
}
}
1 change: 1 addition & 0 deletions crates/sui-open-rpc/spec/openrpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,7 @@
"config_read_setting_impl_cost_base": null,
"config_read_setting_impl_cost_per_byte": null,
"consensus_bad_nodes_stake_threshold": null,
"consensus_gc_depth": null,
"consensus_max_num_transactions_in_block": null,
"consensus_max_transaction_size_bytes": null,
"consensus_max_transactions_in_block_bytes": null,
Expand Down
13 changes: 13 additions & 0 deletions crates/sui-protocol-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,10 @@ pub struct ProtocolConfig {
/// This config plays the same role as `max_accumulated_txn_cost_per_object_in_narwhal_commit`
/// but for mysticeti commits due to that mysticeti has higher commit rate.
max_accumulated_txn_cost_per_object_in_mysticeti_commit: Option<u64>,

/// Configures the garbage collection depth for consensus. When is unset or `0` then the garbage collection
/// is disabled.
consensus_gc_depth: Option<u32>,
}

// feature flags
Expand Down Expand Up @@ -1596,6 +1600,9 @@ impl ProtocolConfig {
pub fn validate_identifier_inputs(&self) -> bool {
self.feature_flags.validate_identifier_inputs
}
pub fn gc_depth(&self) -> u32 {
self.consensus_gc_depth.unwrap_or(0)
}
}

#[cfg(not(msim))]
Expand Down Expand Up @@ -2109,6 +2116,8 @@ impl ProtocolConfig {
bridge_should_try_to_finalize_committee: None,

max_accumulated_txn_cost_per_object_in_mysticeti_commit: None,

consensus_gc_depth: None,
// When adding a new constant, set it to None in the earliest version, like this:
// new_constant: None,
};
Expand Down Expand Up @@ -2937,6 +2946,10 @@ impl ProtocolConfig {
pub fn set_consensus_round_prober_for_testing(&mut self, val: bool) {
self.feature_flags.consensus_round_prober = val;
}

pub fn set_gc_depth_for_testing(&mut self, val: u32) {
self.consensus_gc_depth = Some(val);
}
}

type OverrideFn = dyn Fn(ProtocolVersion, ProtocolConfig) -> ProtocolConfig + Send;
Expand Down

0 comments on commit 42aa935

Please sign in to comment.