perf(merkle tree): Manage indices / filters in RocksDB (matter-labs#1550)

## What ❔

- Allows including indices and Bloom filters as part of the block cache in
`RocksDB`. Adds the corresponding option for the Merkle tree on ENs.
- Allows profiling RocksDB operations and enables profiling for Merkle tree
operations.

## Why ❔

- Indices and Bloom filters are a major contributor to EN RAM usage: by
default, they are loaded into RAM in their entirety and are never unloaded. On
mainnet, Merkle tree indices / filters currently take up >23 GiB.
- Operation profiling allows estimating the impact of the new RocksDB options
more precisely.
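
For reference, a minimal sketch of what managing indices / filters via the block cache corresponds to at the `rocksdb` crate level (the actual wiring lives behind `RocksDBOptions` in `zksync_storage` and may differ in details):

```rust
use rocksdb::{BlockBasedOptions, Options};

// Assumed illustration, not the exact code from `zksync_storage`.
fn tuned_options(include_indices_and_filters: bool) -> Options {
    let mut block_opts = BlockBasedOptions::default();
    if include_indices_and_filters {
        // Index / filter blocks are charged to the block cache and become evictable,
        // instead of being pinned in RAM for the lifetime of the DB.
        block_opts.set_cache_index_and_filter_blocks(true);
        // Keep L0 index / filter blocks resident to limit latency spikes on hot reads.
        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
    }
    let mut options = Options::default();
    options.set_block_based_table_factory(&block_opts);
    options
}
```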

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
slowli authored Apr 5, 2024
1 parent 9fcb87e commit 6bbfa06
Showing 21 changed files with 593 additions and 86 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

14 changes: 13 additions & 1 deletion core/bin/external_node/src/config/mod.rs
@@ -1,4 +1,8 @@
use std::{env, num::NonZeroUsize, time::Duration};
use std::{
env,
num::{NonZeroU32, NonZeroUsize},
time::Duration,
};

use anyhow::Context;
use serde::Deserialize;
@@ -231,6 +235,9 @@ pub(crate) struct OptionalENConfig {
default = "OptionalENConfig::default_max_l1_batches_per_tree_iter"
)]
pub max_l1_batches_per_tree_iter: usize,
/// Maximum number of files concurrently opened by Merkle tree RocksDB. Useful to fit into OS limits; can be used
/// as a rudimentary way to control RAM usage of the tree.
pub merkle_tree_max_open_files: Option<NonZeroU32>,
/// Chunk size for multi-get operations. Can speed up loading data for the Merkle tree on some environments,
/// but the effects vary wildly depending on the setup (e.g., the filesystem used).
#[serde(default = "OptionalENConfig::default_merkle_tree_multi_get_chunk_size")]
@@ -239,6 +246,11 @@ pub(crate) struct OptionalENConfig {
/// The default value is 128 MiB.
#[serde(default = "OptionalENConfig::default_merkle_tree_block_cache_size_mb")]
merkle_tree_block_cache_size_mb: usize,
/// If specified, RocksDB indices and Bloom filters will be managed by the block cache, rather than
/// being loaded entirely into RAM on RocksDB initialization. The block cache capacity should be increased
/// correspondingly; otherwise, RocksDB performance can degrade significantly.
#[serde(default)]
pub merkle_tree_include_indices_and_filters_in_block_cache: bool,
/// Byte capacity of memtables (recent, non-persisted changes to RocksDB). Setting this to a reasonably
/// large value (order of 512 MiB) is helpful for large DBs that experience write stalls.
#[serde(default = "OptionalENConfig::default_merkle_tree_memtable_capacity_mb")]
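
As a hedged sketch of how `merkle_tree_max_open_files` plausibly maps onto RocksDB (the actual plumbing sits in `zksync_storage` and isn't part of this diff; `apply_max_open_files` is a hypothetical helper):

```rust
use std::num::NonZeroU32;

use rocksdb::Options;

// Hypothetical helper: RocksDB's `max_open_files` knob is an i32, where -1
// (the RocksDB default) means "no limit on open files".
fn apply_max_open_files(options: &mut Options, max_open_files: Option<NonZeroU32>) {
    let raw = max_open_files.map_or(-1, |n| i32::try_from(n.get()).unwrap_or(i32::MAX));
    options.set_max_open_files(raw);
}
```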
4 changes: 4 additions & 0 deletions core/bin/external_node/src/main.rs
@@ -128,11 +128,15 @@ async fn run_tree(
) -> anyhow::Result<Arc<dyn TreeApiClient>> {
let metadata_calculator_config = MetadataCalculatorConfig {
db_path: config.required.merkle_tree_path.clone(),
max_open_files: config.optional.merkle_tree_max_open_files,
mode: MerkleTreeMode::Lightweight,
delay_interval: config.optional.metadata_calculator_delay(),
max_l1_batches_per_iter: config.optional.max_l1_batches_per_tree_iter,
multi_get_chunk_size: config.optional.merkle_tree_multi_get_chunk_size,
block_cache_capacity: config.optional.merkle_tree_block_cache_size(),
include_indices_and_filters_in_block_cache: config
.optional
.merkle_tree_include_indices_and_filters_in_block_cache,
memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
};
1 change: 1 addition & 0 deletions core/lib/merkle_tree/Cargo.toml
@@ -21,6 +21,7 @@ leb128.workspace = true
once_cell.workspace = true
rayon.workspace = true
thiserror.workspace = true
thread_local.workspace = true
tracing.workspace = true

[dev-dependencies]
12 changes: 11 additions & 1 deletion core/lib/merkle_tree/examples/loadtest/batch.rs
@@ -1,7 +1,9 @@
//! `Database` implementation that flushes changes at the specified batches.
use std::any::Any;

use zksync_merkle_tree::{
unstable::{DeserializeError, Manifest, Node, NodeKey, Root},
unstable::{DeserializeError, Manifest, Node, NodeKey, ProfiledTreeOperation, Root},
Database, PatchSet, Patched,
};

@@ -39,6 +41,14 @@ impl Database for WithBatching<'_> {
self.inner.try_tree_node(key, is_leaf)
}

fn tree_nodes(&self, keys: &[(NodeKey, bool)]) -> Vec<Option<Node>> {
self.inner.tree_nodes(keys)
}

fn start_profiling(&self, operation: ProfiledTreeOperation) -> Box<dyn Any> {
self.inner.start_profiling(operation)
}

fn apply_patch(&mut self, patch: PatchSet) {
self.inner.apply_patch(patch);

5 changes: 5 additions & 0 deletions core/lib/merkle_tree/examples/loadtest/main.rs
@@ -55,6 +55,10 @@ struct Cli {
/// Block cache capacity for RocksDB in bytes.
#[arg(long = "block-cache", conflicts_with = "in_memory")]
block_cache: Option<usize>,
/// If specified, RocksDB indices and Bloom filters will be managed by the block cache rather than
/// being loaded entirely into RAM.
#[arg(long = "cache-indices", conflicts_with = "in_memory")]
cache_indices: bool,
/// Chunk size for RocksDB multi-get operations.
#[arg(long = "chunk-size", conflicts_with = "in_memory")]
chunk_size: Option<usize>,
@@ -92,6 +96,7 @@ impl Cli {
);
let db_options = RocksDBOptions {
block_cache_capacity: self.block_cache,
include_indices_and_filters_in_block_cache: self.cache_indices,
..RocksDBOptions::default()
};
let db = RocksDB::with_options(dir.path(), db_options).unwrap();
6 changes: 5 additions & 1 deletion core/lib/merkle_tree/src/getters.rs
@@ -4,7 +4,7 @@ use crate::{
hasher::HasherWithStats,
recovery::MerkleTreeRecovery,
storage::{LoadAncestorsResult, SortedKeys, WorkingPatchSet},
types::{Nibbles, Node, TreeEntry, TreeEntryWithProof},
types::{Nibbles, Node, ProfiledTreeOperation, TreeEntry, TreeEntryWithProof},
Database, HashTree, Key, MerkleTree, NoVersionError, PruneDatabase, ValueHash,
};

@@ -21,6 +21,7 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
version: u64,
leaf_keys: &[Key],
) -> Result<Vec<TreeEntry>, NoVersionError> {
let _profiling_guard = self.db.start_profiling(ProfiledTreeOperation::GetEntries);
load_and_transform_entries(&self.db, version, leaf_keys, extract_entry)
}

@@ -36,6 +37,9 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
leaf_keys: &[Key],
) -> Result<Vec<TreeEntryWithProof>, NoVersionError> {
let mut hasher = HasherWithStats::new(&self.hasher);
let _profiling_guard = self
.db
.start_profiling(ProfiledTreeOperation::GetEntriesWithProofs);
load_and_transform_entries(
&self.db,
version,
2 changes: 1 addition & 1 deletion core/lib/merkle_tree/src/lib.rs
@@ -82,7 +82,7 @@ mod utils;
pub mod unstable {
pub use crate::{
errors::DeserializeError,
types::{Manifest, Node, NodeKey, Root},
types::{Manifest, Node, NodeKey, ProfiledTreeOperation, Root},
};
}

23 changes: 21 additions & 2 deletions core/lib/merkle_tree/src/storage/database.rs
@@ -1,11 +1,11 @@
//! `Database` trait and its implementations.
use std::ops;
use std::{any::Any, ops};

use crate::{
errors::DeserializeError,
storage::patch::PatchSet,
types::{Manifest, Node, NodeKey, Root},
types::{Manifest, Node, NodeKey, ProfiledTreeOperation, Root},
};

/// Slice of node keys together with an indicator whether a node at the requested key is a leaf.
@@ -77,6 +77,9 @@ pub trait Database: Send + Sync {
.unwrap_or_else(|err| panic!("{err}"))
}

/// Starts profiling I/O operations and returns a thread-local guard to be dropped when profiling should be finished.
fn start_profiling(&self, operation: ProfiledTreeOperation) -> Box<dyn Any>;

/// Applies changes in the `patch` to this database. This operation should be atomic.
fn apply_patch(&mut self, patch: PatchSet);
}
@@ -98,6 +101,14 @@ impl<DB: Database + ?Sized> Database for &mut DB {
(**self).try_tree_node(key, is_leaf)
}

fn tree_nodes(&self, keys: &NodeKeys) -> Vec<Option<Node>> {
(**self).tree_nodes(keys)
}

fn start_profiling(&self, operation: ProfiledTreeOperation) -> Box<dyn Any> {
(**self).start_profiling(operation)
}

fn apply_patch(&mut self, patch: PatchSet) {
(**self).apply_patch(patch);
}
@@ -135,6 +146,10 @@ impl Database for PatchSet {
Ok(Some(node))
}

fn start_profiling(&self, _operation: ProfiledTreeOperation) -> Box<dyn Any> {
Box::new(()) // no stats are collected
}

fn apply_patch(&mut self, mut other: PatchSet) {
if let Some(other_updated_version) = other.updated_version {
if let Some(updated_version) = self.updated_version {
@@ -324,6 +339,10 @@ impl<DB: Database> Database for Patched<DB> {
values.collect()
}

fn start_profiling(&self, operation: ProfiledTreeOperation) -> Box<dyn Any> {
self.inner.start_profiling(operation)
}

fn apply_patch(&mut self, patch: PatchSet) {
if let Some(existing_patch) = &mut self.patch {
existing_patch.apply_patch(patch);
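
The guard-based contract is easiest to see at a call site; a minimal sketch, mirroring the `getters.rs` changes in this commit:

```rust
use std::any::Any;

use zksync_merkle_tree::{unstable::ProfiledTreeOperation, Database};

// Minimal sketch: profiling is scoped to the lifetime of the returned boxed guard,
// so binding it to an unused local stops profiling at the end of the block.
fn profiled_reads(db: &impl Database) {
    let _profiling_guard: Box<dyn Any> = db.start_profiling(ProfiledTreeOperation::GetEntries);
    // ... perform tree reads; I/O stats are attributed to `GetEntries`
    // until `_profiling_guard` is dropped ...
}
```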
5 changes: 3 additions & 2 deletions core/lib/merkle_tree/src/storage/mod.rs
@@ -10,8 +10,8 @@ use crate::{
hasher::HashTree,
metrics::{TreeUpdaterStats, BLOCK_TIMINGS, GENERAL_METRICS},
types::{
BlockOutput, ChildRef, InternalNode, Key, LeafNode, Manifest, Nibbles, Node, Root,
TreeEntry, TreeLogEntry, TreeTags, ValueHash,
BlockOutput, ChildRef, InternalNode, Key, LeafNode, Manifest, Nibbles, Node,
ProfiledTreeOperation, Root, TreeEntry, TreeLogEntry, TreeTags, ValueHash,
},
};

@@ -89,6 +89,7 @@ impl TreeUpdater {
sorted_keys: &SortedKeys,
db: &DB,
) -> Vec<Nibbles> {
let _profiling_guard = db.start_profiling(ProfiledTreeOperation::LoadAncestors);
let LoadAncestorsResult {
longest_prefixes,
db_reads,
53 changes: 50 additions & 3 deletions core/lib/merkle_tree/src/storage/rocksdb.rs
@@ -1,9 +1,15 @@
//! RocksDB implementation of [`Database`].
use std::path::Path;
use std::{any::Any, cell::RefCell, path::Path, sync::Arc};

use rayon::prelude::*;
use zksync_storage::{db::NamedColumnFamily, rocksdb, rocksdb::DBPinnableSlice, RocksDB};
use thread_local::ThreadLocal;
use zksync_storage::{
db::{NamedColumnFamily, ProfileGuard, ProfiledOperation},
rocksdb,
rocksdb::DBPinnableSlice,
RocksDB,
};

use crate::{
errors::{DeserializeError, ErrorContext},
@@ -12,7 +18,10 @@ use crate::{
database::{PruneDatabase, PrunePatchSet},
Database, NodeKeys, PatchSet,
},
types::{InternalNode, LeafNode, Manifest, Nibbles, Node, NodeKey, Root, StaleNodeKey},
types::{
InternalNode, LeafNode, Manifest, Nibbles, Node, NodeKey, ProfiledTreeOperation, Root,
StaleNodeKey,
},
};

/// RocksDB column families used by the tree.
@@ -41,6 +50,8 @@ impl NamedColumnFamily for MerkleTreeColumnFamily {
}
}

type LocalProfiledOperation = RefCell<Option<Arc<ProfiledOperation>>>;

/// Main [`Database`] implementation wrapping a [`RocksDB`] reference.
///
/// # Cloning
@@ -56,6 +67,9 @@ impl NamedColumnFamily for MerkleTreeColumnFamily {
#[derive(Debug, Clone)]
pub struct RocksDBWrapper {
db: RocksDB<MerkleTreeColumnFamily>,
// We want to scope profiled operations both by the thread and by the DB instance, hence the use
// of the `ThreadLocal` struct (as opposed to `thread_local!` vars).
profiled_operation: Arc<ThreadLocal<LocalProfiledOperation>>,
multi_get_chunk_size: usize,
}

@@ -98,10 +112,19 @@ impl RocksDBWrapper {
}

fn raw_nodes(&self, keys: &NodeKeys) -> Vec<Option<DBPinnableSlice<'_>>> {
// Propagate the currently profiled operation to rayon threads used in the parallel iterator below.
let profiled_operation = self
.profiled_operation
.get()
.and_then(|cell| cell.borrow().clone());

// `par_chunks()` below uses `rayon` to speed up multi-get I/O;
// see `Self::set_multi_get_chunk_size()` docs for an explanation why this makes sense.
keys.par_chunks(self.multi_get_chunk_size)
.map(|chunk| {
let _guard = profiled_operation
.as_ref()
.and_then(ProfiledOperation::start_profiling);
let keys = chunk.iter().map(|(key, _)| key.to_db_key());
let results = self.db.multi_get_cf(MerkleTreeColumnFamily::Tree, keys);
results
@@ -143,6 +166,7 @@ impl From<RocksDB<MerkleTreeColumnFamily>> for RocksDBWrapper {
fn from(db: RocksDB<MerkleTreeColumnFamily>) -> Self {
Self {
db,
profiled_operation: Arc::new(ThreadLocal::new()),
multi_get_chunk_size: usize::MAX,
}
}
@@ -191,6 +215,29 @@ impl Database for RocksDBWrapper {
.unwrap_or_else(|err| panic!("{err}"))
}

fn start_profiling(&self, operation: ProfiledTreeOperation) -> Box<dyn Any> {
struct Guard {
profiled_operation: Arc<ThreadLocal<LocalProfiledOperation>>,
_guard: ProfileGuard,
}

impl Drop for Guard {
fn drop(&mut self) {
*self.profiled_operation.get_or_default().borrow_mut() = None;
}
}

let profiled_operation = Arc::new(self.db.new_profiled_operation(operation.as_str()));
let guard = profiled_operation.start_profiling().unwrap();
// ^ `unwrap()` is safe: the operation has just been created
*self.profiled_operation.get_or_default().borrow_mut() = Some(profiled_operation);
Box::new(Guard {
profiled_operation: self.profiled_operation.clone(),
_guard: guard,
})
}

#[allow(clippy::missing_panics_doc)]
fn apply_patch(&mut self, patch: PatchSet) {
let tree_cf = MerkleTreeColumnFamily::Tree;
let mut write_batch = self.db.new_write_batch();
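
The propagation of the profiled operation into `rayon` worker threads (see `raw_nodes()` above) can be distilled into a standalone sketch; `&'static str` stands in for the real `ProfiledOperation`, and `fan_out` is a hypothetical function:

```rust
use std::{cell::RefCell, sync::Arc};

use rayon::prelude::*;
use thread_local::ThreadLocal;

// Per-(thread, DB instance) slot holding the currently profiled operation.
type Slot = RefCell<Option<Arc<&'static str>>>;

fn fan_out(profiled: &Arc<ThreadLocal<Slot>>, chunks: &[Vec<u64>]) {
    // Read the slot on the calling thread; rayon worker threads have their own
    // (empty) slots, so the value must be moved into the closure explicitly.
    let operation = profiled.get().and_then(|cell| cell.borrow().clone());
    chunks.par_iter().for_each(|chunk| {
        if let Some(op) = &operation {
            // In the real code, a per-chunk `ProfileGuard` is started here.
            println!("profiling {} keys under `{op}`", chunk.len());
        }
    });
}
```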
22 changes: 22 additions & 0 deletions core/lib/merkle_tree/src/types/internal.rs
@@ -561,6 +561,28 @@ impl StaleNodeKey {
}
}

/// Profiled Merkle tree operation used in `Database::start_profiling()`.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum ProfiledTreeOperation {
/// Loading ancestors for nodes inserted or updated in the tree.
LoadAncestors,
/// Getting entries from the tree without Merkle proofs.
GetEntries,
/// Getting entries from the tree with Merkle proofs.
GetEntriesWithProofs,
}

impl ProfiledTreeOperation {
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::LoadAncestors => "load_ancestors",
Self::GetEntries => "get_entries",
Self::GetEntriesWithProofs => "get_entries_with_proofs",
}
}
}

#[cfg(test)]
mod tests {
use zksync_types::U256;
4 changes: 3 additions & 1 deletion core/lib/merkle_tree/src/types/mod.rs
@@ -5,7 +5,9 @@ use zksync_types::{H256, U256};
pub(crate) use self::internal::{
ChildRef, Nibbles, NibblesBytes, StaleNodeKey, TreeTags, HASH_SIZE, KEY_SIZE, TREE_DEPTH,
};
pub use self::internal::{InternalNode, LeafNode, Manifest, Node, NodeKey, Root};
pub use self::internal::{
InternalNode, LeafNode, Manifest, Node, NodeKey, ProfiledTreeOperation, Root,
};

mod internal;

1 change: 1 addition & 0 deletions core/lib/storage/Cargo.toml
@@ -17,6 +17,7 @@ once_cell.workspace = true
rocksdb = { workspace = true, features = [
"snappy",
] }
thread_local.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand Down