From e0d4daa990c2563d2bd9a048d7350a545f136f00 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 30 Apr 2024 18:35:09 +0300 Subject: [PATCH] feat(en): Add pruning health checks and rework pruning config (#1790) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - Adds health checks for pruning tasks. - Reworks the metadata calculator health check so that it's up-to-date in case of pruning. - Extends the snapshot recovery integration test (IT) so that it tests pruning as well. ## Why ❔ - Pruning is an important aspect to test, and an integration test allows testing it in (somewhat) realistic conditions. - Other changes are required for the integration test, but they also make sense in general. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `zk spellcheck`. - [x] Linkcheck has been run via `zk linkcheck`. 
feat(db): Implement weak references to RocksDB test(en): Add integration test for pruning --- core/bin/external_node/src/config/mod.rs | 37 +- core/bin/external_node/src/init.rs | 5 +- core/bin/external_node/src/main.rs | 38 +- core/lib/health_check/src/lib.rs | 16 + core/lib/merkle_tree/src/domain.rs | 10 + core/lib/merkle_tree/src/pruning.rs | 17 +- core/lib/storage/src/db.rs | 61 ++- core/lib/storage/src/lib.rs | 2 +- .../src/api_server/execution_sandbox/mod.rs | 16 +- .../src/api_server/execution_sandbox/tests.rs | 12 +- .../zksync_core/src/api_server/web3/mod.rs | 13 +- core/lib/zksync_core/src/db_pruner/metrics.rs | 63 ++- core/lib/zksync_core/src/db_pruner/mod.rs | 416 ++++-------------- core/lib/zksync_core/src/db_pruner/tests.rs | 327 ++++++++++++++ core/lib/zksync_core/src/lib.rs | 2 +- .../src/metadata_calculator/helpers.rs | 89 +++- .../src/metadata_calculator/mod.rs | 20 +- .../src/metadata_calculator/pruning.rs | 107 ++++- .../src/metadata_calculator/tests.rs | 6 + .../src/metadata_calculator/updater.rs | 11 - .../layers/metadata_calculator.rs | 4 +- core/tests/revert-test/tests/tester.ts | 2 +- .../tests/snapshot-recovery.test.ts | 157 ++++++- 23 files changed, 1015 insertions(+), 416 deletions(-) create mode 100644 core/lib/zksync_core/src/db_pruner/tests.rs diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs index f968e2054059..ec2ecdd68b4b 100644 --- a/core/bin/external_node/src/config/mod.rs +++ b/core/bin/external_node/src/config/mod.rs @@ -1,6 +1,6 @@ use std::{ env, fmt, - num::{NonZeroU32, NonZeroUsize}, + num::{NonZeroU32, NonZeroU64, NonZeroUsize}, str::FromStr, time::Duration, }; @@ -380,11 +380,24 @@ pub(crate) struct OptionalENConfig { #[serde(default = "OptionalENConfig::default_snapshots_recovery_postgres_max_concurrency")] pub snapshots_recovery_postgres_max_concurrency: NonZeroUsize, + /// Enables pruning of the historical node state (Postgres and Merkle tree). 
The node will retain + /// recent state and will continuously remove (prune) old enough parts of the state in the background. + #[serde(default)] + pub pruning_enabled: bool, + /// Number of L1 batches pruned at a time. #[serde(default = "OptionalENConfig::default_pruning_chunk_size")] pub pruning_chunk_size: u32, - - /// If set, l1 batches will be pruned after they are that long - pub pruning_data_retention_hours: Option, + /// Delta between soft- and hard-removing data from Postgres. Should be reasonably large (order of 60 seconds). + /// The default value is 60 seconds. + #[serde(default = "OptionalENConfig::default_pruning_removal_delay_sec")] + pruning_removal_delay_sec: NonZeroU64, + /// If set, L1 batches will be pruned after the batch timestamp is this old (in seconds). Note that an L1 batch + /// may be temporarily retained for other reasons; e.g., a batch cannot be pruned until it is executed on L1, + /// which happens roughly 24 hours after its generation on the mainnet. Thus, in practice this value can specify + /// the retention period greater than that implicitly imposed by other criteria (e.g., 7 or 30 days). + /// If set to 0, L1 batches will not be retained based on their timestamp. The default value is 1 hour. 
+ #[serde(default = "OptionalENConfig::default_pruning_data_retention_sec")] + pruning_data_retention_sec: u64, } #[derive(Debug, Clone, PartialEq, Deserialize)] @@ -528,6 +541,14 @@ impl OptionalENConfig { 10 } + fn default_pruning_removal_delay_sec() -> NonZeroU64 { + NonZeroU64::new(60).unwrap() + } + + fn default_pruning_data_retention_sec() -> u64 { + 3_600 // 1 hour + } + pub fn polling_interval(&self) -> Duration { Duration::from_millis(self.polling_interval) } @@ -604,6 +625,14 @@ impl OptionalENConfig { Duration::from_millis(self.mempool_cache_update_interval) } + pub fn pruning_removal_delay(&self) -> Duration { + Duration::from_secs(self.pruning_removal_delay_sec.get()) + } + + pub fn pruning_data_retention(&self) -> Duration { + Duration::from_secs(self.pruning_data_retention_sec) + } + #[cfg(test)] fn mock() -> Self { // Set all values to their defaults diff --git a/core/bin/external_node/src/init.rs b/core/bin/external_node/src/init.rs index b53f4c550649..95eb2ba5d19c 100644 --- a/core/bin/external_node/src/init.rs +++ b/core/bin/external_node/src/init.rs @@ -80,9 +80,8 @@ pub(crate) async fn ensure_storage_initialized( InitDecision::SnapshotRecovery => { anyhow::ensure!( consider_snapshot_recovery, - "Snapshot recovery is required to proceed, but it is not enabled. Enable by supplying \ - `--enable-snapshots-recovery` command-line arg to the node binary, or reset the node storage \ - to sync from genesis" + "Snapshot recovery is required to proceed, but it is not enabled. Enable by setting \ + `EN_SNAPSHOTS_RECOVERY_ENABLED=true` env variable to the node binary, or use a Postgres dump for recovery" ); tracing::warn!("Proceeding with snapshot recovery. 
This is an experimental feature; use at your own risk"); diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index c16c71f2eb03..f64de55f6d99 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -160,13 +160,24 @@ async fn run_tree( .await .context("failed creating DB pool for Merkle tree recovery")?; - let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None, tree_pool) - .await - .context("failed initializing metadata calculator")? - .with_recovery_pool(recovery_pool); + let mut metadata_calculator = + MetadataCalculator::new(metadata_calculator_config, None, tree_pool) + .await + .context("failed initializing metadata calculator")? + .with_recovery_pool(recovery_pool); let tree_reader = Arc::new(metadata_calculator.tree_reader()); - app_health.insert_component(metadata_calculator.tree_health_check())?; + app_health.insert_custom_component(Arc::new(metadata_calculator.tree_health_check()))?; + + if config.optional.pruning_enabled { + tracing::warn!("Proceeding with node state pruning for the Merkle tree. This is an experimental feature; use at your own risk"); + + let pruning_task = + metadata_calculator.pruning_task(config.optional.pruning_removal_delay() / 2); + app_health.insert_component(pruning_task.health_check())?; + let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver.clone())); + task_futures.push(pruning_task_handle); + } if let Some(api_config) = api_config { let address = (Ipv4Addr::UNSPECIFIED, api_config.port).into(); @@ -281,21 +292,22 @@ async fn run_core( } })); - if let Some(data_retention_hours) = config.optional.pruning_data_retention_hours { - let minimum_l1_batch_age = Duration::from_secs(3600 * data_retention_hours); + if config.optional.pruning_enabled { + tracing::warn!("Proceeding with node state pruning for Postgres. 
This is an experimental feature; use at your own risk"); + + let minimum_l1_batch_age = config.optional.pruning_data_retention(); tracing::info!( "Configured pruning of batches after they become {minimum_l1_batch_age:?} old" ); let db_pruner = DbPruner::new( DbPrunerConfig { - // don't change this value without adjusting API server pruning info cache max age - soft_and_hard_pruning_time_delta: Duration::from_secs(60), - next_iterations_delay: Duration::from_secs(30), + removal_delay: config.optional.pruning_removal_delay(), pruned_batch_chunk_size: config.optional.pruning_chunk_size, minimum_l1_batch_age, }, connection_pool.clone(), ); + app_health.insert_component(db_pruner.health_check())?; task_handles.push(tokio::spawn(db_pruner.run(stop_receiver.clone()))); } @@ -498,6 +510,10 @@ async fn run_api( mempool_cache_update_task.run(stop_receiver.clone()), )); + // The refresh interval should be several times lower than the pruning removal delay, so that + // soft-pruning will timely propagate to the API server. 
+ let pruning_info_refresh_interval = config.optional.pruning_removal_delay() / 5; + if components.contains(&Component::HttpApi) { let mut builder = ApiBuilder::jsonrpsee_backend(config.clone().into(), connection_pool.clone()) @@ -505,6 +521,7 @@ async fn run_api( .with_filter_limit(config.optional.filters_limit) .with_batch_request_size_limit(config.optional.max_batch_request_size) .with_response_body_size_limit(config.optional.max_response_body_size()) + .with_pruning_info_refresh_interval(pruning_info_refresh_interval) .with_tx_sender(tx_sender.clone()) .with_vm_barrier(vm_barrier.clone()) .with_sync_state(sync_state.clone()) @@ -534,6 +551,7 @@ async fn run_api( .with_batch_request_size_limit(config.optional.max_batch_request_size) .with_response_body_size_limit(config.optional.max_response_body_size()) .with_polling_interval(config.optional.polling_interval()) + .with_pruning_info_refresh_interval(pruning_info_refresh_interval) .with_tx_sender(tx_sender) .with_vm_barrier(vm_barrier) .with_sync_state(sync_state) diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs index 07f3af330765..8a3068d661d9 100644 --- a/core/lib/health_check/src/lib.rs +++ b/core/lib/health_check/src/lib.rs @@ -79,6 +79,11 @@ impl Health { pub fn status(&self) -> HealthStatus { self.status } + + /// Returns health details. Mostly useful for testing. + pub fn details(&self) -> Option<&serde_json::Value> { + self.details.as_ref() + } } impl From for Health { @@ -347,6 +352,17 @@ impl ReactiveHealthCheck { }; (this, updater) } + + /// Waits until the specified `condition` is true for the tracked [`Health`], and returns health. + /// Mostly useful for testing. + /// + /// If the health updater associated with this check is dropped, this method can wait indefinitely. 
+ pub async fn wait_for(&mut self, condition: impl FnMut(&Health) -> bool) -> Health { + match self.health_receiver.wait_for(condition).await { + Ok(health) => health.clone(), + Err(_) => future::pending().await, + } + } } #[async_trait] diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index 68bb1273708f..ecd9b4c1fbe4 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -365,6 +365,16 @@ impl Clone for ZkSyncTreeReader { } impl ZkSyncTreeReader { + /// Creates a tree reader based on the provided database. + pub fn new(db: RocksDBWrapper) -> Self { + Self(MerkleTree::new(db)) + } + + /// Returns a reference to the database used by this tree reader. + pub fn db(&self) -> &RocksDBWrapper { + &self.0.db + } + /// Returns the current root hash of this tree. pub fn root_hash(&self) -> ValueHash { self.0.latest_root_hash() diff --git a/core/lib/merkle_tree/src/pruning.rs b/core/lib/merkle_tree/src/pruning.rs index 9e99098cbfd4..5ab578621005 100644 --- a/core/lib/merkle_tree/src/pruning.rs +++ b/core/lib/merkle_tree/src/pruning.rs @@ -138,13 +138,13 @@ impl MerkleTreePruner { // We must retain at least one tree version. 
let last_prunable_version = self.last_prunable_version(); if last_prunable_version.is_none() { - tracing::info!("Nothing to prune; skipping"); + tracing::debug!("Nothing to prune; skipping"); return None; } let target_retained_version = last_prunable_version?.min(target_retained_version); let stale_key_new_versions = min_stale_key_version..=target_retained_version; if stale_key_new_versions.is_empty() { - tracing::info!( + tracing::debug!( "No Merkle tree versions can be pruned; min stale key version is {min_stale_key_version}, \ target retained version is {target_retained_version}" ); @@ -165,7 +165,7 @@ impl MerkleTreePruner { load_stale_keys_latency.observe(); if pruned_keys.is_empty() { - tracing::info!("No stale keys to remove; skipping"); + tracing::debug!("No stale keys to remove; skipping"); return None; } let deleted_stale_key_versions = min_stale_key_version..(max_stale_key_version + 1); @@ -203,21 +203,24 @@ impl MerkleTreePruner { let mut wait_interval = Duration::ZERO; while !self.wait_for_abort(wait_interval) { let retained_version = self.target_retained_version.load(Ordering::Relaxed); - if let Some(stats) = self.prune_up_to(retained_version) { + wait_interval = if let Some(stats) = self.prune_up_to(retained_version) { tracing::debug!( "Performed pruning for target retained version {retained_version}: {stats:?}" ); stats.report(); if stats.has_more_work() { - continue; + // Continue pruning right away instead of waiting for abort. 
+ Duration::ZERO + } else { + self.poll_interval } } else { tracing::debug!( "Pruning was not performed; waiting {:?}", self.poll_interval ); - } - wait_interval = self.poll_interval; + self.poll_interval + }; } } } diff --git a/core/lib/storage/src/db.rs b/core/lib/storage/src/db.rs index 583d612023ea..e16a3580ac1c 100644 --- a/core/lib/storage/src/db.rs +++ b/core/lib/storage/src/db.rs @@ -9,7 +9,7 @@ use std::{ path::Path, sync::{ atomic::{AtomicU64, Ordering}, - Arc, Condvar, Mutex, + Arc, Condvar, Mutex, Weak, }, thread, time::{Duration, Instant}, @@ -314,8 +314,8 @@ impl Default for RocksDBOptions { /// Thin wrapper around a RocksDB instance. /// -/// The wrapper is cheaply cloneable (internally, it wraps a DB instance in an [`Arc`]). -#[derive(Debug, Clone)] +/// The wrapper is cheaply cloneable; internally, it wraps a DB instance in an [`Arc`]. +#[derive(Debug)] pub struct RocksDB { inner: Arc, sync_writes: bool, @@ -323,6 +323,17 @@ pub struct RocksDB { _cf: PhantomData, } +impl Clone for RocksDB { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + sync_writes: self.sync_writes, + stalled_writes_retries: self.stalled_writes_retries, + _cf: PhantomData, + } + } +} + impl RocksDB { pub fn new(path: &Path) -> Result { Self::with_options(path, RocksDBOptions::default()) @@ -448,6 +459,15 @@ impl RocksDB { options } + pub fn downgrade(&self) -> WeakRocksDB { + WeakRocksDB { + inner: Arc::downgrade(&self.inner), + sync_writes: self.sync_writes, + stalled_writes_retries: self.stalled_writes_retries, + _cf: PhantomData, + } + } + pub fn estimated_number_of_entries(&self, cf: CF) -> u64 { const ERROR_MSG: &str = "failed to get estimated number of entries"; @@ -628,6 +648,41 @@ impl RocksDB<()> { } } +/// Weak reference to a RocksDB instance. Doesn't prevent dropping the underlying instance; +/// to work with it, you should [upgrade](Self::upgrade()) the reference first. 
+/// +/// The wrapper is cheaply cloneable; internally, it wraps a DB instance in a [`Weak`]. +#[derive(Debug)] +pub struct WeakRocksDB { + inner: Weak, + sync_writes: bool, + stalled_writes_retries: StalledWritesRetries, + _cf: PhantomData, +} + +impl Clone for WeakRocksDB { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + sync_writes: self.sync_writes, + stalled_writes_retries: self.stalled_writes_retries, + _cf: PhantomData, + } + } +} + +impl WeakRocksDB { + /// Tries to upgrade to a strong reference to RocksDB. If the RocksDB instance has been dropped, returns `None`. + pub fn upgrade(&self) -> Option> { + Some(RocksDB { + inner: self.inner.upgrade()?, + sync_writes: self.sync_writes, + stalled_writes_retries: self.stalled_writes_retries, + _cf: PhantomData, + }) + } +} + /// Profiling information for a logical I/O operation on RocksDB. Can be used to profile operations /// distributed in time, including on multiple threads. #[must_use = "`start_profiling()` should be called one or more times to actually perform profiling"] diff --git a/core/lib/storage/src/lib.rs b/core/lib/storage/src/lib.rs index 729a547981d0..22d44b26cf38 100644 --- a/core/lib/storage/src/lib.rs +++ b/core/lib/storage/src/lib.rs @@ -1,5 +1,5 @@ pub mod db; mod metrics; -pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries}; +pub use db::{RocksDB, RocksDBOptions, StalledWritesRetries, WeakRocksDB}; pub use rocksdb; diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs index c1659c9fd25e..42de8e285d91 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs @@ -197,12 +197,11 @@ struct BlockStartInfoInner { } impl BlockStartInfoInner { - const MAX_CACHE_AGE: Duration = Duration::from_secs(20); // We make max age a bit random so that all threads don't start refreshing cache at the same time const 
MAX_RANDOM_DELAY: Duration = Duration::from_millis(100); - fn is_expired(&self, now: Instant) -> bool { - if let Some(expired_for) = (now - self.cached_at).checked_sub(Self::MAX_CACHE_AGE) { + fn is_expired(&self, now: Instant, max_cache_age: Duration) -> bool { + if let Some(expired_for) = (now - self.cached_at).checked_sub(max_cache_age) { if expired_for > Self::MAX_RANDOM_DELAY { return true; // The cache is definitely expired, regardless of the randomness below } @@ -218,16 +217,21 @@ impl BlockStartInfoInner { #[derive(Debug, Clone)] pub(crate) struct BlockStartInfo { cached_pruning_info: Arc>, + max_cache_age: Duration, } impl BlockStartInfo { - pub async fn new(storage: &mut Connection<'_, Core>) -> anyhow::Result { + pub async fn new( + storage: &mut Connection<'_, Core>, + max_cache_age: Duration, + ) -> anyhow::Result { let info = storage.pruning_dal().get_pruning_info().await?; Ok(Self { cached_pruning_info: Arc::new(RwLock::new(BlockStartInfoInner { info, cached_at: Instant::now(), })), + max_cache_age, }) } @@ -248,7 +252,7 @@ impl BlockStartInfo { let mut new_cached_pruning_info = self .cached_pruning_info .write() - .expect("BlockStartInfo is poisoned"); + .map_err(|_| anyhow::anyhow!("BlockStartInfo is poisoned"))?; Ok(if new_cached_pruning_info.cached_at < now { *new_cached_pruning_info = BlockStartInfoInner { info, @@ -267,7 +271,7 @@ impl BlockStartInfo { ) -> anyhow::Result { let inner = self.copy_inner(); let now = Instant::now(); - if inner.is_expired(now) { + if inner.is_expired(now, self.max_cache_age) { // Multiple threads may execute this query if we're very unlucky self.update_cache(storage, now).await } else { diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs index 9ba988911298..1eb427147ab6 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs @@ -32,7 +32,9 
@@ async fn creating_block_args() { assert_eq!(pending_block_args.resolved_block_number, L2BlockNumber(2)); assert_eq!(pending_block_args.l1_batch_timestamp_s, None); - let start_info = BlockStartInfo::new(&mut storage).await.unwrap(); + let start_info = BlockStartInfo::new(&mut storage, Duration::MAX) + .await + .unwrap(); assert_eq!( start_info.first_l2_block(&mut storage).await.unwrap(), L2BlockNumber(0) @@ -86,7 +88,9 @@ async fn creating_block_args_after_snapshot_recovery() { ); assert_eq!(pending_block_args.l1_batch_timestamp_s, None); - let start_info = BlockStartInfo::new(&mut storage).await.unwrap(); + let start_info = BlockStartInfo::new(&mut storage, Duration::MAX) + .await + .unwrap(); assert_eq!( start_info.first_l2_block(&mut storage).await.unwrap(), snapshot_recovery.l2_block_number + 1 @@ -170,7 +174,9 @@ async fn instantiating_vm() { let block_args = BlockArgs::pending(&mut storage).await.unwrap(); test_instantiating_vm(pool.clone(), block_args).await; - let start_info = BlockStartInfo::new(&mut storage).await.unwrap(); + let start_info = BlockStartInfo::new(&mut storage, Duration::MAX) + .await + .unwrap(); let block_args = BlockArgs::new(&mut storage, api::BlockId::Number(0.into()), &start_info) .await .unwrap(); diff --git a/core/lib/zksync_core/src/api_server/web3/mod.rs b/core/lib/zksync_core/src/api_server/web3/mod.rs index 12fd0d8e2727..00050fb85812 100644 --- a/core/lib/zksync_core/src/api_server/web3/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/mod.rs @@ -148,6 +148,7 @@ pub struct ApiServer { transport: ApiTransport, tx_sender: TxSender, polling_interval: Duration, + pruning_info_refresh_interval: Duration, namespaces: Vec, method_tracer: Arc, optional: OptionalApiParams, @@ -159,6 +160,7 @@ pub struct ApiBuilder { updaters_pool: ConnectionPool, config: InternalApiConfig, polling_interval: Duration, + pruning_info_refresh_interval: Duration, // Mandatory params that must be set using builder methods. 
transport: Option, tx_sender: Option, @@ -171,6 +173,7 @@ pub struct ApiBuilder { impl ApiBuilder { const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_millis(200); + const DEFAULT_PRUNING_INFO_REFRESH_INTERVAL: Duration = Duration::from_secs(10); pub fn jsonrpsee_backend(config: InternalApiConfig, pool: ConnectionPool) -> Self { Self { @@ -178,6 +181,7 @@ impl ApiBuilder { pool, config, polling_interval: Self::DEFAULT_POLLING_INTERVAL, + pruning_info_refresh_interval: Self::DEFAULT_PRUNING_INFO_REFRESH_INTERVAL, transport: None, tx_sender: None, namespaces: None, @@ -254,6 +258,11 @@ impl ApiBuilder { self } + pub fn with_pruning_info_refresh_interval(mut self, interval: Duration) -> Self { + self.pruning_info_refresh_interval = interval; + self + } + pub fn enable_api_namespaces(mut self, namespaces: Vec) -> Self { self.namespaces = Some(namespaces); self @@ -305,6 +314,7 @@ impl ApiBuilder { transport, tx_sender: self.tx_sender.context("Transaction sender not set")?, polling_interval: self.polling_interval, + pruning_info_refresh_interval: self.pruning_info_refresh_interval, namespaces: self.namespaces.unwrap_or_else(|| { tracing::warn!( "debug_ and snapshots_ API namespace will be disabled by default in ApiBuilder" @@ -327,7 +337,8 @@ impl ApiServer { last_sealed_l2_block: SealedL2BlockNumber, ) -> anyhow::Result { let mut storage = self.updaters_pool.connection_tagged("api").await?; - let start_info = BlockStartInfo::new(&mut storage).await?; + let start_info = + BlockStartInfo::new(&mut storage, self.pruning_info_refresh_interval).await?; drop(storage); // Disable filter API for HTTP endpoints, WS endpoints are unaffected by the `filters_disabled` flag diff --git a/core/lib/zksync_core/src/db_pruner/metrics.rs b/core/lib/zksync_core/src/db_pruner/metrics.rs index 1011d820517b..73bcefd041dd 100644 --- a/core/lib/zksync_core/src/db_pruner/metrics.rs +++ b/core/lib/zksync_core/src/db_pruner/metrics.rs @@ -1,24 +1,75 @@ use std::time::Duration; use 
vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit}; +use zksync_dal::pruning_dal::HardPruningStats; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "prune_type", rename_all = "snake_case")] -pub(crate) enum MetricPruneType { +pub(super) enum MetricPruneType { Soft, Hard, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "type", rename_all = "snake_case")] +enum PrunedEntityType { + L1Batch, + L2Block, + StorageLogFromPrunedBatch, + StorageLogFromPastBatch, + Event, + L2ToL1Log, + CallTrace, +} + +const ENTITY_COUNT_BUCKETS: Buckets = Buckets::values(&[ + 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1_000.0, 2_000.0, 5_000.0, 10_000.0, +]); + #[derive(Debug, Metrics)] #[metrics(prefix = "db_pruner")] -pub(crate) struct DbPrunerMetrics { - /// Total latency of pruning chunk of l1 batches. +pub(super) struct DbPrunerMetrics { + /// Total latency of pruning chunk of L1 batches. #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] pub pruning_chunk_duration: Family>, - - /// Number of not-pruned l1 batches + /// Number of not-pruned L1 batches. pub not_pruned_l1_batches_count: Gauge, + /// Number of entities deleted during a single hard pruning iteration, grouped by entity type. 
+ #[metrics(buckets = ENTITY_COUNT_BUCKETS)] + deleted_entities: Family>, +} + +impl DbPrunerMetrics { + pub fn observe_hard_pruning(&self, stats: HardPruningStats) { + let HardPruningStats { + deleted_l1_batches, + deleted_l2_blocks, + deleted_storage_logs_from_past_batches, + deleted_storage_logs_from_pruned_batches, + deleted_events, + deleted_call_traces, + deleted_l2_to_l1_logs, + } = stats; + let deleted_storage_logs = + deleted_storage_logs_from_past_batches + deleted_storage_logs_from_pruned_batches; + tracing::info!( + "Performed pruning of database, deleted {deleted_l1_batches} L1 batches, {deleted_l2_blocks} L2 blocks, \ + {deleted_storage_logs} storage logs ({deleted_storage_logs_from_pruned_batches} from pruned batches + \ + {deleted_storage_logs_from_past_batches} from past batches), \ + {deleted_events} events, {deleted_call_traces} call traces, {deleted_l2_to_l1_logs} L2-to-L1 logs" + ); + + self.deleted_entities[&PrunedEntityType::L1Batch].observe(deleted_l1_batches); + self.deleted_entities[&PrunedEntityType::L2Block].observe(deleted_l2_blocks); + self.deleted_entities[&PrunedEntityType::StorageLogFromPastBatch] + .observe(deleted_storage_logs_from_past_batches); + self.deleted_entities[&PrunedEntityType::StorageLogFromPrunedBatch] + .observe(deleted_storage_logs_from_pruned_batches); + self.deleted_entities[&PrunedEntityType::Event].observe(deleted_events); + self.deleted_entities[&PrunedEntityType::L2ToL1Log].observe(deleted_l2_to_l1_logs); + self.deleted_entities[&PrunedEntityType::CallTrace].observe(deleted_call_traces); + } } #[vise::register] -pub(crate) static METRICS: vise::Global = vise::Global::new(); +pub(super) static METRICS: vise::Global = vise::Global::new(); diff --git a/core/lib/zksync_core/src/db_pruner/mod.rs b/core/lib/zksync_core/src/db_pruner/mod.rs index 8024cc93cff9..4b7eec11ad58 100644 --- a/core/lib/zksync_core/src/db_pruner/mod.rs +++ b/core/lib/zksync_core/src/db_pruner/mod.rs @@ -4,9 +4,11 @@ use std::{fmt, sync::Arc, 
time::Duration}; use anyhow::Context as _; use async_trait::async_trait; +use serde::Serialize; use tokio::sync::watch; -use zksync_dal::{pruning_dal::HardPruningStats, Connection, ConnectionPool, Core, CoreDal}; -use zksync_types::L1BatchNumber; +use zksync_dal::{pruning_dal::PruningInfo, Connection, ConnectionPool, Core, CoreDal}; +use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; +use zksync_types::{L1BatchNumber, L2BlockNumber}; use self::{ metrics::{MetricPruneType, METRICS}, @@ -18,14 +20,14 @@ use self::{ mod metrics; mod prune_conditions; +#[cfg(test)] +mod tests; /// Configuration #[derive(Debug)] pub struct DbPrunerConfig { /// Delta between soft- and hard-removing data from Postgres. - pub soft_and_hard_pruning_time_delta: Duration, - /// Sleep interval between pruning iterations. - pub next_iterations_delay: Duration, + pub removal_delay: Duration, /// Number of L1 batches pruned at a time. The pruner will do nothing if there is less than this number /// of batches to prune. pub pruned_batch_chunk_size: u32, @@ -34,11 +36,35 @@ pub struct DbPrunerConfig { pub minimum_l1_batch_age: Duration, } +#[derive(Debug, Serialize)] +struct DbPrunerHealth { + #[serde(skip_serializing_if = "Option::is_none")] + last_soft_pruned_l1_batch: Option, + #[serde(skip_serializing_if = "Option::is_none")] + last_soft_pruned_l2_block: Option, + #[serde(skip_serializing_if = "Option::is_none")] + last_hard_pruned_l1_batch: Option, + #[serde(skip_serializing_if = "Option::is_none")] + last_hard_pruned_l2_block: Option, +} + +impl From for DbPrunerHealth { + fn from(info: PruningInfo) -> Self { + Self { + last_soft_pruned_l1_batch: info.last_soft_pruned_l1_batch, + last_soft_pruned_l2_block: info.last_soft_pruned_l2_block, + last_hard_pruned_l1_batch: info.last_hard_pruned_l1_batch, + last_hard_pruned_l2_block: info.last_hard_pruned_l2_block, + } + } +} + /// Postgres database pruning component. 
#[derive(Debug)] pub struct DbPruner { config: DbPrunerConfig, connection_pool: ConnectionPool, + health_updater: HealthUpdater, prune_conditions: Vec>, } @@ -50,7 +76,7 @@ trait PruneCondition: fmt::Debug + fmt::Display + Send + Sync + 'static { impl DbPruner { pub fn new(config: DbPrunerConfig, connection_pool: ConnectionPool) -> Self { - let conditions: Vec> = vec![ + let mut conditions: Vec> = vec![ Arc::new(L1BatchExistsCondition { conn: connection_pool.clone(), }), @@ -60,14 +86,18 @@ impl DbPruner { Arc::new(NextL1BatchWasExecutedCondition { conn: connection_pool.clone(), }), - Arc::new(L1BatchOlderThanPruneCondition { - minimum_age: config.minimum_l1_batch_age, - conn: connection_pool.clone(), - }), Arc::new(ConsistencyCheckerProcessedBatch { conn: connection_pool.clone(), }), ]; + if config.minimum_l1_batch_age > Duration::ZERO { + // Do not add a condition if it's trivial in order to not clutter logs. + conditions.push(Arc::new(L1BatchOlderThanPruneCondition { + minimum_age: config.minimum_l1_batch_age, + conn: connection_pool.clone(), + })); + } + Self::with_conditions(config, connection_pool, conditions) } @@ -79,10 +109,15 @@ impl DbPruner { Self { config, connection_pool, + health_updater: ReactiveHealthCheck::new("db_pruner").1, prune_conditions, } } + pub fn health_check(&self) -> ReactiveHealthCheck { + self.health_updater.subscribe() + } + async fn is_l1_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> bool { let mut successful_conditions = vec![]; let mut failed_conditions = vec![]; @@ -100,11 +135,11 @@ impl DbPruner { } let result = failed_conditions.is_empty() && errored_conditions.is_empty(); if !result { - tracing::info!( - "Pruning l1 batch {l1_batch_number} is not possible, \ - successful conditions: {successful_conditions:?}, \ - failed conditions: {failed_conditions:?}, \ - errored_conditions: {errored_conditions:?}" + tracing::debug!( + "Pruning L1 batch {l1_batch_number} is not possible, \ + successful conditions: 
{successful_conditions:?}, \ + failed conditions: {failed_conditions:?}, \ + errored conditions: {errored_conditions:?}" ); } result @@ -127,11 +162,16 @@ impl DbPruner { Ok(()) } + fn update_health(&self, info: PruningInfo) { + let health = Health::from(HealthStatus::Ready).with_details(DbPrunerHealth::from(info)); + self.health_updater.update(health); + } + async fn soft_prune(&self, storage: &mut Connection<'_, Core>) -> anyhow::Result { let latency = METRICS.pruning_chunk_duration[&MetricPruneType::Soft].start(); let mut transaction = storage.start_transaction().await?; - let current_pruning_info = transaction.pruning_dal().get_pruning_info().await?; + let mut current_pruning_info = transaction.pruning_dal().get_pruning_info().await?; let next_l1_batch_to_prune = L1BatchNumber( current_pruning_info .last_soft_pruned_l1_batch @@ -161,6 +201,9 @@ impl DbPruner { "Soft pruned db l1_batches up to {next_l1_batch_to_prune} and L2 blocks up to {next_l2_block_to_prune}, operation took {latency:?}", ); + current_pruning_info.last_soft_pruned_l1_batch = Some(next_l1_batch_to_prune); + current_pruning_info.last_soft_pruned_l2_block = Some(next_l2_block_to_prune); + self.update_health(current_pruning_info); Ok(true) } @@ -168,7 +211,7 @@ impl DbPruner { let latency = METRICS.pruning_chunk_duration[&MetricPruneType::Hard].start(); let mut transaction = storage.start_transaction().await?; - let current_pruning_info = transaction.pruning_dal().get_pruning_info().await?; + let mut current_pruning_info = transaction.pruning_dal().get_pruning_info().await?; let last_soft_pruned_l1_batch = current_pruning_info.last_soft_pruned_l1_batch.with_context(|| { format!("bogus pruning info {current_pruning_info:?}: trying to hard-prune data, but there is no soft-pruned L1 batch") @@ -182,7 +225,7 @@ impl DbPruner { .pruning_dal() .hard_prune_batches_range(last_soft_pruned_l1_batch, last_soft_pruned_l2_block) .await?; - Self::report_hard_pruning_stats(stats); + 
METRICS.observe_hard_pruning(stats); transaction.commit().await?; let mut storage = self.connection_pool.connection_tagged("db_pruner").await?; @@ -196,32 +239,16 @@ impl DbPruner { "Hard pruned db l1_batches up to {last_soft_pruned_l1_batch} and L2 blocks up to {last_soft_pruned_l2_block}, \ operation took {latency:?}" ); + current_pruning_info.last_hard_pruned_l1_batch = Some(last_soft_pruned_l1_batch); + current_pruning_info.last_hard_pruned_l2_block = Some(last_soft_pruned_l2_block); + self.update_health(current_pruning_info); Ok(()) } - fn report_hard_pruning_stats(stats: HardPruningStats) { - let HardPruningStats { - deleted_l1_batches, - deleted_l2_blocks, - deleted_storage_logs_from_past_batches, - deleted_storage_logs_from_pruned_batches, - deleted_events, - deleted_call_traces, - deleted_l2_to_l1_logs, - } = stats; - let deleted_storage_logs = - deleted_storage_logs_from_past_batches + deleted_storage_logs_from_pruned_batches; - tracing::info!( - "Performed pruning of database, deleted {deleted_l1_batches} L1 batches, {deleted_l2_blocks} L2 blocks, \ - {deleted_storage_logs} storage logs ({deleted_storage_logs_from_pruned_batches} from pruned batches + \ - {deleted_storage_logs_from_past_batches} from past batches), \ - {deleted_events} events, {deleted_call_traces} call traces, {deleted_l2_to_l1_logs} L2-to-L1 logs" - ); - } - async fn run_single_iteration(&self) -> anyhow::Result { let mut storage = self.connection_pool.connection_tagged("db_pruner").await?; let current_pruning_info = storage.pruning_dal().get_pruning_info().await?; + self.update_health(current_pruning_info); // If this `if` is not entered, it means that the node has restarted after soft pruning if current_pruning_info.last_soft_pruned_l1_batch @@ -234,12 +261,23 @@ impl DbPruner { } drop(storage); // Don't hold a connection across a timeout - tokio::time::sleep(self.config.soft_and_hard_pruning_time_delta).await; + tokio::time::sleep(self.config.removal_delay).await; let mut storage = 
self.connection_pool.connection_tagged("db_pruner").await?; self.hard_prune(&mut storage).await?; Ok(true) } + pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { + let next_iteration_delay = self.config.removal_delay / 2; + tracing::info!( + "Starting Postgres pruning with configuration {:?}, prune conditions {:?}", + self.config, + self.prune_conditions + .iter() + .map(ToString::to_string) + .collect::>() + ); + while !*stop_receiver.borrow_and_update() { if let Err(err) = self.update_l1_batches_metric().await { tracing::warn!("Error updating DB pruning metrics: {err:?}"); @@ -249,16 +287,20 @@ impl DbPruner { Err(err) => { // As this component is not really mission-critical, all errors are generally ignored tracing::warn!( - "Pruning error, retrying in {:?}, error was: {err:?}", - self.config.next_iterations_delay + "Pruning error, retrying in {next_iteration_delay:?}, error was: {err:?}" ); + let health = + Health::from(HealthStatus::Affected).with_details(serde_json::json!({ + "error": err.to_string(), + })); + self.health_updater.update(health); true } Ok(pruning_done) => !pruning_done, }; if should_sleep - && tokio::time::timeout(self.config.next_iterations_delay, stop_receiver.changed()) + && tokio::time::timeout(next_iteration_delay, stop_receiver.changed()) .await .is_ok() { @@ -271,293 +313,3 @@ impl DbPruner { Ok(()) } } - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use anyhow::anyhow; - use test_log::test; - use zksync_dal::pruning_dal::PruningInfo; - use zksync_db_connection::connection::Connection; - use zksync_types::{L2BlockNumber, ProtocolVersion}; - - use super::*; - use crate::utils::testonly::create_l2_block; - - #[derive(Debug)] - struct ConditionMock { - pub name: &'static str, - pub is_batch_prunable_responses: HashMap, - } - - impl ConditionMock { - fn name(name: &'static str) -> ConditionMock { - Self { - name, - is_batch_prunable_responses: HashMap::default(), - } - } - - fn 
with_response(mut self, l1_batch_number: L1BatchNumber, value: bool) -> Self { - self.is_batch_prunable_responses - .insert(l1_batch_number, value); - self - } - } - - impl fmt::Display for ConditionMock { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(formatter, "{}", self.name) - } - } - - #[async_trait] - impl PruneCondition for ConditionMock { - async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result { - if !self - .is_batch_prunable_responses - .contains_key(&l1_batch_number) - { - return Err(anyhow!("Error!")); - } - Ok(self - .is_batch_prunable_responses - .get(&l1_batch_number) - .cloned() - .unwrap()) - } - } - - #[test(tokio::test)] - async fn is_l1_batch_prunable_works() { - let failing_check = Arc::new( - ConditionMock::name("some failing some passing1") - .with_response(L1BatchNumber(1), true) - .with_response(L1BatchNumber(2), true) - .with_response(L1BatchNumber(3), false) - .with_response(L1BatchNumber(4), true), - ); - let other_failing_check = Arc::new( - ConditionMock::name("some failing some passing2") - .with_response(L1BatchNumber(2), false) - .with_response(L1BatchNumber(3), true) - .with_response(L1BatchNumber(4), true), - ); - let pruner = DbPruner::with_conditions( - DbPrunerConfig { - soft_and_hard_pruning_time_delta: Duration::ZERO, - pruned_batch_chunk_size: 1, - next_iterations_delay: Duration::ZERO, - minimum_l1_batch_age: Duration::ZERO, - }, - ConnectionPool::test_pool().await, - vec![failing_check, other_failing_check], - ); - // first check succeeds, but second returns an error - assert!(!pruner.is_l1_batch_prunable(L1BatchNumber(1)).await); - // second check fails - assert!(!pruner.is_l1_batch_prunable(L1BatchNumber(2)).await); - // first check fails - assert!(!pruner.is_l1_batch_prunable(L1BatchNumber(3)).await); - - assert!(pruner.is_l1_batch_prunable(L1BatchNumber(4)).await); - } - - async fn insert_l2_blocks( - conn: &mut Connection<'_, Core>, - l1_batches_count: u32, 
- l2_blocks_per_batch: u32, - ) { - conn.protocol_versions_dal() - .save_protocol_version_with_tx(&ProtocolVersion::default()) - .await - .unwrap(); - - for l1_batch_number in 0..l1_batches_count { - for l2_block_index in 0..l2_blocks_per_batch { - let l2_block_number = l1_batch_number * l2_blocks_per_batch + l2_block_index; - let l2_block_header = create_l2_block(l2_block_number); - - conn.blocks_dal() - .insert_l2_block(&l2_block_header) - .await - .unwrap(); - conn.blocks_dal() - .mark_l2_blocks_as_executed_in_l1_batch(L1BatchNumber(l1_batch_number)) - .await - .unwrap(); - } - } - } - - #[test(tokio::test)] - async fn hard_pruning_ignores_conditions_checks() { - let pool = ConnectionPool::::test_pool().await; - let mut conn = pool.connection().await.unwrap(); - - insert_l2_blocks(&mut conn, 10, 2).await; - conn.pruning_dal() - .soft_prune_batches_range(L1BatchNumber(2), L2BlockNumber(5)) - .await - .unwrap(); - - let nothing_prunable_check = Arc::new(ConditionMock::name("nothing prunable")); - let pruner = DbPruner::with_conditions( - DbPrunerConfig { - soft_and_hard_pruning_time_delta: Duration::ZERO, - pruned_batch_chunk_size: 5, - next_iterations_delay: Duration::ZERO, - minimum_l1_batch_age: Duration::ZERO, - }, - pool.clone(), - vec![nothing_prunable_check], - ); - - pruner.run_single_iteration().await.unwrap(); - - assert_eq!( - PruningInfo { - last_soft_pruned_l1_batch: Some(L1BatchNumber(2)), - last_soft_pruned_l2_block: Some(L2BlockNumber(5)), - last_hard_pruned_l1_batch: Some(L1BatchNumber(2)), - last_hard_pruned_l2_block: Some(L2BlockNumber(5)), - }, - conn.pruning_dal().get_pruning_info().await.unwrap() - ); - } - #[test(tokio::test)] - async fn pruner_should_catch_up_with_hard_pruning_up_to_soft_pruning_boundary_ignoring_chunk_size( - ) { - let pool = ConnectionPool::::test_pool().await; - let mut conn = pool.connection().await.unwrap(); - - insert_l2_blocks(&mut conn, 10, 2).await; - conn.pruning_dal() - .soft_prune_batches_range(L1BatchNumber(2), 
L2BlockNumber(5)) - .await - .unwrap(); - let pruner = DbPruner::with_conditions( - DbPrunerConfig { - soft_and_hard_pruning_time_delta: Duration::ZERO, - pruned_batch_chunk_size: 5, - next_iterations_delay: Duration::ZERO, - minimum_l1_batch_age: Duration::ZERO, - }, - pool.clone(), - vec![], //No checks, so every batch is prunable - ); - - pruner.run_single_iteration().await.unwrap(); - - assert_eq!( - PruningInfo { - last_soft_pruned_l1_batch: Some(L1BatchNumber(2)), - last_soft_pruned_l2_block: Some(L2BlockNumber(5)), - last_hard_pruned_l1_batch: Some(L1BatchNumber(2)), - last_hard_pruned_l2_block: Some(L2BlockNumber(5)), - }, - conn.pruning_dal().get_pruning_info().await.unwrap() - ); - - pruner.run_single_iteration().await.unwrap(); - assert_eq!( - PruningInfo { - last_soft_pruned_l1_batch: Some(L1BatchNumber(7)), - last_soft_pruned_l2_block: Some(L2BlockNumber(15)), - last_hard_pruned_l1_batch: Some(L1BatchNumber(7)), - last_hard_pruned_l2_block: Some(L2BlockNumber(15)), - }, - conn.pruning_dal().get_pruning_info().await.unwrap() - ); - } - - #[test(tokio::test)] - async fn unconstrained_pruner_with_fresh_database() { - let pool = ConnectionPool::::test_pool().await; - let mut conn = pool.connection().await.unwrap(); - - insert_l2_blocks(&mut conn, 10, 2).await; - - let pruner = DbPruner::with_conditions( - DbPrunerConfig { - soft_and_hard_pruning_time_delta: Duration::ZERO, - pruned_batch_chunk_size: 3, - next_iterations_delay: Duration::ZERO, - minimum_l1_batch_age: Duration::ZERO, - }, - pool.clone(), - vec![], //No checks, so every batch is prunable - ); - - pruner.run_single_iteration().await.unwrap(); - - assert_eq!( - PruningInfo { - last_soft_pruned_l1_batch: Some(L1BatchNumber(3)), - last_soft_pruned_l2_block: Some(L2BlockNumber(7)), - last_hard_pruned_l1_batch: Some(L1BatchNumber(3)), - last_hard_pruned_l2_block: Some(L2BlockNumber(7)), - }, - conn.pruning_dal().get_pruning_info().await.unwrap() - ); - - 
pruner.run_single_iteration().await.unwrap(); - assert_eq!( - PruningInfo { - last_soft_pruned_l1_batch: Some(L1BatchNumber(6)), - last_soft_pruned_l2_block: Some(L2BlockNumber(13)), - last_hard_pruned_l1_batch: Some(L1BatchNumber(6)), - last_hard_pruned_l2_block: Some(L2BlockNumber(13)), - }, - conn.pruning_dal().get_pruning_info().await.unwrap() - ); - } - - #[test(tokio::test)] - async fn pruning_blocked_after_first_chunk() { - let pool = ConnectionPool::::test_pool().await; - let mut conn = pool.connection().await.unwrap(); - - insert_l2_blocks(&mut conn, 10, 2).await; - - let first_chunk_prunable_check = Arc::new( - ConditionMock::name("first chunk prunable").with_response(L1BatchNumber(3), true), - ); - - let pruner = DbPruner::with_conditions( - DbPrunerConfig { - soft_and_hard_pruning_time_delta: Duration::ZERO, - pruned_batch_chunk_size: 3, - next_iterations_delay: Duration::ZERO, - minimum_l1_batch_age: Duration::ZERO, - }, - pool.clone(), - vec![first_chunk_prunable_check], - ); - - pruner.run_single_iteration().await.unwrap(); - - assert_eq!( - PruningInfo { - last_soft_pruned_l1_batch: Some(L1BatchNumber(3)), - last_soft_pruned_l2_block: Some(L2BlockNumber(7)), - last_hard_pruned_l1_batch: Some(L1BatchNumber(3)), - last_hard_pruned_l2_block: Some(L2BlockNumber(7)), - }, - conn.pruning_dal().get_pruning_info().await.unwrap() - ); - - pruner.run_single_iteration().await.unwrap(); - // pruning shouldn't have progressed as chunk 6 cannot be pruned - assert_eq!( - PruningInfo { - last_soft_pruned_l1_batch: Some(L1BatchNumber(3)), - last_soft_pruned_l2_block: Some(L2BlockNumber(7)), - last_hard_pruned_l1_batch: Some(L1BatchNumber(3)), - last_hard_pruned_l2_block: Some(L2BlockNumber(7)), - }, - conn.pruning_dal().get_pruning_info().await.unwrap() - ); - } -} diff --git a/core/lib/zksync_core/src/db_pruner/tests.rs b/core/lib/zksync_core/src/db_pruner/tests.rs new file mode 100644 index 000000000000..a7f09103254c --- /dev/null +++ 
b/core/lib/zksync_core/src/db_pruner/tests.rs @@ -0,0 +1,327 @@ +use std::collections::HashMap; + +use assert_matches::assert_matches; +use multivm::zk_evm_latest::ethereum_types::H256; +use test_log::test; +use zksync_dal::pruning_dal::PruningInfo; +use zksync_db_connection::connection::Connection; +use zksync_health_check::CheckHealth; +use zksync_types::{block::L2BlockHeader, Address, L2BlockNumber, ProtocolVersion}; + +use super::*; + +#[derive(Debug)] +struct ConditionMock { + pub name: &'static str, + pub is_batch_prunable_responses: HashMap, +} + +impl ConditionMock { + fn name(name: &'static str) -> ConditionMock { + Self { + name, + is_batch_prunable_responses: HashMap::default(), + } + } + + fn with_response(mut self, l1_batch_number: L1BatchNumber, value: bool) -> Self { + self.is_batch_prunable_responses + .insert(l1_batch_number, value); + self + } +} + +impl fmt::Display for ConditionMock { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(formatter, "{}", self.name) + } +} + +#[async_trait] +impl PruneCondition for ConditionMock { + async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result { + self.is_batch_prunable_responses + .get(&l1_batch_number) + .cloned() + .context("error!") + } +} + +#[test(tokio::test)] +async fn is_l1_batch_prunable_works() { + let failing_check = Arc::new( + ConditionMock::name("some failing some passing1") + .with_response(L1BatchNumber(1), true) + .with_response(L1BatchNumber(2), true) + .with_response(L1BatchNumber(3), false) + .with_response(L1BatchNumber(4), true), + ); + let other_failing_check = Arc::new( + ConditionMock::name("some failing some passing2") + .with_response(L1BatchNumber(2), false) + .with_response(L1BatchNumber(3), true) + .with_response(L1BatchNumber(4), true), + ); + let pruner = DbPruner::with_conditions( + DbPrunerConfig { + removal_delay: Duration::ZERO, + pruned_batch_chunk_size: 1, + minimum_l1_batch_age: Duration::ZERO, + }, + 
ConnectionPool::test_pool().await, + vec![failing_check, other_failing_check], + ); + // first check succeeds, but second returns an error + assert!(!pruner.is_l1_batch_prunable(L1BatchNumber(1)).await); + // second check fails + assert!(!pruner.is_l1_batch_prunable(L1BatchNumber(2)).await); + // first check fails + assert!(!pruner.is_l1_batch_prunable(L1BatchNumber(3)).await); + + assert!(pruner.is_l1_batch_prunable(L1BatchNumber(4)).await); +} + +async fn insert_l2_blocks( + conn: &mut Connection<'_, Core>, + l1_batches_count: u64, + l2_blocks_per_batch: u64, +) { + conn.protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); + + for l1_batch_number in 0..l1_batches_count { + for l2_block_index in 0..l2_blocks_per_batch { + let l2_block_number = + L2BlockNumber((l1_batch_number * l2_blocks_per_batch + l2_block_index) as u32); + let l2_block_header = L2BlockHeader { + number: l2_block_number, + timestamp: 0, + hash: H256::from_low_u64_be(u64::from(l2_block_number.0)), + l1_tx_count: 0, + l2_tx_count: 0, + fee_account_address: Address::repeat_byte(1), + base_fee_per_gas: 0, + gas_per_pubdata_limit: 0, + batch_fee_input: Default::default(), + base_system_contracts_hashes: Default::default(), + protocol_version: Some(Default::default()), + virtual_blocks: 0, + gas_limit: 0, + }; + + conn.blocks_dal() + .insert_l2_block(&l2_block_header) + .await + .unwrap(); + conn.blocks_dal() + .mark_l2_blocks_as_executed_in_l1_batch(L1BatchNumber(l1_batch_number as u32)) + .await + .unwrap(); + } + } +} + +#[test(tokio::test)] +async fn hard_pruning_ignores_conditions_checks() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + + insert_l2_blocks(&mut conn, 10, 2).await; + conn.pruning_dal() + .soft_prune_batches_range(L1BatchNumber(2), L2BlockNumber(5)) + .await + .unwrap(); + + let nothing_prunable_check = Arc::new(ConditionMock::name("nothing prunable")); + let pruner = 
DbPruner::with_conditions( + DbPrunerConfig { + removal_delay: Duration::ZERO, + pruned_batch_chunk_size: 5, + minimum_l1_batch_age: Duration::ZERO, + }, + pool.clone(), + vec![nothing_prunable_check], + ); + let health_check = pruner.health_check(); + + pruner.run_single_iteration().await.unwrap(); + + assert_eq!( + PruningInfo { + last_soft_pruned_l1_batch: Some(L1BatchNumber(2)), + last_soft_pruned_l2_block: Some(L2BlockNumber(5)), + last_hard_pruned_l1_batch: Some(L1BatchNumber(2)), + last_hard_pruned_l2_block: Some(L2BlockNumber(5)), + }, + conn.pruning_dal().get_pruning_info().await.unwrap() + ); + let health = health_check.check_health().await; + assert_matches!(health.status(), HealthStatus::Ready); +} +#[test(tokio::test)] +async fn pruner_catches_up_with_hard_pruning_up_to_soft_pruning_boundary_ignoring_chunk_size() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + insert_l2_blocks(&mut conn, 10, 2).await; + conn.pruning_dal() + .soft_prune_batches_range(L1BatchNumber(2), L2BlockNumber(5)) + .await + .unwrap(); + + let pruner = DbPruner::with_conditions( + DbPrunerConfig { + removal_delay: Duration::ZERO, + pruned_batch_chunk_size: 5, + minimum_l1_batch_age: Duration::ZERO, + }, + pool.clone(), + vec![], //No checks, so every batch is prunable + ); + + pruner.run_single_iteration().await.unwrap(); + + assert_eq!( + PruningInfo { + last_soft_pruned_l1_batch: Some(L1BatchNumber(2)), + last_soft_pruned_l2_block: Some(L2BlockNumber(5)), + last_hard_pruned_l1_batch: Some(L1BatchNumber(2)), + last_hard_pruned_l2_block: Some(L2BlockNumber(5)), + }, + conn.pruning_dal().get_pruning_info().await.unwrap() + ); + + pruner.run_single_iteration().await.unwrap(); + assert_eq!( + PruningInfo { + last_soft_pruned_l1_batch: Some(L1BatchNumber(7)), + last_soft_pruned_l2_block: Some(L2BlockNumber(15)), + last_hard_pruned_l1_batch: Some(L1BatchNumber(7)), + last_hard_pruned_l2_block: Some(L2BlockNumber(15)), + }, + 
conn.pruning_dal().get_pruning_info().await.unwrap() + ); +} + +#[test(tokio::test)] +async fn unconstrained_pruner_with_fresh_database() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + + insert_l2_blocks(&mut conn, 10, 2).await; + + let pruner = DbPruner::with_conditions( + DbPrunerConfig { + removal_delay: Duration::ZERO, + pruned_batch_chunk_size: 3, + minimum_l1_batch_age: Duration::ZERO, + }, + pool.clone(), + vec![], //No checks, so every batch is prunable + ); + + pruner.run_single_iteration().await.unwrap(); + + assert_eq!( + PruningInfo { + last_soft_pruned_l1_batch: Some(L1BatchNumber(3)), + last_soft_pruned_l2_block: Some(L2BlockNumber(7)), + last_hard_pruned_l1_batch: Some(L1BatchNumber(3)), + last_hard_pruned_l2_block: Some(L2BlockNumber(7)), + }, + conn.pruning_dal().get_pruning_info().await.unwrap() + ); + + pruner.run_single_iteration().await.unwrap(); + assert_eq!( + PruningInfo { + last_soft_pruned_l1_batch: Some(L1BatchNumber(6)), + last_soft_pruned_l2_block: Some(L2BlockNumber(13)), + last_hard_pruned_l1_batch: Some(L1BatchNumber(6)), + last_hard_pruned_l2_block: Some(L2BlockNumber(13)), + }, + conn.pruning_dal().get_pruning_info().await.unwrap() + ); +} + +#[test(tokio::test)] +async fn pruning_blocked_after_first_chunk() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + insert_l2_blocks(&mut conn, 10, 2).await; + + let first_chunk_prunable_check = + Arc::new(ConditionMock::name("first chunk prunable").with_response(L1BatchNumber(3), true)); + + let pruner = DbPruner::with_conditions( + DbPrunerConfig { + removal_delay: Duration::ZERO, + pruned_batch_chunk_size: 3, + minimum_l1_batch_age: Duration::ZERO, + }, + pool.clone(), + vec![first_chunk_prunable_check], + ); + pruner.run_single_iteration().await.unwrap(); + + assert_eq!( + PruningInfo { + last_soft_pruned_l1_batch: Some(L1BatchNumber(3)), + last_soft_pruned_l2_block: 
Some(L2BlockNumber(7)), + last_hard_pruned_l1_batch: Some(L1BatchNumber(3)), + last_hard_pruned_l2_block: Some(L2BlockNumber(7)), + }, + conn.pruning_dal().get_pruning_info().await.unwrap() + ); + + pruner.run_single_iteration().await.unwrap(); + // pruning shouldn't have progressed as chunk 6 cannot be pruned + assert_eq!( + PruningInfo { + last_soft_pruned_l1_batch: Some(L1BatchNumber(3)), + last_soft_pruned_l2_block: Some(L2BlockNumber(7)), + last_hard_pruned_l1_batch: Some(L1BatchNumber(3)), + last_hard_pruned_l2_block: Some(L2BlockNumber(7)), + }, + conn.pruning_dal().get_pruning_info().await.unwrap() + ); +} + +#[tokio::test] +async fn pruner_is_resistant_to_errors() { + let pool = ConnectionPool::::test_pool().await; + + // This condition returns `true` despite the batch not present in Postgres. + let erroneous_condition = + Arc::new(ConditionMock::name("always returns true").with_response(L1BatchNumber(3), true)); + + let pruner = DbPruner::with_conditions( + DbPrunerConfig { + removal_delay: Duration::ZERO, + pruned_batch_chunk_size: 3, + minimum_l1_batch_age: Duration::ZERO, + }, + pool.clone(), + vec![erroneous_condition], + ); + pruner.run_single_iteration().await.unwrap_err(); + + let mut health_check = pruner.health_check(); + let (stop_sender, stop_receiver) = watch::channel(false); + let pruner_task_handle = tokio::spawn(pruner.run(stop_receiver)); + + let health = health_check + .wait_for(|health| matches!(health.status(), HealthStatus::Affected)) + .await; + let health_details = health.details().unwrap(); + let error = health_details["error"].as_str().unwrap(); + // Matching error messages is an anti-pattern, but we essentially test UX here. 
+ assert!( + error.contains("L1 batch #3 is ready to be pruned, but has no L2 blocks"), + "{error}" + ); + + stop_sender.send_replace(true); + pruner_task_handle.await.unwrap().unwrap(); +} diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index dd9c3bff9833..a51556cb54d1 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -1035,7 +1035,7 @@ async fn run_tree( } let tree_health_check = metadata_calculator.tree_health_check(); - app_health.insert_component(tree_health_check)?; + app_health.insert_custom_component(Arc::new(tree_health_check))?; let tree_task = tokio::spawn(metadata_calculator.run(stop_receiver)); task_futures.push(tree_task); diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index a00e3ee5535b..997c40a5ab9a 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -5,23 +5,27 @@ use std::{ future, future::Future, path::Path, + sync::Arc, time::Duration, }; use anyhow::Context as _; +use async_trait::async_trait; +use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; #[cfg(test)] use tokio::sync::mpsc; use tokio::sync::watch; use zksync_config::configs::database::MerkleTreeMode; use zksync_dal::{Connection, Core, CoreDal}; -use zksync_health_check::{Health, HealthStatus}; +use zksync_health_check::{CheckHealth, Health, HealthStatus, ReactiveHealthCheck}; use zksync_merkle_tree::{ domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader}, recovery::MerkleTreeRecovery, - Database, Key, NoVersionError, RocksDBWrapper, TreeEntry, TreeEntryWithProof, TreeInstruction, + Database, Key, MerkleTreeColumnFamily, NoVersionError, RocksDBWrapper, TreeEntry, + TreeEntryWithProof, TreeInstruction, }; -use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries}; +use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries, 
WeakRocksDB}; use zksync_types::{block::L1BatchHeader, L1BatchNumber, StorageKey, H256}; use super::{ @@ -64,9 +68,59 @@ impl From for Health { } } -impl From for Health { - fn from(info: MerkleTreeInfo) -> Self { - Self::from(HealthStatus::Ready).with_details(MerkleTreeHealth::MainLoop(info)) +/// Health check for the Merkle tree. +/// +/// [`ReactiveHealthCheck`] is not sufficient for the tree because in the main loop, tree info +/// can be updated by multiple tasks (the metadata calculator and the pruning task). Additionally, +/// keeping track of all places where the info is updated is error-prone. +#[derive(Debug)] +pub(super) struct MerkleTreeHealthCheck { + reactive_check: ReactiveHealthCheck, + weak_reader: Arc>, +} + +impl MerkleTreeHealthCheck { + pub fn new(reactive_check: ReactiveHealthCheck, reader: LazyAsyncTreeReader) -> Self { + // We must not retain a strong RocksDB ref in the health check because it will prevent + // proper node shutdown (which waits until all RocksDB instances are dropped); health checks + // are dropped after all components are terminated. 
+ let weak_reader = Arc::>::default(); + let weak_reader_for_task = weak_reader.clone(); + tokio::spawn(async move { + weak_reader_for_task + .set(reader.wait().await.downgrade()) + .ok(); + }); + + Self { + reactive_check, + weak_reader, + } + } +} + +#[async_trait] +impl CheckHealth for MerkleTreeHealthCheck { + fn name(&self) -> &'static str { + "tree" + } + + async fn check_health(&self) -> Health { + let health = self.reactive_check.check_health().await; + if !matches!(health.status(), HealthStatus::Ready) { + return health; + } + + if let Some(reader) = self + .weak_reader + .get() + .and_then(WeakAsyncTreeReader::upgrade) + { + let info = reader.info().await; + health.with_details(MerkleTreeHealth::MainLoop(info)) + } else { + health + } } } @@ -236,6 +290,13 @@ pub struct AsyncTreeReader { } impl AsyncTreeReader { + fn downgrade(&self) -> WeakAsyncTreeReader { + WeakAsyncTreeReader { + db: self.inner.db().clone().into_inner().downgrade(), + mode: self.mode, + } + } + pub async fn info(self) -> MerkleTreeInfo { tokio::task::spawn_blocking(move || MerkleTreeInfo { mode: self.mode, @@ -267,6 +328,22 @@ impl AsyncTreeReader { } } +/// Version of async tree reader that holds a weak reference to RocksDB. Used in [`MerkleTreeHealthCheck`]. +#[derive(Debug)] +struct WeakAsyncTreeReader { + db: WeakRocksDB, + mode: MerkleTreeMode, +} + +impl WeakAsyncTreeReader { + fn upgrade(&self) -> Option { + Some(AsyncTreeReader { + inner: ZkSyncTreeReader::new(self.db.upgrade()?.into()), + mode: self.mode, + }) + } +} + /// Lazily initialized [`AsyncTreeReader`]. 
#[derive(Debug)] pub struct LazyAsyncTreeReader(pub(super) watch::Receiver>); diff --git a/core/lib/zksync_core/src/metadata_calculator/mod.rs b/core/lib/zksync_core/src/metadata_calculator/mod.rs index 14c130da4b54..04d62e171f5d 100644 --- a/core/lib/zksync_core/src/metadata_calculator/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/mod.rs @@ -14,13 +14,13 @@ use zksync_config::configs::{ database::{MerkleTreeConfig, MerkleTreeMode}, }; use zksync_dal::{ConnectionPool, Core}; -use zksync_health_check::{HealthUpdater, ReactiveHealthCheck}; +use zksync_health_check::{CheckHealth, HealthUpdater, ReactiveHealthCheck}; use zksync_object_store::ObjectStore; pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo}; pub use self::{helpers::LazyAsyncTreeReader, pruning::MerkleTreePruningTask}; use self::{ - helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth}, + helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth, MerkleTreeHealthCheck}, metrics::{ConfigLabels, METRICS}, pruning::PruningHandles, updater::TreeUpdater, @@ -148,8 +148,8 @@ impl MetadataCalculator { } /// Returns a health check for this calculator. - pub fn tree_health_check(&self) -> ReactiveHealthCheck { - self.health_updater.subscribe() + pub fn tree_health_check(&self) -> impl CheckHealth { + MerkleTreeHealthCheck::new(self.health_updater.subscribe(), self.tree_reader()) } /// Returns a reference to the tree reader. 
@@ -199,20 +199,20 @@ impl MetadataCalculator { let Some(mut tree) = tree else { return Ok(()); // recovery was aborted because a stop signal was received }; - let tree_reader = tree.reader(); - tracing::info!( - "Merkle tree is initialized and ready to process L1 batches: {:?}", - tree_reader.clone().info().await - ); + let tree_reader = tree.reader(); + let tree_info = tree_reader.clone().info().await; if !self.pruning_handles_sender.is_closed() { self.pruning_handles_sender.send(tree.pruner()).ok(); } self.tree_reader.send_replace(Some(tree_reader)); + tracing::info!("Merkle tree is initialized and ready to process L1 batches: {tree_info:?}"); + self.health_updater + .update(MerkleTreeHealth::MainLoop(tree_info).into()); let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store); updater - .loop_updating_tree(self.delayer, &self.pool, stop_receiver, self.health_updater) + .loop_updating_tree(self.delayer, &self.pool, stop_receiver) .await } } diff --git a/core/lib/zksync_core/src/metadata_calculator/pruning.rs b/core/lib/zksync_core/src/metadata_calculator/pruning.rs index 9346a5be2c86..42f615de7728 100644 --- a/core/lib/zksync_core/src/metadata_calculator/pruning.rs +++ b/core/lib/zksync_core/src/metadata_calculator/pruning.rs @@ -3,18 +3,46 @@ use std::time::Duration; use anyhow::Context as _; +use serde::Serialize; use tokio::sync::{oneshot, watch}; use zksync_dal::{ConnectionPool, Core, CoreDal}; +use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_merkle_tree::{MerkleTreePruner, MerkleTreePrunerHandle, RocksDBWrapper}; +use zksync_types::L1BatchNumber; pub(super) type PruningHandles = (MerkleTreePruner<RocksDBWrapper>, MerkleTreePrunerHandle); +#[derive(Debug, Serialize)] +#[serde(tag = "stage", rename_all = "snake_case")] +enum MerkleTreePruningTaskHealth { + Initialization, + Pruning { + #[serde(skip_serializing_if = "Option::is_none")] + target_retained_l1_batch_number: Option<L1BatchNumber>, + }, + PruningStopped, 
+ ShuttingDown, +} + +impl From<MerkleTreePruningTaskHealth> for Health { + fn from(health: MerkleTreePruningTaskHealth) -> Self { + let status = match &health { + MerkleTreePruningTaskHealth::Initialization + | MerkleTreePruningTaskHealth::PruningStopped => HealthStatus::Affected, + MerkleTreePruningTaskHealth::Pruning { .. } => HealthStatus::Ready, + MerkleTreePruningTaskHealth::ShuttingDown => HealthStatus::ShuttingDown, + }; + Health::from(status).with_details(health) + } +} + /// Task performing Merkle tree pruning according to the pruning entries in Postgres. #[derive(Debug)] #[must_use = "Task should `run()` in a managed Tokio task"] pub struct MerkleTreePruningTask { handles: oneshot::Receiver<PruningHandles>, pool: ConnectionPool<Core>, + health_updater: HealthUpdater, poll_interval: Duration, } @@ -27,11 +55,20 @@ impl MerkleTreePruningTask { Self { handles, pool, + health_updater: ReactiveHealthCheck::new("tree_pruner").1, poll_interval, } } + pub fn health_check(&self) -> ReactiveHealthCheck { + self.health_updater.subscribe() + } + pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> { + // The pruning task is "affected" (not functioning) until the Merkle tree is initialized. + self.health_updater + .update(MerkleTreePruningTaskHealth::Initialization.into()); + let (mut pruner, pruner_handle); tokio::select! 
{ res = self.handles => { @@ -48,6 +85,10 @@ impl MerkleTreePruningTask { return Ok(()); } } + let health = MerkleTreePruningTaskHealth::Pruning { + target_retained_l1_batch_number: None, + }; + self.health_updater.update(health.into()); tracing::info!("Obtained pruning handles; starting Merkle tree pruning"); // Pruner is not allocated a managed task because it is blocking; its cancellation awareness inherently @@ -61,16 +102,24 @@ impl MerkleTreePruningTask { drop(storage); if let Some(l1_batch_number) = pruning_info.last_hard_pruned_l1_batch { - let target_retained_version = u64::from(l1_batch_number.0) + 1; + let target_retained_l1_batch_number = l1_batch_number + 1; + let target_retained_version = u64::from(target_retained_l1_batch_number.0); let Ok(prev_target_version) = pruner_handle.set_target_retained_version(target_retained_version) else { + self.health_updater + .update(MerkleTreePruningTaskHealth::PruningStopped.into()); tracing::error!("Merkle tree pruning thread unexpectedly stopped"); return pruner_task_handle .await .context("Merkle tree pruning thread panicked"); }; + if prev_target_version != target_retained_version { + let health = MerkleTreePruningTaskHealth::Pruning { + target_retained_l1_batch_number: Some(target_retained_l1_batch_number), + }; + self.health_updater.update(health.into()); tracing::info!("Set target retained tree version from {prev_target_version} to {target_retained_version}"); } } @@ -83,6 +132,8 @@ impl MerkleTreePruningTask { } } + self.health_updater + .update(MerkleTreePruningTaskHealth::ShuttingDown.into()); tracing::info!("Stop signal received, Merkle tree pruning is shutting down"); drop(pruner_handle); pruner_task_handle @@ -94,6 +145,7 @@ impl MerkleTreePruningTask { #[cfg(test)] mod tests { use tempfile::TempDir; + use test_casing::test_casing; use zksync_types::{L1BatchNumber, L2BlockNumber}; use super::*; @@ -124,10 +176,14 @@ mod tests { .unwrap(); let reader = calculator.tree_reader(); let pruning_task = 
calculator.pruning_task(POLL_INTERVAL); + let mut health_check = pruning_task.health_check(); let (stop_sender, stop_receiver) = watch::channel(false); let calculator_handle = tokio::spawn(calculator.run(stop_receiver.clone())); let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver)); + health_check + .wait_for(|health| matches!(health.status(), HealthStatus::Ready)) + .await; // Wait until the calculator is initialized. let reader = reader.wait().await; while reader.clone().info().await.next_l1_batch_number < L1BatchNumber(6) { @@ -150,6 +206,55 @@ mod tests { stop_sender.send_replace(true); calculator_handle.await.unwrap().unwrap(); pruning_task_handle.await.unwrap().unwrap(); + health_check + .wait_for(|health| matches!(health.status(), HealthStatus::ShutDown)) + .await; + } + + #[derive(Debug)] + enum PrematureExitScenario { + CalculatorDrop, + StopSignal, + } + + impl PrematureExitScenario { + const ALL: [Self; 2] = [Self::CalculatorDrop, Self::StopSignal]; + } + + #[test_casing(2, PrematureExitScenario::ALL)] + #[tokio::test] + async fn pruning_task_premature_exit(scenario: PrematureExitScenario) { + let pool = ConnectionPool::<Core>::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let config = mock_config(temp_dir.path()); + let mut storage = pool.connection().await.unwrap(); + insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + let mut calculator = MetadataCalculator::new(config, None, pool.clone()) + .await + .unwrap(); + let pruning_task = calculator.pruning_task(POLL_INTERVAL); + let mut health_check = pruning_task.health_check(); + let (stop_sender, stop_receiver) = watch::channel(false); + let pruning_task_handle = tokio::spawn(pruning_task.run(stop_receiver)); + + // Task health should be set to "affected" until `calculator` is started (which is never). 
+ health_check + .wait_for(|health| matches!(health.status(), HealthStatus::Affected)) + .await; + + match scenario { + PrematureExitScenario::CalculatorDrop => drop(calculator), + PrematureExitScenario::StopSignal => { + stop_sender.send_replace(true); + } + } + health_check + .wait_for(|health| matches!(health.status(), HealthStatus::ShutDown)) + .await; + pruning_task_handle.await.unwrap().unwrap(); } #[tokio::test] diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index 35acf532a6a0..3f602d5e6258 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -15,6 +15,7 @@ use zksync_health_check::{CheckHealth, HealthStatus}; use zksync_merkle_tree::domain::ZkSyncTree; use zksync_object_store::{ObjectStore, ObjectStoreFactory}; use zksync_prover_interface::inputs::PrepareBasicCircuitsJob; +use zksync_storage::RocksDB; use zksync_types::{ block::L1BatchHeader, AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey, StorageLog, H256, @@ -166,6 +167,11 @@ async fn status_receiver_has_correct_states() { other_tree_health_check.check_health().await.status(), HealthStatus::ShutDown ); + + // Check that health checks don't prevent dropping RocksDB instances. 
+ tokio::task::spawn_blocking(RocksDB::await_rocksdb_termination) + .await + .unwrap(); } #[tokio::test] diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index fe039b15132f..1b853557558e 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -6,7 +6,6 @@ use anyhow::Context as _; use futures::{future, FutureExt}; use tokio::sync::watch; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; -use zksync_health_check::HealthUpdater; use zksync_merkle_tree::domain::TreeMetadata; use zksync_object_store::ObjectStore; use zksync_types::{ @@ -205,7 +204,6 @@ impl TreeUpdater { delayer: Delayer, pool: &ConnectionPool<Core>, mut stop_receiver: watch::Receiver<bool>, - health_updater: HealthUpdater, ) -> anyhow::Result<()> { let Some(earliest_l1_batch) = wait_for_l1_batch(pool, delayer.delay_interval(), &mut stop_receiver).await? @@ -245,8 +243,6 @@ impl TreeUpdater { last L1 batch with metadata: {last_l1_batch_with_metadata:?}", max_batches_per_iter = self.max_l1_batches_per_iter ); - let tree_info = tree.reader().info().await; - health_updater.update(tree_info.into()); // It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after // recovering from a snapshot. 
We cannot wait for such a batch to appear (*this* is the component @@ -272,9 +268,6 @@ impl TreeUpdater { tree.save().await?; next_l1_batch_to_seal = tree.next_l1_batch_number(); tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); - - let tree_info = tree.reader().info().await; - health_updater.update(tree_info.into()); } } @@ -294,9 +287,6 @@ impl TreeUpdater { ); delayer.wait(&self.tree).left_future() } else { - let tree_info = self.tree.reader().info().await; - health_updater.update(tree_info.into()); - tracing::trace!( "Metadata calculator (next L1 batch: #{next_l1_batch_to_seal}) made progress from #{snapshot}" ); @@ -313,7 +303,6 @@ impl TreeUpdater { () = delay => { /* The delay has passed */ } } } - drop(health_updater); // Explicitly mark where the updater should be dropped Ok(()) } } diff --git a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs index 8746af995824..c0f71b3514b4 100644 --- a/core/node/node_framework/src/implementations/layers/metadata_calculator.rs +++ b/core/node/node_framework/src/implementations/layers/metadata_calculator.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Context as _; use zksync_core::metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig}; use zksync_storage::RocksDB; @@ -63,7 +65,7 @@ impl WiringLayer for MetadataCalculatorLayer { let AppHealthCheckResource(app_health) = context.get_resource_or_default().await; app_health - .insert_component(metadata_calculator.tree_health_check()) + .insert_custom_component(Arc::new(metadata_calculator.tree_health_check())) .map_err(WiringError::internal)?; let task = Box::new(MetadataCalculatorTask { diff --git a/core/tests/revert-test/tests/tester.ts b/core/tests/revert-test/tests/tester.ts index 1590c64a7be9..246af0280aeb 100644 --- a/core/tests/revert-test/tests/tester.ts +++ b/core/tests/revert-test/tests/tester.ts @@ -66,7 +66,7 @@ 
export class Tester { } /// Ensures that the main wallet has enough base token. - /// This can not be done inside the `init` function becasue `init` function can be called before the + /// This can not be done inside the `init` function because `init` function can be called before the /// L2 RPC is active, but we need the L2 RPC to get the base token address. async fundSyncWallet() { const baseTokenAddress = await this.syncWallet.provider.getBaseTokenContractAddress(); diff --git a/core/tests/snapshot-recovery-test/tests/snapshot-recovery.test.ts b/core/tests/snapshot-recovery-test/tests/snapshot-recovery.test.ts index 80f586ec58d7..4bb74c047c4a 100644 --- a/core/tests/snapshot-recovery-test/tests/snapshot-recovery.test.ts +++ b/core/tests/snapshot-recovery-test/tests/snapshot-recovery.test.ts @@ -7,6 +7,7 @@ import fs, { FileHandle } from 'node:fs/promises'; import { ChildProcess, spawn, exec } from 'node:child_process'; import path from 'node:path'; import { promisify } from 'node:util'; +import * as zksync from 'zksync-ethers'; interface AllSnapshotsResponse { readonly snapshotsL1BatchNumbers: number[]; @@ -62,15 +63,29 @@ interface ReorgDetectorDetails { readonly last_correct_l2_block?: number; } +interface TreeDetails { + readonly min_l1_batch_number?: number | null; +} + +interface DbPrunerDetails { + readonly last_soft_pruned_l1_batch?: number; + readonly last_hard_pruned_l1_batch?: number; +} + interface HealthCheckResponse { readonly components: { snapshot_recovery?: Health; consistency_checker?: Health; reorg_detector?: Health; + tree?: Health; + db_pruner?: Health; + tree_pruner?: Health<{}>; }; } /** + * Tests snapshot recovery and node state pruning. + * * Assumptions: * * - Main node is run for the duration of the test. @@ -79,6 +94,8 @@ interface HealthCheckResponse { */ describe('snapshot recovery', () => { const STORAGE_LOG_SAMPLE_PROBABILITY = 0.1; + // Number of L1 batches awaited to be pruned. 
+ const PRUNED_BATCH_COUNT = 1; const homeDir = process.env.ZKSYNC_HOME!!; @@ -87,7 +104,7 @@ describe('snapshot recovery', () => { (process.env.DEPLOYMENT_MODE === 'Validium' ? '-validium' : '') + (process.env.IN_DOCKER ? '-docker' : ''); console.log('Using external node env profile', externalNodeEnvProfile); - const externalNodeEnv = { + let externalNodeEnv: { [key: string]: string } = { ...process.env, ZKSYNC_ENV: externalNodeEnvProfile, EN_SNAPSHOTS_RECOVERY_ENABLED: 'true' @@ -99,7 +116,9 @@ describe('snapshot recovery', () => { let externalNodeLogs: FileHandle; let externalNodeProcess: ChildProcess; - before(async () => { + let fundedWallet: zkweb3.Wallet; + + before('prepare environment', async () => { expect(process.env.ZKSYNC_ENV, '`ZKSYNC_ENV` should not be set to allow running both server and EN components') .to.be.undefined; mainNode = new zkweb3.Provider('http://127.0.0.1:3050'); @@ -107,6 +126,13 @@ describe('snapshot recovery', () => { await killExternalNode(); }); + before('create test wallet', async () => { + const testConfigPath = path.join(process.env.ZKSYNC_HOME!, `etc/test_config/constant/eth.json`); + const ethTestConfig = JSON.parse(await fs.readFile(testConfigPath, { encoding: 'utf-8' })); + const mnemonic = ethTestConfig.test_mnemonic as string; + fundedWallet = zkweb3.Wallet.fromMnemonic(mnemonic, "m/44'/60'/0'/0/0").connect(mainNode); + }); + after(async () => { if (externalNodeProcess) { externalNodeProcess.kill(); @@ -221,13 +247,7 @@ describe('snapshot recovery', () => { step('initialize external node', async () => { externalNodeLogs = await fs.open('snapshot-recovery.log', 'w'); - - const enableConsensus = process.env.ENABLE_CONSENSUS === 'true'; - let args = ['external-node', '--']; - if (enableConsensus) { - args.push('--enable-consensus'); - } - externalNodeProcess = spawn('zk', args, { + externalNodeProcess = spawn('zk', externalNodeArgs(), { cwd: homeDir, stdio: [null, externalNodeLogs.fd, externalNodeLogs.fd], shell: true, @@ 
-326,6 +346,99 @@ describe('snapshot recovery', () => { expect(mainNodeTokens).to.deep.equal(externalNodeTokens); }); + + step('restart EN', async () => { + console.log('Stopping external node'); + await stopExternalNode(); + await waitForProcess(externalNodeProcess); + + const pruningParams = { + EN_PRUNING_ENABLED: 'true', + EN_PRUNING_REMOVAL_DELAY_SEC: '1', + EN_PRUNING_DATA_RETENTION_SEC: '0', // immediately prune executed batches + EN_PRUNING_CHUNK_SIZE: '1' + }; + externalNodeEnv = { ...externalNodeEnv, ...pruningParams }; + console.log('Starting EN with pruning params', pruningParams); + externalNodeProcess = spawn('zk', externalNodeArgs(), { + cwd: homeDir, + stdio: [null, externalNodeLogs.fd, externalNodeLogs.fd], + shell: true, + env: externalNodeEnv + }); + + let isDbPrunerReady = false; + let isTreePrunerReady = false; + while (!isDbPrunerReady || !isTreePrunerReady) { + await sleep(1000); + const health = await getExternalNodeHealth(); + if (health === null) { + continue; + } + + if (!isDbPrunerReady) { + console.log('DB pruner health', health.components.db_pruner); + const status = health.components.db_pruner?.status; + expect(status).to.be.oneOf([undefined, 'not_ready', 'ready']); + isDbPrunerReady = status === 'ready'; + } + if (!isTreePrunerReady) { + console.log('Tree pruner health', health.components.tree_pruner); + const status = health.components.tree_pruner?.status; + expect(status).to.be.oneOf([undefined, 'not_ready', 'ready']); + isTreePrunerReady = status === 'ready'; + } + } + }); + + // The logic below works fine if there is other transaction activity on the test network; we still + // create *at least* `PRUNED_BATCH_COUNT + 1` L1 batches; thus, at least `PRUNED_BATCH_COUNT` of them + // should be pruned eventually. 
+ step(`generate ${PRUNED_BATCH_COUNT + 1} transactions`, async () => { + let pastL1BatchNumber = snapshotMetadata.l1BatchNumber; + for (let i = 0; i < PRUNED_BATCH_COUNT + 1; i++) { + const transactionResponse = await fundedWallet.transfer({ + to: fundedWallet.address, + amount: 1, + token: zksync.utils.ETH_ADDRESS + }); + console.log('Generated a transaction from funded wallet', transactionResponse); + const receipt = await transactionResponse.wait(); + console.log('Got finalized transaction receipt', receipt); + + // Wait until an L1 batch number with the transaction is sealed. + let newL1BatchNumber: number; + while ((newL1BatchNumber = await mainNode.getL1BatchNumber()) <= pastL1BatchNumber) { + await sleep(1000); + } + console.log(`Sealed L1 batch #${newL1BatchNumber}`); + pastL1BatchNumber = newL1BatchNumber; + } + }); + + step(`wait for pruning ${PRUNED_BATCH_COUNT} L1 batches`, async () => { + const expectedPrunedBatchNumber = snapshotMetadata.l1BatchNumber + PRUNED_BATCH_COUNT; + console.log(`Waiting for L1 batch #${expectedPrunedBatchNumber} to be pruned`); + let isDbPruned = false; + let isTreePruned = false; + + while (!isDbPruned || !isTreePruned) { + await sleep(1000); + const health = (await getExternalNodeHealth())!; + + const dbPrunerHealth = health.components.db_pruner!; + console.log('DB pruner health', dbPrunerHealth); + expect(dbPrunerHealth.status).to.be.equal('ready'); + isDbPruned = dbPrunerHealth.details!.last_hard_pruned_l1_batch! >= expectedPrunedBatchNumber; + + const treeHealth = health.components.tree!; + console.log('Tree health', treeHealth); + expect(treeHealth.status).to.be.equal('ready'); + const minTreeL1BatchNumber = treeHealth.details?.min_l1_batch_number; + // The batch number pruned from the tree is one less than `minTreeL1BatchNumber`. + isTreePruned = minTreeL1BatchNumber ? 
minTreeL1BatchNumber - 1 >= expectedPrunedBatchNumber : false; + } + }); }); async function waitForProcess(childProcess: ChildProcess) { @@ -379,6 +492,32 @@ async function getExternalNodeHealth() { } } +function externalNodeArgs() { + const enableConsensus = process.env.ENABLE_CONSENSUS === 'true'; + const args = ['external-node', '--']; + if (enableConsensus) { + args.push('--enable-consensus'); + } + return args; +} + +async function stopExternalNode() { + interface ChildProcessError extends Error { + readonly code: number | null; + } + + try { + await promisify(exec)('killall -q -INT zksync_external_node'); + } catch (err) { + const typedErr = err as ChildProcessError; + if (typedErr.code === 1) { + // No matching processes were found; this is fine. + } else { + throw err; + } + } +} + async function killExternalNode() { interface ChildProcessError extends Error { readonly code: number | null;