From 0405b79b85399d96402610970085c286156d714d Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Fri, 19 Aug 2022 02:24:41 -0700 Subject: [PATCH] [Storage] Add caches for JMT nodes. (#2969) --- Cargo.lock | 23 +++++ api/test-context/src/test_context.rs | 4 +- .../src/storage_interface.rs | 6 +- aptos-node/src/lib.rs | 8 +- config/src/config/storage_config.rs | 6 ++ crates/aptos-genesis/src/lib.rs | 9 +- .../src/bin/db-bootstrapper.rs | 6 +- .../executor-benchmark/src/db_generator.rs | 5 +- execution/executor-benchmark/src/lib.rs | 5 +- .../src/tests/driver_factory.rs | 10 +- storage/aptosdb/Cargo.toml | 2 + storage/aptosdb/src/lib.rs | 50 +++++++++- storage/aptosdb/src/lru_node_cache.rs | 52 ++++++++++ storage/aptosdb/src/metrics.rs | 13 +++ .../aptosdb/src/pruner/state_store/test.rs | 31 +++--- storage/aptosdb/src/state_merkle_db.rs | 94 ++++++++++++++++--- storage/aptosdb/src/state_store/mod.rs | 27 +++--- .../state_merkle_batch_committer.rs | 24 +++-- .../src/state_store/state_store_test.rs | 13 ++- storage/aptosdb/src/versioned_node_cache.rs | 87 +++++++++++++++++ .../backup-cli/src/bin/replay-verify.rs | 5 +- storage/backup/backup-cli/src/utils/mod.rs | 4 +- storage/jellyfish-merkle/src/node_type/mod.rs | 4 + 23 files changed, 412 insertions(+), 76 deletions(-) create mode 100644 storage/aptosdb/src/lru_node_cache.rs create mode 100644 storage/aptosdb/src/versioned_node_cache.rs diff --git a/Cargo.lock b/Cargo.lock index b69081a10fb4a..c50a44830ee93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1493,10 +1493,12 @@ dependencies = [ "aptos-vm", "aptosdb-indexer", "arc-swap", + "arr_macro", "bcs", "byteorder", "executor-types", "itertools", + "lru", "move-deps", "num-derive", "num-traits 0.2.15", @@ -1559,6 +1561,27 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" +[[package]] +name = "arr_macro" +version = 
"0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a105bfda48707cf19220129e78fca01e9639433ffaef4163546ed8fb04120a5" +dependencies = [ + "arr_macro_impl", + "proc-macro-hack", +] + +[[package]] +name = "arr_macro_impl" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0609c78bd572f4edc74310dfb63a01f5609d53fa8b4dd7c4d98aef3b3e8d72d1" +dependencies = [ + "proc-macro-hack", + "quote 1.0.20", + "syn 1.0.98", +] + [[package]] name = "array_tool" version = "1.0.3" diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index 85fc22cf121cf..054b1bf3bf450 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -8,7 +8,8 @@ use aptos_api_types::{ X_APTOS_LEDGER_TIMESTAMP, X_APTOS_LEDGER_VERSION, }; use aptos_config::config::{ - NodeConfig, RocksdbConfigs, NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE, + NodeConfig, RocksdbConfigs, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, + NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE, }; use aptos_crypto::{hash::HashValue, SigningKey}; use aptos_mempool::mocks::MockSharedMempool; @@ -113,6 +114,7 @@ pub fn new_test_context(test_name: String, use_db_with_indexer: bool) -> TestCon RocksdbConfigs::default(), false, /* indexer */ TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, ) .unwrap(), ) diff --git a/aptos-move/aptos-validator-interface/src/storage_interface.rs b/aptos-move/aptos-validator-interface/src/storage_interface.rs index fe5dc02dca393..c6b66b0495eaf 100644 --- a/aptos-move/aptos-validator-interface/src/storage_interface.rs +++ b/aptos-move/aptos-validator-interface/src/storage_interface.rs @@ -3,7 +3,10 @@ use crate::AptosValidatorInterface; use anyhow::{anyhow, Result}; -use aptos_config::config::{RocksdbConfigs, NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE}; +use aptos_config::config::{ + RocksdbConfigs, 
DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, NO_OP_STORAGE_PRUNER_CONFIG, + TARGET_SNAPSHOT_SIZE, +}; use aptos_types::{ account_address::AccountAddress, account_state::AccountState, @@ -27,6 +30,7 @@ impl DBDebuggerInterface { RocksdbConfigs::default(), false, TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, )?))) } } diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index 455440c5cb445..a49090efe414b 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -42,9 +42,10 @@ use mempool_notifications::MempoolNotificationSender; use network::application::storage::PeerMetadataStorage; use network_builder::builder::NetworkBuilder; use rand::{rngs::StdRng, SeedableRng}; -use state_sync_driver::driver_factory::DriverFactory; -use state_sync_driver::driver_factory::StateSyncRuntimes; -use state_sync_driver::metadata_storage::PersistentMetadataStorage; +use state_sync_driver::{ + driver_factory::{DriverFactory, StateSyncRuntimes}, + metadata_storage::PersistentMetadataStorage, +}; use std::{ boxed::Box, collections::{HashMap, HashSet}, @@ -481,6 +482,7 @@ pub fn setup_environment(node_config: NodeConfig) -> anyhow::Result node_config.storage.rocksdb_configs, node_config.storage.enable_indexer, node_config.storage.target_snapshot_size, + node_config.storage.max_num_nodes_per_lru_cache_shard, ) .map_err(|err| anyhow!("DB failed to open {}", err))?, ); diff --git a/config/src/config/storage_config.rs b/config/src/config/storage_config.rs index ac2ec048d71cb..df3b2d12213d9 100644 --- a/config/src/config/storage_config.rs +++ b/config/src/config/storage_config.rs @@ -8,6 +8,9 @@ use std::{ path::PathBuf, }; +// Lru cache will consume about 2G RAM based on this default value. +pub const DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD: usize = 1 << 13; + pub const TARGET_SNAPSHOT_SIZE: usize = 100_000; /// Port selected RocksDB options for tuning underlying rocksdb instance of AptosDB. 
@@ -76,6 +79,8 @@ pub struct StorageConfig { data_dir: PathBuf, /// The threshold that determine whether a snapshot should be committed to state merkle db. pub target_snapshot_size: usize, + /// The max # of nodes for a lru cache shard. + pub max_num_nodes_per_lru_cache_shard: usize, /// Rocksdb-specific configurations pub rocksdb_configs: RocksdbConfigs, /// Try to enable the internal indexer. The indexer expects to have seen all transactions @@ -180,6 +185,7 @@ impl Default for StorageConfig { rocksdb_configs: RocksdbConfigs::default(), enable_indexer: false, target_snapshot_size: TARGET_SNAPSHOT_SIZE, + max_num_nodes_per_lru_cache_shard: DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, } } } diff --git a/crates/aptos-genesis/src/lib.rs b/crates/aptos-genesis/src/lib.rs index 672e31eb05943..a7208498a0385 100644 --- a/crates/aptos-genesis/src/lib.rs +++ b/crates/aptos-genesis/src/lib.rs @@ -10,9 +10,11 @@ pub mod keys; #[cfg(any(test, feature = "testing"))] pub mod test_utils; -use crate::builder::GenesisConfiguration; -use crate::config::ValidatorConfiguration; -use aptos_config::config::{RocksdbConfigs, NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE}; +use crate::{builder::GenesisConfiguration, config::ValidatorConfiguration}; +use aptos_config::config::{ + RocksdbConfigs, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, NO_OP_STORAGE_PRUNER_CONFIG, + TARGET_SNAPSHOT_SIZE, +}; use aptos_crypto::ed25519::Ed25519PublicKey; use aptos_temppath::TempPath; use aptos_types::{chain_id::ChainId, transaction::Transaction, waypoint::Waypoint}; @@ -135,6 +137,7 @@ impl GenesisInfo { RocksdbConfigs::default(), false, TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, )?; let db_rw = DbReaderWriter::new(aptosdb); executor::db_bootstrapper::generate_waypoint::(&db_rw, genesis) diff --git a/execution/db-bootstrapper/src/bin/db-bootstrapper.rs b/execution/db-bootstrapper/src/bin/db-bootstrapper.rs index dd080e54e8eb1..1e02e994aa3d1 100644 --- 
a/execution/db-bootstrapper/src/bin/db-bootstrapper.rs +++ b/execution/db-bootstrapper/src/bin/db-bootstrapper.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::{ensure, format_err, Context, Result}; -use aptos_config::config::{RocksdbConfigs, NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE}; +use aptos_config::config::{ + RocksdbConfigs, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, NO_OP_STORAGE_PRUNER_CONFIG, + TARGET_SNAPSHOT_SIZE, +}; use aptos_temppath::TempPath; use aptos_types::{transaction::Transaction, waypoint::Waypoint}; use aptos_vm::AptosVM; @@ -55,6 +58,7 @@ fn main() -> Result<()> { RocksdbConfigs::default(), false, /* indexer */ TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, ) } else { // When not committing, we open the DB as secondary so the tool is usable along side a diff --git a/execution/executor-benchmark/src/db_generator.rs b/execution/executor-benchmark/src/db_generator.rs index c62094de6151b..96320442b847f 100644 --- a/execution/executor-benchmark/src/db_generator.rs +++ b/execution/executor-benchmark/src/db_generator.rs @@ -7,7 +7,9 @@ use aptos_config::{ utils::get_genesis_txn, }; -use aptos_config::config::{PrunerConfig, TARGET_SNAPSHOT_SIZE}; +use aptos_config::config::{ + PrunerConfig, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, TARGET_SNAPSHOT_SIZE, +}; use aptos_vm::AptosVM; use aptosdb::AptosDB; use executor::db_bootstrapper::{generate_waypoint, maybe_bootstrap}; @@ -61,6 +63,7 @@ fn bootstrap_with_genesis(db_dir: impl AsRef) { rocksdb_configs, false, /* indexer */ TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, ) .expect("DB should open."), ); diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index dbf40bf8dbef2..ffee53f11f7eb 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -13,7 +13,8 @@ use crate::{ transaction_generator::TransactionGenerator, }; use aptos_config::config::{ 
- NodeConfig, PrunerConfig, RocksdbConfigs, NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE, + NodeConfig, PrunerConfig, RocksdbConfigs, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, + NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE, }; use aptos_jellyfish_merkle::metrics::{ APTOS_JELLYFISH_INTERNAL_ENCODED_BYTES, APTOS_JELLYFISH_LEAF_ENCODED_BYTES, @@ -36,6 +37,7 @@ pub fn init_db_and_executor(config: &NodeConfig) -> (DbReaderWriter, BlockExecut RocksdbConfigs::default(), false, config.storage.target_snapshot_size, + config.storage.max_num_nodes_per_lru_cache_shard, ) .expect("DB should open."), ); @@ -59,6 +61,7 @@ fn create_checkpoint(source_dir: impl AsRef, checkpoint_dir: impl AsRef Self { let arc_ledger_rocksdb = Arc::new(ledger_rocksdb); @@ -288,6 +294,7 @@ impl AptosDB { Arc::clone(&arc_ledger_rocksdb), Arc::clone(&arc_state_merkle_rocksdb), target_snapshot_size, + max_nodes_per_lru_cache_shard, hack_for_tests, )); let ledger_pruner = LedgerPrunerManager::new( @@ -322,6 +329,7 @@ impl AptosDB { rocksdb_configs: RocksdbConfigs, enable_indexer: bool, target_snapshot_size: usize, + max_num_nodes_per_lru_cache_shard: usize, ) -> Result { ensure!( pruner_config.eq(&NO_OP_STORAGE_PRUNER_CONFIG) || !readonly, @@ -369,6 +377,7 @@ impl AptosDB { state_merkle_db, pruner_config, target_snapshot_size, + max_num_nodes_per_lru_cache_shard, readonly, ); @@ -458,6 +467,7 @@ impl AptosDB { )?, NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE, + 0, true, )) } @@ -467,6 +477,7 @@ impl AptosDB { db_root_path: P, readonly: bool, target_snapshot_size: usize, + max_num_nodes_per_lru_cache_shard: usize, enable_indexer: bool, ) -> Self { Self::open( @@ -476,6 +487,7 @@ impl AptosDB { RocksdbConfigs::default(), enable_indexer, target_snapshot_size, + max_num_nodes_per_lru_cache_shard, ) .expect("Unable to open AptosDB") } @@ -483,13 +495,31 @@ impl AptosDB { /// This opens db in non-readonly mode, without the pruner. 
#[cfg(any(test, feature = "fuzzing"))] pub fn new_for_test + Clone>(db_root_path: P) -> Self { - Self::new_without_pruner(db_root_path, false, TARGET_SNAPSHOT_SIZE, false) + Self::new_without_pruner( + db_root_path, + false, + TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, + false, + ) + } + + /// This opens db in non-readonly mode, without the pruner and cache. + #[cfg(any(test, feature = "fuzzing"))] + pub fn new_for_test_no_cache + Clone>(db_root_path: P) -> Self { + Self::new_without_pruner(db_root_path, false, TARGET_SNAPSHOT_SIZE, 0, false) } /// This opens db in non-readonly mode, without the pruner, and with the indexer #[cfg(any(test, feature = "fuzzing"))] pub fn new_for_test_with_indexer + Clone>(db_root_path: P) -> Self { - Self::new_without_pruner(db_root_path, false, TARGET_SNAPSHOT_SIZE, true) + Self::new_without_pruner( + db_root_path, + false, + TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, + true, + ) } /// This opens db in non-readonly mode, without the pruner. @@ -498,13 +528,25 @@ impl AptosDB { db_root_path: P, target_snapshot_size: usize, ) -> Self { - Self::new_without_pruner(db_root_path, false, target_snapshot_size, false) + Self::new_without_pruner( + db_root_path, + false, + target_snapshot_size, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, + false, + ) } /// This opens db in non-readonly mode, without the pruner. #[cfg(any(test, feature = "fuzzing"))] pub fn new_readonly_for_test + Clone>(db_root_path: P) -> Self { - Self::new_without_pruner(db_root_path, true, TARGET_SNAPSHOT_SIZE, false) + Self::new_without_pruner( + db_root_path, + true, + TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, + false, + ) } /// This gets the current buffered_state in StateStore. 
diff --git a/storage/aptosdb/src/lru_node_cache.rs b/storage/aptosdb/src/lru_node_cache.rs new file mode 100644 index 0000000000000..06a7c40db30b3 --- /dev/null +++ b/storage/aptosdb/src/lru_node_cache.rs @@ -0,0 +1,52 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +use crate::state_merkle_db::Node; +use aptos_infallible::Mutex; +use aptos_jellyfish_merkle::node_type::NodeKey; +use aptos_types::{nibble::nibble_path::NibblePath, transaction::Version}; +use lru::LruCache; + +const NUM_SHARDS: usize = 256; + +#[derive(Debug)] +pub(crate) struct LruNodeCache { + shards: [Mutex>; NUM_SHARDS], +} + +impl LruNodeCache { + pub fn new(max_nodes_per_shard: usize) -> Self { + Self { + // `arr!()` doesn't allow a const in place of the integer literal + shards: arr_macro::arr![Mutex::new(LruCache::new(max_nodes_per_shard)); 256], + } + } + + fn shard(nibble_path: &NibblePath) -> u8 { + let path_bytes = nibble_path.bytes(); + if path_bytes.is_empty() { + 0 + } else { + path_bytes[0] + } + } + + pub fn get(&self, node_key: &NodeKey) -> Option { + let mut r = self.shards[Self::shard(node_key.nibble_path()) as usize].lock(); + let ret = r.get(node_key.nibble_path()).and_then(|(version, node)| { + if *version == node_key.version() { + Some(node.clone()) + } else { + None + } + }); + ret + } + + pub fn put(&self, node_key: NodeKey, node: Node) { + let (version, nibble_path) = node_key.unpack(); + let mut w = self.shards[Self::shard(&nibble_path) as usize].lock(); + let value = (version, node); + w.put(nibble_path, value); + } +} diff --git a/storage/aptosdb/src/metrics.rs b/storage/aptosdb/src/metrics.rs index 032f41c35810f..64f11316cce07 100644 --- a/storage/aptosdb/src/metrics.rs +++ b/storage/aptosdb/src/metrics.rs @@ -128,6 +128,19 @@ pub static OTHER_TIMERS_SECONDS: Lazy = Lazy::new(|| { .unwrap() }); +pub static NODE_CACHE_SECONDS: Lazy = Lazy::new(|| { + register_histogram_vec!( + // metric name + "aptos_storage_node_cache_seconds", + // metric 
description + "Latency of node cache.", + // metric labels (dimensions) + &["name"], + exponential_buckets(/*start=*/ 1e-9, /*factor=*/ 2.0, /*count=*/ 30).unwrap(), + ) + .unwrap() +}); + /// Rocksdb metrics pub static ROCKSDB_PROPERTIES: Lazy = Lazy::new(|| { register_int_gauge_vec!( diff --git a/storage/aptosdb/src/pruner/state_store/test.rs b/storage/aptosdb/src/pruner/state_store/test.rs index 298c8554c3b1a..3359028046e9f 100644 --- a/storage/aptosdb/src/pruner/state_store/test.rs +++ b/storage/aptosdb/src/pruner/state_store/test.rs @@ -3,23 +3,25 @@ use aptos_config::config::{LedgerPrunerConfig, StateMerklePrunerConfig}; use proptest::{prelude::*, proptest}; -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use aptos_crypto::HashValue; use aptos_state_view::state_storage_usage::StateStorageUsage; use aptos_temppath::TempPath; -use aptos_types::state_store::{state_key::StateKey, state_value::StateValue}; -use aptos_types::transaction::Version; +use aptos_types::{ + state_store::{state_key::StateKey, state_value::StateValue}, + transaction::Version, +}; use schemadb::{ReadOptions, DB}; use storage_interface::{jmt_update_refs, jmt_updates, DbReader}; -use crate::pruner::state_pruner_worker::StatePrunerWorker; -use crate::stale_node_index::StaleNodeIndexSchema; -use crate::test_helper::{arb_state_kv_sets, update_store}; use crate::{ - change_set::ChangeSet, pruner::*, state_store::StateStore, AptosDB, LedgerPrunerManager, - PrunerManager, StatePrunerManager, + change_set::ChangeSet, + pruner::{state_pruner_worker::StatePrunerWorker, *}, + stale_node_index::StaleNodeIndexSchema, + state_store::StateStore, + test_helper::{arb_state_kv_sets, update_store}, + AptosDB, LedgerPrunerManager, PrunerManager, StatePrunerManager, }; fn put_value_set( @@ -92,13 +94,8 @@ fn test_state_store_pruner() { let prune_batch_size = 10; let num_versions = 25; let tmp_dir = TempPath::new(); - let aptos_db = 
AptosDB::new_for_test(&tmp_dir); - let state_store = &StateStore::new( - Arc::clone(&aptos_db.ledger_db), - Arc::clone(&aptos_db.state_merkle_db), - 1000, /* snapshot_size_threshold, does not matter */ - false, /* hack_for_tests */ - ); + let aptos_db = AptosDB::new_for_test_no_cache(&tmp_dir); + let state_store = &aptos_db.state_store; let mut root_hashes = vec![]; // Insert 25 values in the db. @@ -180,7 +177,7 @@ fn test_state_store_pruner_partial_version() { let prune_batch_size = 1; let tmp_dir = TempPath::new(); - let aptos_db = AptosDB::new_for_test(&tmp_dir); + let aptos_db = AptosDB::new_for_test_no_cache(&tmp_dir); let state_store = &aptos_db.state_store; let _root0 = put_value_set( diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs index d53e2e3a0f402..fa7fe9998c4cb 100644 --- a/storage/aptosdb/src/state_merkle_db.rs +++ b/storage/aptosdb/src/state_merkle_db.rs @@ -1,43 +1,57 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 -use crate::schema::jellyfish_merkle_node::JellyfishMerkleNodeSchema; -use crate::stale_node_index::StaleNodeIndexSchema; -use crate::OTHER_TIMERS_SECONDS; +use crate::{ + lru_node_cache::LruNodeCache, metrics::NODE_CACHE_SECONDS, + schema::jellyfish_merkle_node::JellyfishMerkleNodeSchema, + stale_node_index::StaleNodeIndexSchema, versioned_node_cache::VersionedNodeCache, + OTHER_TIMERS_SECONDS, +}; use anyhow::Result; use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_jellyfish_merkle::node_type::NodeType; use aptos_jellyfish_merkle::{ - node_type::NodeKey, JellyfishMerkleTree, TreeReader, TreeUpdateBatch, TreeWriter, + node_type::{NodeKey, NodeType}, + JellyfishMerkleTree, TreeReader, TreeUpdateBatch, TreeWriter, }; -use aptos_types::proof::SparseMerkleProofExt; use aptos_types::{ nibble::{nibble_path::NibblePath, ROOT_NIBBLE_HEIGHT}, - proof::SparseMerkleRangeProof, + proof::{SparseMerkleProofExt, SparseMerkleRangeProof}, state_store::state_key::StateKey, 
transaction::Version, }; use rayon::prelude::*; use schemadb::{SchemaBatch, DB}; -use std::{collections::HashMap, ops::Deref, sync::Arc}; +use std::{collections::HashMap, ops::Deref, sync::Arc, time::Instant}; pub(crate) type LeafNode = aptos_jellyfish_merkle::node_type::LeafNode; pub(crate) type Node = aptos_jellyfish_merkle::node_type::Node; type NodeBatch = aptos_jellyfish_merkle::NodeBatch; #[derive(Debug)] -pub struct StateMerkleDb(Arc); +pub struct StateMerkleDb { + db: Arc, + enable_cache: bool, + version_cache: VersionedNodeCache, + lru_cache: LruNodeCache, +} impl Deref for StateMerkleDb { type Target = DB; fn deref(&self) -> &Self::Target { - &self.0 + &self.db } } impl StateMerkleDb { - pub fn new(state_merkle_rocksdb: Arc) -> Self { - Self(state_merkle_rocksdb) + pub fn new(state_merkle_rocksdb: Arc, max_nodes_per_lru_cache_shard: usize) -> Self { + Self { + db: state_merkle_rocksdb, + // TODO(grao): Currently when this value is set to 0 we disable both caches. This is + // hacky, need to revisit. + enable_cache: max_nodes_per_lru_cache_shard > 0, + version_cache: VersionedNodeCache::new(), + lru_cache: LruNodeCache::new(max_nodes_per_lru_cache_shard), + } } pub fn get_with_proof_ext( @@ -117,6 +131,18 @@ impl StateMerkleDb { self.batch_put_value_set(value_set, node_hashes, base_version, version) }?; + if self.cache_enabled() { + self.version_cache.add_version( + version, + tree_update_batch + .node_batch + .iter() + .flatten() + .cloned() + .collect(), + ); + } + let batch = SchemaBatch::new(); { let _timer = OTHER_TIMERS_SECONDS @@ -147,6 +173,18 @@ impl StateMerkleDb { Ok((batch, new_root_hash)) } + pub(crate) fn cache_enabled(&self) -> bool { + self.enable_cache + } + + pub(crate) fn version_cache(&self) -> &VersionedNodeCache { + &self.version_cache + } + + pub(crate) fn lru_cache(&self) -> &LruNodeCache { + &self.lru_cache + } + /// Finds the rightmost leaf by scanning the entire DB. 
#[cfg(test)] pub fn get_rightmost_leaf_naive(&self) -> Result> { @@ -174,7 +212,37 @@ impl StateMerkleDb { impl TreeReader for StateMerkleDb { fn get_node_option(&self, node_key: &NodeKey) -> Result> { - self.get::(node_key) + let start_time = Instant::now(); + if !self.cache_enabled() { + let node_opt = self.get::(node_key)?; + NODE_CACHE_SECONDS + .with_label_values(&["cache_disabled"]) + .observe(start_time.elapsed().as_secs_f64()); + return Ok(node_opt); + } + let node_opt = if let Some(node_cache) = self.version_cache.get_version(node_key.version()) + { + let node = node_cache.get(node_key).cloned(); + NODE_CACHE_SECONDS + .with_label_values(&["versioned_cache_hit"]) + .observe(start_time.elapsed().as_secs_f64()); + node + } else if let Some(node) = self.lru_cache.get(node_key) { + NODE_CACHE_SECONDS + .with_label_values(&["lru_cache_hit"]) + .observe(start_time.elapsed().as_secs_f64()); + Some(node) + } else { + let node_opt = self.get::(node_key)?; + if let Some(node) = &node_opt { + self.lru_cache.put(node_key.clone(), node.clone()); + } + NODE_CACHE_SECONDS + .with_label_values(&["cache_miss"]) + .observe(start_time.elapsed().as_secs_f64()); + node_opt + }; + Ok(node_opt) } fn get_rightmost_leaf(&self) -> Result> { diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 6afad559b1eae..7253e200e0ce7 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -13,33 +13,32 @@ use aptos_jellyfish_merkle::{ iterator::JellyfishMerkleIterator, restore::StateSnapshotRestore, StateValueWriter, }; use aptos_logger::info; -use aptos_state_view::state_storage_usage::StateStorageUsage; -use aptos_state_view::StateViewId; -use aptos_types::state_store::state_value::StaleStateValueIndex; +use aptos_state_view::{state_storage_usage::StateStorageUsage, StateViewId}; use aptos_types::{ proof::{definition::LeafCount, SparseMerkleProofExt, SparseMerkleRangeProof}, state_store::{ 
state_key::StateKey, state_key_prefix::StateKeyPrefix, - state_value::{StateValue, StateValueChunkWithProof}, + state_value::{StaleStateValueIndex, StateValue, StateValueChunkWithProof}, }, transaction::Version, }; use executor_types::in_memory_state_calculator::InMemoryStateCalculator; use schemadb::{ReadOptions, SchemaBatch, DB}; -use std::ops::Deref; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, ops::Deref, sync::Arc}; use storage_interface::{ cached_state_view::CachedStateView, state_delta::StateDelta, sync_proof_fetcher::SyncProofFetcher, DbReader, StateSnapshotReceiver, }; -use crate::metrics::{STATE_ITEMS, TOTAL_STATE_BYTES}; -use crate::stale_state_value_index::StaleStateValueIndexSchema; -use crate::state_store::buffered_state::BufferedState; -use crate::version_data::{VersionData, VersionDataSchema}; use crate::{ - change_set::ChangeSet, schema::state_value::StateValueSchema, state_merkle_db::StateMerkleDb, + change_set::ChangeSet, + metrics::{STATE_ITEMS, TOTAL_STATE_BYTES}, + schema::state_value::StateValueSchema, + stale_state_value_index::StaleStateValueIndexSchema, + state_merkle_db::StateMerkleDb, + state_store::buffered_state::BufferedState, + version_data::{VersionData, VersionDataSchema}, AptosDbError, LedgerStore, TransactionStore, OTHER_TIMERS_SECONDS, }; @@ -239,9 +238,13 @@ impl StateStore { ledger_db: Arc, state_merkle_db: Arc, target_snapshot_size: usize, + max_nodes_per_lru_cache_shard: usize, hack_for_tests: bool, ) -> Self { - let state_merkle_db = Arc::new(StateMerkleDb::new(state_merkle_db)); + let state_merkle_db = Arc::new(StateMerkleDb::new( + state_merkle_db, + max_nodes_per_lru_cache_shard, + )); let state_db = Arc::new(StateDb { ledger_db, state_merkle_db, diff --git a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs index 8e9bd55f948fc..694dbc65099e9 100644 --- 
a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs +++ b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs @@ -3,20 +3,20 @@ //! This file defines the state merkle snapshot committer running in background thread. -use crate::jellyfish_merkle_node::JellyfishMerkleNodeSchema; -use crate::metrics::LATEST_SNAPSHOT_VERSION; -use crate::state_store::buffered_state::CommitMessage; -use crate::state_store::StateDb; -use crate::version_data::VersionDataSchema; -use crate::OTHER_TIMERS_SECONDS; +use crate::{ + jellyfish_merkle_node::JellyfishMerkleNodeSchema, + metrics::LATEST_SNAPSHOT_VERSION, + state_store::{buffered_state::CommitMessage, StateDb}, + version_data::VersionDataSchema, + OTHER_TIMERS_SECONDS, +}; use anyhow::{anyhow, ensure, Result}; use aptos_crypto::HashValue; use aptos_jellyfish_merkle::node_type::NodeKey; use aptos_logger::{info, trace}; use aptos_state_view::state_storage_usage::StateStorageUsage; use schemadb::SchemaBatch; -use std::sync::mpsc::Receiver; -use std::sync::Arc; +use std::sync::{mpsc::Receiver, Arc}; use storage_interface::state_delta::StateDelta; pub struct StateMerkleBatch { @@ -62,6 +62,14 @@ impl StateMerkleBatchCommitter { .state_merkle_db .write_schemas(batch) .expect("State merkle batch commit failed."); + if self.state_db.state_merkle_db.cache_enabled() { + self.state_db + .state_merkle_db + .version_cache() + .maybe_evict_version(self.state_db.state_merkle_db.lru_cache()); + } + // TODO(grao): Consider removing the following sender once we have verified that the + // version cache correctly caches all nodes we need. 
snapshot_ready_sender.send(()).unwrap(); info!( version = state_delta.current_version, diff --git a/storage/aptosdb/src/state_store/state_store_test.rs b/storage/aptosdb/src/state_store/state_store_test.rs index f06f99352a316..4a0e2ab9728e8 100644 --- a/storage/aptosdb/src/state_store/state_store_test.rs +++ b/storage/aptosdb/src/state_store/state_store_test.rs @@ -10,8 +10,11 @@ use aptos_types::{ }; use storage_interface::{jmt_update_refs, jmt_updates, DbReader, DbWriter, StateSnapshotReceiver}; -use crate::test_helper::{arb_state_kv_sets, update_store}; -use crate::{pruner::state_store::StateMerklePruner, AptosDB}; +use crate::{ + pruner::state_store::StateMerklePruner, + test_helper::{arb_state_kv_sets, update_store}, + AptosDB, +}; use super::*; @@ -254,7 +257,7 @@ fn test_stale_node_index() { let value3_update = StateValue::from(String::from("test_val3_update").into_bytes()); let tmp_dir = TempPath::new(); - let db = AptosDB::new_for_test(&tmp_dir); + let db = AptosDB::new_for_test_no_cache(&tmp_dir); let store = &db.state_store; // Update. @@ -376,7 +379,7 @@ fn test_stale_node_index_with_target_version() { let value3_update = StateValue::from(String::from("test_val3_update").into_bytes()); let tmp_dir = TempPath::new(); - let db = AptosDB::new_for_test(&tmp_dir); + let db = AptosDB::new_for_test_no_cache(&tmp_dir); let store = &db.state_store; // Update. 
@@ -485,7 +488,7 @@ fn test_stale_node_index_all_at_once() { let value3_update = StateValue::from(String::from("test_val3_update").into_bytes()); let tmp_dir = TempPath::new(); - let db = AptosDB::new_for_test(&tmp_dir); + let db = AptosDB::new_for_test_no_cache(&tmp_dir); let store = &db.state_store; let pruner = StateMerklePruner::new(Arc::clone(&db.state_merkle_db)); diff --git a/storage/aptosdb/src/versioned_node_cache.rs b/storage/aptosdb/src/versioned_node_cache.rs new file mode 100644 index 0000000000000..94dc688cad861 --- /dev/null +++ b/storage/aptosdb/src/versioned_node_cache.rs @@ -0,0 +1,87 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +use crate::{lru_node_cache::LruNodeCache, state_merkle_db::Node, OTHER_TIMERS_SECONDS}; +use aptos_infallible::RwLock; +use aptos_jellyfish_merkle::node_type::NodeKey; +use aptos_types::transaction::Version; +use rayon::prelude::*; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; + +type NodeCache = HashMap; + +#[derive(Debug)] +pub(crate) struct VersionedNodeCache { + inner: RwLock)>>, +} + +impl VersionedNodeCache { + const NUM_VERSIONS_TO_CACHE: usize = 2; + + pub fn new() -> Self { + Self { + inner: RwLock::new(Default::default()), + } + } + + pub fn add_version(&self, version: Version, nodes: NodeCache) { + let _timer = OTHER_TIMERS_SECONDS + .with_label_values(&["version_cache_add"]) + .start_timer(); + + let mut locked = self.inner.write(); + if !locked.is_empty() { + let (last_version, _) = locked.back().unwrap(); + assert!( + *last_version < version, + "Updating older version. 
{} vs latest:{} ", + version, + *last_version, + ); + } + locked.push_back((version, Arc::new(nodes))); + } + + pub fn maybe_evict_version(&self, lru_cache: &LruNodeCache) { + let _timer = OTHER_TIMERS_SECONDS + .with_label_values(&["version_cache_evict"]) + .start_timer(); + + let to_evict = { + let locked = self.inner.read(); + if locked.len() > Self::NUM_VERSIONS_TO_CACHE { + locked + .front() + .map(|(version, cache)| (*version, cache.clone())) + } else { + None + } + }; + + if let Some((version, cache)) = to_evict { + cache + .iter() + .collect::>() + .into_par_iter() + .with_min_len(100) + .for_each(|(node_key, node)| { + lru_cache.put(node_key.clone(), node.clone()); + }); + + let evicted = self.inner.write().pop_front(); + assert_eq!(evicted, Some((version, cache))); + } + } + + pub fn get_version(&self, version: Version) -> Option> { + self.inner + .read() + .iter() + .rev() + .find(|(ver, _nodes)| *ver == version) + .map(|(_ver, nodes)| nodes.clone()) + } +} diff --git a/storage/backup/backup-cli/src/bin/replay-verify.rs b/storage/backup/backup-cli/src/bin/replay-verify.rs index 361a50f9b546f..d43a5c8b1706c 100644 --- a/storage/backup/backup-cli/src/bin/replay-verify.rs +++ b/storage/backup/backup-cli/src/bin/replay-verify.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use aptos_config::config::{NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE}; +use aptos_config::config::{ + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE, +}; use aptos_logger::{prelude::*, Level, Logger}; use aptos_types::transaction::Version; use aptosdb::{AptosDB, GetRestoreHandler}; @@ -61,6 +63,7 @@ async fn main_impl() -> Result<()> { opt.rocksdb_opt.into(), false, TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, )?) 
.get_restore_handler(); ReplayVerifyCoordinator::new( diff --git a/storage/backup/backup-cli/src/utils/mod.rs b/storage/backup/backup-cli/src/utils/mod.rs index f9e38c1f62e5a..74f82e30673de 100644 --- a/storage/backup/backup-cli/src/utils/mod.rs +++ b/storage/backup/backup-cli/src/utils/mod.rs @@ -12,7 +12,8 @@ pub mod test_utils; use anyhow::{anyhow, Result}; use aptos_config::config::{ - RocksdbConfig, RocksdbConfigs, NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE, + RocksdbConfig, RocksdbConfigs, DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, + NO_OP_STORAGE_PRUNER_CONFIG, TARGET_SNAPSHOT_SIZE, }; use aptos_crypto::HashValue; use aptos_infallible::duration_since_epoch; @@ -219,6 +220,7 @@ impl TryFrom for GlobalRestoreOptions { opt.rocksdb_opt.into(), false, TARGET_SNAPSHOT_SIZE, + DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD, )?) .get_restore_handler(); RestoreRunMode::Restore { restore_handler } diff --git a/storage/jellyfish-merkle/src/node_type/mod.rs b/storage/jellyfish-merkle/src/node_type/mod.rs index 99cc11b52876d..a5dcfe961eab6 100644 --- a/storage/jellyfish-merkle/src/node_type/mod.rs +++ b/storage/jellyfish-merkle/src/node_type/mod.rs @@ -138,6 +138,10 @@ impl NodeKey { }; Ok(NodeKey::new(version, nibble_path)) } + + pub fn unpack(self) -> (Version, NibblePath) { + (self.version, self.nibble_path) + } } #[derive(Clone, Debug, Eq, PartialEq)]