Skip to content

Commit

Permalink
async batch verification (MystenLabs#9159)
Browse files Browse the repository at this point in the history
stacked on MystenLabs#9132 

It's difficult to give a solid prediction of how this will perform in
production, but here is some data on the raw CPU time savings:

With async batch verification:

$ time RUST_LOG=off SIM_STRESS_TEST_QPS=30
../../target/simulator/deps/simtest-3e38176b1c2a7e11
test_simulated_load_basic

    running 1 test
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 6 filtered
out; finished in 15.82s

RUST_LOG=off SIM_STRESS_TEST_QPS=30 test_simulated_load_basic 14.53s
user 0.97s system 97% cpu 15.851 total
^^^^^^^^^^^

Without batch verification:

; time RUST_LOG=off SIM_STRESS_TEST_QPS=30
../../target/simulator/deps/simtest-3e38176b1c2a7e11
test_simulated_load_basic

    running 1 test
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 6 filtered
out; finished in 17.91s

RUST_LOG=off SIM_STRESS_TEST_QPS=30 test_simulated_load_basic 16.72s
user 1.06s system 98% cpu 18.065 total
^^^^^^^^^^^

These numbers are corroborated by profiles taken with Xcode, which
indicated a drop from 55 gigacycles down to 48 gigacycles, about a 15%
improvement.

Lastly, the async benchmarks show that 1600 certs per second per core is
achievable with the async interface. The single-threaded synchronous
benchmarks top out at a bit over 2000 certs per second. We may be able to
improve the async interface a bit more with a lower-contention locking
method.
  • Loading branch information
mystenmark authored Mar 13, 2023
1 parent 56ff6a5 commit 4d99ed8
Show file tree
Hide file tree
Showing 12 changed files with 600 additions and 220 deletions.
112 changes: 93 additions & 19 deletions crates/sui-core/benches/batch_verification_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,101 @@ use criterion::*;
use rand::prelude::*;
use rand::seq::SliceRandom;

use futures::future::join_all;
use prometheus::Registry;
use std::sync::Arc;
use sui_core::test_utils::{make_cert_with_large_committee, make_dummy_tx};
use sui_types::committee::Committee;
use sui_types::crypto::{get_key_pair, AccountKeyPair};
use sui_types::crypto::{get_key_pair, AccountKeyPair, AuthorityKeyPair};
use sui_types::messages::CertifiedTransaction;

use sui_core::batch_bls_verifier::*;

use criterion::Criterion;
/// Generates `count` certificates for use as benchmark input.
///
/// One receiver address is generated and shared by every transaction; each transaction
/// gets its own freshly generated sender key pair. Every dummy transaction is then turned
/// into a certificate with `make_cert_with_large_committee`.
///
/// * `committee` - committee forwarded to `make_cert_with_large_committee`.
/// * `key_pairs` - authority key pairs forwarded to `make_cert_with_large_committee`.
/// * `count` - number of certificates to produce.
fn gen_certs(
    committee: &Committee,
    key_pairs: &[AuthorityKeyPair],
    count: u64,
) -> Vec<CertifiedTransaction> {
    let (receiver, _): (_, AccountKeyPair) = get_key_pair();

    // A range is already an iterator, so no `.into_iter()` is needed here.
    let senders: Vec<_> = (0..count)
        .map(|_| get_key_pair::<AccountKeyPair>())
        .collect();

    // Kept as a separate stage so key generation, transaction construction and
    // certificate construction happen in the same order as before.
    let txns: Vec<_> = senders
        .iter()
        .map(|(sender, sender_sec)| make_dummy_tx(receiver, *sender, sender_sec))
        .collect();

    txns.iter()
        .map(|t| make_cert_with_large_committee(committee, key_pairs, t))
        .collect()
}

/// Benchmarks certificate verification through the async `BatchCertificateVerifier`.
///
/// For every combination of worker-thread count and batch size, each benchmark iteration
/// spawns `num_threads * over_subscription` tasks on a multi-threaded tokio runtime. Every
/// task verifies its own clone of the same `count` certificates, one at a time, via
/// `verify_cert_skip_cache`.
fn async_verifier_bench(c: &mut Criterion) {
    let (committee, key_pairs) = Committee::new_simple_test_committee_of_size(100);
    let committee = Arc::new(committee);
    let count = 200;
    let certs = gen_certs(&committee, &key_pairs, count);

    let mut group = c.benchmark_group("async_verify");

    // 32 times as many tasks as worker threads.
    let over_subscription = 32;

    // Each iteration verifies `num_threads * over_subscription * count` certs in total;
    // reporting only `count * over_subscription` elements yields throughput per thread.
    group.throughput(Throughput::Elements(count * over_subscription));

    group.sample_size(10);

    let registry = Registry::new();
    let metrics = BatchCertificateVerifierMetrics::new(&registry);

    let num_cpus = num_cpus::get() as u64;
    // On small machines `num_cpus / 2` is either 0 (tokio panics when asked for zero
    // worker threads) or equal to another entry (which would produce a duplicate
    // benchmark id within the group), so drop zeros and adjacent duplicates.
    let mut thread_counts = vec![1, num_cpus / 2, num_cpus];
    thread_counts.retain(|&n| n > 0);
    thread_counts.dedup();

    for num_threads in thread_counts {
        for batch_size in [8, 16, 32] {
            group.bench_with_input(
                BenchmarkId::new(
                    format!("num_threads={num_threads} batch_size={batch_size}"),
                    count,
                ),
                &count,
                |b, _| {
                    let runtime = tokio::runtime::Builder::new_multi_thread()
                        .worker_threads(num_threads as usize)
                        .enable_time()
                        .build()
                        .unwrap();
                    let batch_verifier = Arc::new(BatchCertificateVerifier::new_with_batch_size(
                        committee.clone(),
                        batch_size,
                        metrics.clone(),
                    ));

                    b.iter(|| {
                        let handles: Vec<_> = (0..(num_threads * over_subscription))
                            .map(|_| {
                                let batch_verifier = batch_verifier.clone();
                                let certs = certs.clone();
                                runtime.spawn(async move {
                                    for c in certs.into_iter() {
                                        batch_verifier.verify_cert_skip_cache(c).await.unwrap();
                                    }
                                })
                            })
                            .collect();

                        runtime.block_on(async move {
                            for joined in join_all(handles).await {
                                // A panic inside a spawned task (e.g. the `unwrap` above)
                                // is only reported through its join handle; surface it
                                // rather than silently timing a failed run.
                                joined.expect("verification task panicked");
                            }
                        });
                    })
                },
            );
        }
    }

    group.finish();
}

fn batch_verification_bench(c: &mut Criterion) {
let (committee, key_pairs) = Committee::new_simple_test_committee_of_size(100);
Expand All @@ -22,23 +110,9 @@ fn batch_verification_bench(c: &mut Criterion) {
// pretty significant at that point.
for batch_size in [1, 4, 16, 32, 64] {
for num_errors in [0, 1] {
let (receiver, _): (_, AccountKeyPair) = get_key_pair();

let senders: Vec<_> = (0..batch_size)
.into_iter()
.map(|_| get_key_pair::<AccountKeyPair>())
.collect();

let txns: Vec<_> = senders
.iter()
.map(|(sender, sender_sec)| make_dummy_tx(receiver, *sender, sender_sec))
.collect();

let mut certs: Vec<_> = txns
.iter()
.map(|t| make_cert_with_large_committee(&committee, &key_pairs, t))
.collect();
let mut certs = gen_certs(&committee, &key_pairs, batch_size);

let (receiver, _): (_, AccountKeyPair) = get_key_pair();
let (other_sender, other_sender_sec): (_, AccountKeyPair) = get_key_pair();
let other_tx = make_dummy_tx(receiver, other_sender, &other_sender_sec);
let other_cert = make_cert_with_large_committee(&committee, &key_pairs, &other_tx);
Expand All @@ -64,5 +138,5 @@ fn batch_verification_bench(c: &mut Criterion) {
group.finish();
}

criterion_group!(benches, batch_verification_bench);
criterion_group!(benches, batch_verification_bench, async_verifier_bench);
criterion_main!(benches);
4 changes: 2 additions & 2 deletions crates/sui-core/benches/verified_cert_cache_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use criterion::*;

use sui_core::authority::{VerifiedCertificateCache, VerifiedCertificateCacheMetrics};
use sui_core::batch_bls_verifier::{BatchCertificateVerifierMetrics, VerifiedCertificateCache};
use sui_types::digests::CertificateDigest;

use criterion::Criterion;
Expand All @@ -25,7 +25,7 @@ fn verified_cert_cache_bench(c: &mut Criterion) {
assert_eq!(chunks.len(), cpus);

let registry = prometheus::Registry::new();
let metrics = VerifiedCertificateCacheMetrics::new(&registry);
let metrics = BatchCertificateVerifierMetrics::new(&registry);
let cache = VerifiedCertificateCache::new(metrics);

let mut group = c.benchmark_group("digest-caching");
Expand Down
8 changes: 3 additions & 5 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,12 @@ use sui_types::{
use typed_store::Map;

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
pub use crate::authority::authority_per_epoch_store::{
VerifiedCertificateCache, VerifiedCertificateCacheMetrics,
};
use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
use crate::authority::authority_store::{ExecutionLockReadGuard, InputKey, ObjectLockStatus};
use crate::authority::authority_store_pruner::AuthorityStorePruner;
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::batch_bls_verifier::BatchCertificateVerifierMetrics;
use crate::checkpoints::CheckpointStore;
use crate::epoch::committee_store::CommitteeStore;
use crate::epoch::epoch_metrics::EpochMetrics;
Expand Down Expand Up @@ -1683,7 +1681,7 @@ impl AuthorityState {
);
let registry = Registry::new();
let cache_metrics = Arc::new(ResolverMetrics::new(&registry));
let verified_cert_cache_metrics = VerifiedCertificateCacheMetrics::new(&registry);
let batch_verifier_metrics = BatchCertificateVerifierMetrics::new(&registry);
let epoch_store = AuthorityPerEpochStore::new(
name,
Arc::new(genesis_committee.clone()),
Expand All @@ -1693,7 +1691,7 @@ impl AuthorityState {
EpochStartConfiguration::new_for_testing(),
store.clone(),
cache_metrics,
verified_cert_cache_metrics,
batch_verifier_metrics,
);

let epochs = Arc::new(CommitteeStore::new(
Expand Down
100 changes: 11 additions & 89 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use futures::future::{join_all, select, Either};
use futures::FutureExt;
use lru::LruCache;
use narwhal_executor::ExecutionIndices;
use narwhal_types::Round;
use parking_lot::RwLock;
Expand All @@ -13,7 +12,6 @@ use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use sui_storage::default_db_options;
Expand All @@ -22,7 +20,6 @@ use sui_types::accumulator::Accumulator;
use sui_types::base_types::{AuthorityName, EpochId, ObjectID, SequenceNumber, TransactionDigest};
use sui_types::committee::Committee;
use sui_types::crypto::{AuthoritySignInfo, AuthorityStrongQuorumSignInfo};
use sui_types::digests::CertificateDigest;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::{
AuthorityCapabilities, CertifiedTransaction, ConsensusTransaction, ConsensusTransactionKey,
Expand All @@ -38,6 +35,7 @@ use typed_store::traits::{TableSummary, TypedStoreDebug};
use crate::authority::authority_notify_read::NotifyRead;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::authority::{AuthorityStore, CertTxGuard, ResolverWrapper, MAX_TX_RECOVERY_RETRY};
use crate::batch_bls_verifier::*;
use crate::checkpoints::{
CheckpointCommitHeight, CheckpointServiceNotify, EpochStats, PendingCheckpoint,
PendingCheckpointInfo,
Expand All @@ -56,7 +54,7 @@ use move_vm_runtime::move_vm::MoveVM;
use move_vm_runtime::native_functions::NativeFunctionTable;
use mysten_common::notify_once::NotifyOnce;
use mysten_metrics::monitored_scope;
use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use prometheus::IntCounter;
use std::cmp::Ordering as CmpOrdering;
use sui_adapter::adapter;
use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
Expand Down Expand Up @@ -117,9 +115,10 @@ pub struct AuthorityPerEpochStore {
reconfig_state_mem: RwLock<ReconfigState>,
consensus_notify_read: NotifyRead<SequencedConsensusTransactionKey, ()>,

// Cache of certificates that are known to have valid signatures.
// Lives in per-epoch store because the caching is only valid within an epoch.
verified_cert_cache: VerifiedCertificateCache,
/// Batch verifier for certificates - also caches certificates that are known to have
/// valid signatures. Lives in per-epoch store because the caching/batching is only valid
/// for certs within the current epoch.
pub(crate) batch_verifier: BatchCertificateVerifier,

pub(crate) checkpoint_state_notify_read: NotifyRead<CheckpointSequenceNumber, Accumulator>,

Expand Down Expand Up @@ -334,7 +333,7 @@ impl AuthorityPerEpochStore {
epoch_start_configuration: EpochStartConfiguration,
store: Arc<AuthorityStore>,
cache_metrics: Arc<ResolverMetrics>,
verified_cert_cache_metrics: Arc<VerifiedCertificateCacheMetrics>,
batch_verifier_metrics: Arc<BatchCertificateVerifierMetrics>,
) -> Arc<Self> {
let current_time = Instant::now();
let epoch_id = committee.epoch;
Expand Down Expand Up @@ -373,6 +372,8 @@ impl AuthorityPerEpochStore {
.protocol_version();
let protocol_config = ProtocolConfig::get_for_version(protocol_version);
let execution_component = ExecutionComponents::new(&protocol_config, store, cache_metrics);
let batch_verifier =
BatchCertificateVerifier::new(committee.clone(), batch_verifier_metrics);
Arc::new(Self {
committee,
protocol_config,
Expand All @@ -383,7 +384,7 @@ impl AuthorityPerEpochStore {
epoch_alive_notify,
epoch_alive: tokio::sync::RwLock::new(true),
consensus_notify_read: NotifyRead::new(),
verified_cert_cache: VerifiedCertificateCache::new(verified_cert_cache_metrics),
batch_verifier,
checkpoint_state_notify_read: NotifyRead::new(),
end_of_publish: Mutex::new(end_of_publish),
pending_consensus_certificates: Mutex::new(pending_consensus_certificates),
Expand Down Expand Up @@ -429,7 +430,7 @@ impl AuthorityPerEpochStore {
epoch_start_configuration,
store,
self.execution_component.metrics(),
self.verified_cert_cache.metrics.clone(),
self.batch_verifier.metrics.clone(),
)
}

Expand All @@ -453,10 +454,6 @@ impl AuthorityPerEpochStore {
self.committee.epoch
}

pub fn verified_cert_cache(&self) -> &VerifiedCertificateCache {
&self.verified_cert_cache
}

pub fn get_state_hash_for_checkpoint(
&self,
checkpoint: &CheckpointSequenceNumber,
Expand Down Expand Up @@ -1995,78 +1992,3 @@ impl ExecutionComponents {
self.metrics.clone()
}
}

// Cache up to 20000 verified certs. We will need to tune this number in the future - a decent
// guess to start with is that it should be 10-20 times larger than peak transactions per second,
// on the assumption that we should see most certs twice within about 10-20 seconds at most: Once via RPC, once via consensus.
const VERIFIED_CERTIFICATE_CACHE_SIZE: usize = 20000;

/// Prometheus counters describing the behaviour of the verified-certificate cache.
pub struct VerifiedCertificateCacheMetrics {
    /// Counter registered as `certificate_signatures_cache_hits`.
    certificate_signatures_cache_hits: IntCounter,
    /// Counter registered as `certificate_signatures_cache_evictions`.
    certificate_signatures_cache_evictions: IntCounter,
}

impl VerifiedCertificateCacheMetrics {
    /// Registers both counters with `registry` and returns the metrics wrapped in an `Arc`.
    ///
    /// # Panics
    ///
    /// Panics if registering either counter with `registry` fails.
    pub fn new(registry: &Registry) -> Arc<Self> {
        // Register in the same order as the fields are declared: hits, then evictions.
        let certificate_signatures_cache_hits = register_int_counter_with_registry!(
            "certificate_signatures_cache_hits",
            "Number of certificates which were known to be verified because of signature cache.",
            registry
        )
        .unwrap();

        let certificate_signatures_cache_evictions = register_int_counter_with_registry!(
            "certificate_signatures_cache_evictions",
            "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
            registry
        )
        .unwrap();

        Arc::new(Self {
            certificate_signatures_cache_hits,
            certificate_signatures_cache_evictions,
        })
    }
}

/// LRU set of certificate digests, guarded by a read/write lock, together with the
/// counters that track cache hits and evictions.
pub struct VerifiedCertificateCache {
    inner: RwLock<LruCache<CertificateDigest, ()>>,
    metrics: Arc<VerifiedCertificateCacheMetrics>,
}

impl VerifiedCertificateCache {
    /// Creates an empty cache with capacity `VERIFIED_CERTIFICATE_CACHE_SIZE`.
    pub fn new(metrics: Arc<VerifiedCertificateCacheMetrics>) -> Self {
        let capacity = NonZeroUsize::new(VERIFIED_CERTIFICATE_CACHE_SIZE).unwrap();
        Self {
            inner: RwLock::new(LruCache::new(capacity)),
            metrics,
        }
    }

    /// Returns `true` if `digest` is present in the cache, bumping the hit counter when
    /// it is. Only a read lock is taken.
    pub fn is_cert_verified(&self, digest: &CertificateDigest) -> bool {
        let hit = self.inner.read().contains(digest);
        if hit {
            self.metrics.certificate_signatures_cache_hits.inc();
        }
        hit
    }

    /// Inserts a single digest into the cache.
    pub fn cache_cert_verified(&self, digest: CertificateDigest) {
        let mut cache = self.inner.write();
        self.push_and_count_eviction(&mut cache, digest);
    }

    /// Inserts every digest in `digests`, taking the write lock once for the whole batch.
    pub fn cache_certs_verified(&self, digests: Vec<CertificateDigest>) {
        let mut cache = self.inner.write();
        for digest in digests {
            self.push_and_count_eviction(&mut cache, digest);
        }
    }

    /// Pushes `digest` into `cache`; if the push hands back an entry whose key differs
    /// from `digest`, the eviction counter is incremented.
    fn push_and_count_eviction(
        &self,
        cache: &mut LruCache<CertificateDigest, ()>,
        digest: CertificateDigest,
    ) {
        if let Some((displaced, ())) = cache.push(digest, ()) {
            if displaced != digest {
                self.metrics.certificate_signatures_cache_evictions.inc();
            }
        }
    }
}
Loading

0 comments on commit 4d99ed8

Please sign in to comment.