Skip to content

Commit

Permalink
[consensus_handler] Local in-memory cache for processed transactions (M…
Browse files Browse the repository at this point in the history
…ystenLabs#9363)

We noticed that DB read into consensus_message_processed table takes
significant time, and using simple in-memory cache would speed this up
for duplicate transactions.

The cache is implemented in ConsensusHandler and not on the
PerEpochStore. The drawback is that other use cases would not make use
of that cache, but on the flip side the benefit is that it optimizes for
consensus handler performance:

* It avoids lock conflicts between consensus handler and other threads.
Even more, if/when we change handle_consensus_transaction to non-async,
we could only acquire lock once for entire consensus handler task
duration

* I think down the line we are likely to optimize this even more and
make this in memory cache even more specialized to skip deserialization
for the duplicated transaction.
  • Loading branch information
andll authored Mar 15, 2023
1 parent f91a53e commit 11ea6d6
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
7 changes: 7 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ pub struct AuthorityMetrics {
pub(crate) execution_driver_executed_transactions: IntCounter,

pub(crate) skipped_consensus_txns: IntCounter,
pub(crate) skipped_consensus_txns_cache_hit: IntCounter,

/// Post processing metrics
post_processing_total_events_emitted: IntCounter,
Expand Down Expand Up @@ -371,6 +372,12 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
skipped_consensus_txns_cache_hit: register_int_counter_with_registry!(
"skipped_consensus_txns_cache_hit",
"Total number of consensus transactions skipped because of local cache hit",
registry,
)
.unwrap(),
post_processing_total_events_emitted: register_int_counter_with_registry!(
"post_processing_total_events_emitted",
"Total number of events emitted in post processing",
Expand Down
23 changes: 22 additions & 1 deletion crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use crate::authority::AuthorityMetrics;
use crate::checkpoints::CheckpointService;
use crate::transaction_manager::TransactionManager;
use async_trait::async_trait;
use lru::LruCache;
use mysten_metrics::monitored_scope;
use narwhal_executor::{ExecutionIndices, ExecutionState};
use narwhal_types::ConsensusOutput;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use std::num::NonZeroUsize;
use std::sync::Arc;
use sui_types::base_types::{AuthorityName, EpochId, TransactionDigest};
use sui_types::messages::{
ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
Expand All @@ -36,8 +39,12 @@ pub struct ConsensusHandler<T> {
// TODO: ConsensusHandler doesn't really share metrics with AuthorityState. We could define
// a new metrics type here if we want to.
metrics: Arc<AuthorityMetrics>,
/// Lru cache to quickly discard transactions processed by consensus
processed_cache: Mutex<LruCache<SequencedConsensusTransactionKey, ()>>,
}

const PROCESSED_CACHE_CAP: usize = 1024 * 1024;

impl<T> ConsensusHandler<T> {
pub fn new(
epoch_store: Arc<AuthorityPerEpochStore>,
Expand All @@ -54,6 +61,9 @@ impl<T> ConsensusHandler<T> {
transaction_manager,
parent_sync_store,
metrics,
processed_cache: Mutex::new(LruCache::new(
NonZeroUsize::new(PROCESSED_CACHE_CAP).unwrap(),
)),
}
}
}
Expand Down Expand Up @@ -175,6 +185,17 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {
.inc_by(bytes as u64);

for sequenced_transaction in sequenced_transactions {
// todo if we can make handle_consensus_transaction into sync function,
// we could acquire mutex once for entire loop
if self
.processed_cache
.lock()
.put(sequenced_transaction.key(), ())
.is_some()
{
self.metrics.skipped_consensus_txns_cache_hit.inc();
continue;
}
let verified_transaction = match self.epoch_store.verify_consensus_transaction(
sequenced_transaction,
&self.metrics.skipped_consensus_txns,
Expand Down

0 comments on commit 11ea6d6

Please sign in to comment.