[Consensus] resubmit GC'ed transactions (#19572)
## Description 

Transactions of GC'ed blocks should be retried so that the system's
existing assumptions continue to hold. Refactored the consensus side to
signal back to the submitter the status of the block that the submitted
transaction was included in. There are two possible outcomes:
1. Sequenced (the block has been committed, and consequently the tx as
well)
2. GarbageCollected (the tx's block has not been committed and has
passed gc_round)

Also took the opportunity for a small refactoring around the
`SubmitToConsensus` trait and the `ConsensusClient`, as their intentions
started diverging.
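
For illustration, here is a minimal sketch of the retry contract this introduces. This is not the actual API: `BlockRef`/`BlockStatus` mirror the types this PR exports from `consensus-core`, while `submit` is a hypothetical stand-in for the real `TransactionClient` submission path.

```rust
// Minimal sketch, assuming simplified stand-ins for the real types.
#[derive(Debug, Clone, Copy)]
struct BlockRef {
    round: u32,
}

#[derive(Debug)]
enum BlockStatus {
    // The block (and therefore the tx) was committed.
    Sequenced(BlockRef),
    // The block was never committed and passed gc_round.
    GarbageCollected(BlockRef),
}

async fn submit(_tx: &[u8]) -> BlockStatus {
    // Stand-in: the real client returns a waiter that resolves once the
    // status of the block containing the transaction is known.
    BlockStatus::Sequenced(BlockRef { round: 1 })
}

// Resubmit until the transaction lands in a committed block, keeping the
// "submitted transactions eventually commit" assumption intact.
async fn submit_until_sequenced(tx: &[u8]) {
    loop {
        match submit(tx).await {
            BlockStatus::Sequenced(_) => break,
            BlockStatus::GarbageCollected(_) => continue,
        }
    }
}
```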

## Test plan 

CI/PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
akichidis authored Nov 13, 2024
1 parent bb0288b commit ae6275f
Showing 12 changed files with 641 additions and 114 deletions.
172 changes: 165 additions & 7 deletions consensus/core/src/core.rs
@@ -558,7 +558,7 @@ impl Core {
             .with_label_values(&["Core::try_commit"])
             .start_timer();

-        let mut committed_subdags = Vec::new();
+        let mut committed_sub_dags = Vec::new();
         // TODO: Add optimization to abort early without quorum for a round.
         loop {
             // LeaderSchedule has a limit to how many sequenced leaders can be committed
@@ -663,10 +663,21 @@
             self.block_manager
                 .try_unsuspend_blocks_for_latest_gc_round();

-            committed_subdags.extend(subdags);
+            committed_sub_dags.extend(subdags);
         }

-        Ok(committed_subdags)
+        // Notify about our own committed blocks
+        let committed_block_refs = committed_sub_dags
+            .iter()
+            .flat_map(|sub_dag| sub_dag.blocks.iter())
+            .filter_map(|block| {
+                (block.author() == self.context.own_index).then_some(block.reference())
+            })
+            .collect::<Vec<_>>();
+        self.transaction_consumer
+            .notify_own_blocks_status(committed_block_refs, self.dag_state.read().gc_round());
+
+        Ok(committed_sub_dags)
     }

     pub(crate) fn get_missing_blocks(&self) -> BTreeSet<BlockRef> {
@@ -1162,6 +1173,8 @@ mod test {
     use std::{collections::BTreeSet, time::Duration};

     use consensus_config::{AuthorityIndex, Parameters};
+    use futures::{stream::FuturesUnordered, StreamExt};
+    use rstest::rstest;
     use sui_protocol_config::ProtocolConfig;
     use tokio::time::sleep;

@@ -1173,7 +1186,8 @@
         leader_scoring::ReputationScores,
         storage::{mem_store::MemStore, Store, WriteBatch},
         test_dag_builder::DagBuilder,
-        transaction::TransactionClient,
+        test_dag_parser::parse_dag,
+        transaction::{BlockStatus, TransactionClient},
         CommitConsumer, CommitIndex,
     };

@@ -1186,6 +1200,7 @@
         let store = Arc::new(MemStore::new());
         let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
         let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
+        let mut block_status_subscriptions = FuturesUnordered::new();

         // Create test blocks for all the authorities for 4 rounds and populate them in store
         let mut last_round_blocks = genesis_blocks(context.clone());
@@ -1199,6 +1214,13 @@
                     .build(),
             );

+            // If it's round 1, which will be committed later on, and it's our "own" block, subscribe to listen for the block status.
+            if round == 1 && index == context.own_index {
+                let subscription =
+                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
+                block_status_subscriptions.push(subscription);
+            }
+
             this_round_blocks.push(block);
         }
         all_blocks.extend(this_round_blocks.clone());
@@ -1239,7 +1261,7 @@ mod test {
         let (signals, signal_receivers) = CoreSignals::new(context.clone());
         // Need at least one subscriber to the block broadcast channel.
         let mut block_receiver = signal_receivers.block_broadcast_receiver();
-        let mut core = Core::new(
+        let _core = Core::new(
             context.clone(),
             leader_schedule,
             transaction_consumer,
@@ -1270,8 +1292,6 @@ mod test {
             assert_eq!(ancestor.round, 4);
         }

-        // Run commit rule.
-        core.try_commit().ok();
         let last_commit = store
             .read_last_commit()
             .unwrap()
@@ -1284,6 +1304,12 @@ mod test {
         assert_eq!(dag_state.read().last_commit_index(), 2);
         let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
         assert_eq!(all_stored_commits.len(), 2);
+
+        // And ensure that our "own" block 1 was sent to TransactionConsumer as a notification alongside the gc_round.
+        while let Some(result) = block_status_subscriptions.next().await {
+            let status = result.unwrap();
+            assert!(matches!(status, BlockStatus::Sequenced(_)));
+        }
     }

     /// Recover Core and continue proposing when having a partial last round which doesn't form a quorum and we haven't
@@ -1602,6 +1628,138 @@ mod test {
         assert_eq!(dag_state.read().last_commit_index(), 0);
     }

+    #[rstest]
+    #[tokio::test]
+    async fn test_commit_and_notify_for_block_status(#[values(0, 2)] gc_depth: u32) {
+        telemetry_subscribers::init_for_testing();
+        let (mut context, mut key_pairs) = Context::new_for_test(4);
+
+        if gc_depth > 0 {
+            context
+                .protocol_config
+                .set_consensus_gc_depth_for_testing(gc_depth);
+        }
+
+        let context = Arc::new(context);
+
+        let store = Arc::new(MemStore::new());
+        let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
+        let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone());
+        let mut block_status_subscriptions = FuturesUnordered::new();
+
+        let dag_str = "DAG {
+            Round 0 : { 4 },
+            Round 1 : { * },
+            Round 2 : { * },
+            Round 3 : {
+                A -> [*],
+                B -> [-A2],
+                C -> [-A2],
+                D -> [-A2],
+            },
+            Round 4 : {
+                B -> [-A3],
+                C -> [-A3],
+                D -> [-A3],
+            },
+            Round 5 : {
+                A -> [A3, B4, C4, D4]
+                B -> [*],
+                C -> [*],
+                D -> [*],
+            },
+            Round 6 : { * },
+            Round 7 : { * },
+            Round 8 : { * },
+        }";
+
+        let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag");
+        dag_builder.print();
+
+        // Subscribe to all created "own" blocks. We know that for our node (A) we'll be able to commit up to round 5.
+        for block in dag_builder.blocks(1..=5) {
+            if block.author() == context.own_index {
+                let subscription =
+                    transaction_consumer.subscribe_for_block_status_testing(block.reference());
+                block_status_subscriptions.push(subscription);
+            }
+        }
+
+        // Write them in store
+        store
+            .write(WriteBatch::default().blocks(dag_builder.blocks(1..=8)))
+            .expect("Storage error");
+
+        // Create dag state after all blocks have been written to store
+        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
+        let block_manager = BlockManager::new(
+            context.clone(),
+            dag_state.clone(),
+            Arc::new(NoopBlockVerifier),
+        );
+        let leader_schedule = Arc::new(LeaderSchedule::from_store(
+            context.clone(),
+            dag_state.clone(),
+        ));
+
+        let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0);
+        let commit_observer = CommitObserver::new(
+            context.clone(),
+            commit_consumer,
+            dag_state.clone(),
+            store.clone(),
+            leader_schedule.clone(),
+        );
+
+        // Check no commits have been persisted to dag_state or store.
+        let last_commit = store.read_last_commit().unwrap();
+        assert!(last_commit.is_none());
+        assert_eq!(dag_state.read().last_commit_index(), 0);
+
+        // Now spin up core
+        let (signals, signal_receivers) = CoreSignals::new(context.clone());
+        // Need at least one subscriber to the block broadcast channel.
+        let _block_receiver = signal_receivers.block_broadcast_receiver();
+        let _core = Core::new(
+            context.clone(),
+            leader_schedule,
+            transaction_consumer,
+            block_manager,
+            true,
+            commit_observer,
+            signals,
+            key_pairs.remove(context.own_index.value()).1,
+            dag_state.clone(),
+            false,
+        );
+
+        let last_commit = store
+            .read_last_commit()
+            .unwrap()
+            .expect("last commit should be set");
+
+        assert_eq!(last_commit.index(), 5);
+
+        while let Some(result) = block_status_subscriptions.next().await {
+            let status = result.unwrap();
+
+            // If gc is enabled, then we expect some blocks to be garbage collected.
+            if gc_depth > 0 {
+                match status {
+                    BlockStatus::Sequenced(block_ref) => {
+                        assert!(block_ref.round == 1 || block_ref.round == 5);
+                    }
+                    BlockStatus::GarbageCollected(block_ref) => {
+                        assert!(block_ref.round == 2 || block_ref.round == 3);
+                    }
+                }
+            } else {
+                // Otherwise all of them should be committed.
+                assert!(matches!(status, BlockStatus::Sequenced(_)));
+            }
+        }
+    }
+
     #[tokio::test]
     async fn test_core_set_min_propose_round() {
         telemetry_subscribers::init_for_testing();
6 changes: 4 additions & 2 deletions consensus/core/src/lib.rs
@@ -46,7 +46,7 @@ mod test_dag_parser;

 /// Exported consensus API.
 pub use authority_node::ConsensusAuthority;
-pub use block::{BlockAPI, Round, TransactionIndex};
+pub use block::{BlockAPI, BlockRef, Round, TransactionIndex};
 /// Exported API for testing.
 pub use block::{TestBlock, Transaction, VerifiedBlock};
 pub use commit::{CommitDigest, CommitIndex, CommitRef, CommittedSubDag};
@@ -55,4 +55,6 @@ pub use network::{
     connection_monitor::{AnemoConnectionMonitor, ConnectionMonitorHandle, ConnectionStatus},
     metrics::{MetricsMakeCallbackHandler, NetworkRouteMetrics, QuinnConnectionMetrics},
 };
-pub use transaction::{ClientError, TransactionClient, TransactionVerifier, ValidationError};
+pub use transaction::{
+    BlockStatus, ClientError, TransactionClient, TransactionVerifier, ValidationError,
+};
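
With `BlockStatus` and `BlockRef` now exported from `consensus-core`, downstream crates can match on the notification directly. A hypothetical consumer (not part of this diff) might look like:

```rust
// Hypothetical downstream usage of the newly exported type; assumes the
// crate is consumed under its path name `consensus_core`.
use consensus_core::BlockStatus;

fn describe(status: &BlockStatus) -> String {
    match status {
        // The block containing the tx was committed.
        BlockStatus::Sequenced(r) => format!("sequenced at round {}", r.round),
        // The block passed gc_round uncommitted; the tx should be resubmitted.
        BlockStatus::GarbageCollected(r) => {
            format!("garbage collected at round {}", r.round)
        }
    }
}
```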