From 8ae924c5b568c91ff3aa101af703714b99f93465 Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Wed, 8 Feb 2023 11:49:49 +0100 Subject: [PATCH] [objects pruner] live pruner support (#7923) First iteration of live pruner. **Implementation**: `AuthorityStorePruner` is subscribed to updates from `CheckpointExecutor` Notes: * for now the same deletion method is used as in original pruner: `delete_range`. Eventually we may want to experiment with point-deletes. Starting with delete_range to address potential tombstone concern separately --- Cargo.lock | 1 + crates/sui-benchmark/Cargo.toml | 1 + crates/sui-benchmark/tests/simtest.rs | 40 ++++- .../data/fullnode-template-with-path.yaml | 1 + crates/sui-config/data/fullnode-template.yaml | 1 + crates/sui-config/src/node.rs | 8 +- crates/sui-config/src/swarm.rs | 11 +- ...ests__network_config_snapshot_matches.snap | 7 + .../sui-core/src/authority/authority_store.rs | 26 ++- .../src/authority/authority_store_pruner.rs | 170 ++++++++++++++---- .../src/authority/authority_store_tables.rs | 1 + .../checkpoints/checkpoint_executor/mod.rs | 98 +++++++--- crates/sui-node/src/lib.rs | 15 +- crates/sui/tests/full_node_tests.rs | 2 +- crates/test-utils/src/network.rs | 8 + 15 files changed, 316 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80d3d4b50ae5b..7d2f49e420c77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8488,6 +8488,7 @@ dependencies = [ "tokio-util 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", "tracing-subscriber 0.3.16", + "typed-store", "workspace-hack", ] diff --git a/crates/sui-benchmark/Cargo.toml b/crates/sui-benchmark/Cargo.toml index 91622d52271af..8d1469214d7c8 100644 --- a/crates/sui-benchmark/Cargo.toml +++ b/crates/sui-benchmark/Cargo.toml @@ -55,6 +55,7 @@ sysinfo = "0.27.5" [target.'cfg(msim)'.dependencies] sui-macros = { path = "../sui-macros" } sui-simulator = { path = "../sui-simulator" } +typed-store = { path = "../typed-store" } [features] benchmark = ["narwhal-node/benchmark"] diff --git a/crates/sui-benchmark/tests/simtest.rs b/crates/sui-benchmark/tests/simtest.rs index 091901eaf4c4a..8fe768d7490cf 100644 --- a/crates/sui-benchmark/tests/simtest.rs +++ b/crates/sui-benchmark/tests/simtest.rs @@ -4,6 +4,7 @@ #[cfg(msim)] mod test { + use itertools::Itertools; use rand::{thread_rng, Rng}; use std::str::FromStr; use std::sync::{Arc, Mutex}; @@ -20,13 +21,19 @@ mod test { workloads::make_combination_workload, LocalValidatorAggregatorProxy, ValidatorProxy, }; - use sui_config::SUI_KEYSTORE_FILENAME; + use sui_config::{AUTHORITIES_DB_NAME, SUI_KEYSTORE_FILENAME}; use sui_macros::{register_fail_points, sim_test}; use sui_simulator::{configs::*, SimConfig}; - use sui_types::object::Owner; + use sui_types::object::{Object, Owner}; + use sui_types::storage::ObjectKey; use test_utils::messages::get_sui_gas_object_with_wallet_context; use test_utils::network::{TestCluster, TestClusterBuilder}; use tracing::info; + use typed_store::rocks::ReadWriteOptions; + use typed_store::{ + rocks::{DBMap, MetricConf}, + traits::Map, + }; fn test_config() -> SimConfig { env_config( @@ -140,6 +147,35 @@ mod test { test_simulated_load(test_cluster, 120).await; } + #[sim_test(config = "test_config()")] + async fn test_simulated_load_pruning() { + let epoch_duration_ms = 1000; + let test_cluster = build_test_cluster(7, epoch_duration_ms).await; + test_simulated_load(test_cluster.clone(), 5).await; + // waiting enough time to get all transactions into 
checkpoints + tokio::time::sleep(Duration::from_millis(2 * epoch_duration_ms)).await; + + let swarm_dir = test_cluster.swarm.dir().join(AUTHORITIES_DB_NAME); + let validator_path = std::fs::read_dir(swarm_dir).unwrap().next().unwrap(); + let db_path = validator_path.unwrap().path().join("store"); + + let db = typed_store::rocks::open_cf(&db_path, None, MetricConf::default(), &["objects"]); + let objects = DBMap::::reopen( + &db.unwrap(), + Some("objects"), + &ReadWriteOptions { + ignore_range_deletions: false, + }, + ) + .unwrap(); + + let iter = objects.iter().skip_to_last().reverse(); + for (_, group) in &iter.group_by(|item| item.0 .0) { + // assure only last version is kept + assert_eq!(group.count(), 1); + } + } + async fn build_test_cluster( default_num_validators: usize, default_epoch_duration_ms: u64, diff --git a/crates/sui-config/data/fullnode-template-with-path.yaml b/crates/sui-config/data/fullnode-template-with-path.yaml index 57c9f4711e0fa..b948666bb7ddc 100644 --- a/crates/sui-config/data/fullnode-template-with-path.yaml +++ b/crates/sui-config/data/fullnode-template-with-path.yaml @@ -17,6 +17,7 @@ authority-store-pruning-config: objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: true protocol-key-pair: path: "protocol.key" diff --git a/crates/sui-config/data/fullnode-template.yaml b/crates/sui-config/data/fullnode-template.yaml index 092de093f953f..f9f7d6a0de6f1 100644 --- a/crates/sui-config/data/fullnode-template.yaml +++ b/crates/sui-config/data/fullnode-template.yaml @@ -17,3 +17,4 @@ authority-store-pruning-config: objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: true diff --git a/crates/sui-config/src/node.rs b/crates/sui-config/src/node.rs index 5e65b44e9df17..6b5a6eaba99ef 100644 --- a/crates/sui-config/src/node.rs +++ b/crates/sui-config/src/node.rs @@ -277,16 +277,18 @@ pub struct AuthorityStorePruningConfig { pub objects_pruning_initial_delay_secs: u64, pub num_latest_epoch_dbs_to_retain: usize, pub epoch_db_pruning_period_secs: u64, + pub enable_live_pruner: bool, } impl Default for AuthorityStorePruningConfig { fn default() -> Self { Self { objects_num_latest_versions_to_retain: u64::MAX, - objects_pruning_period_secs: u64::MAX, - objects_pruning_initial_delay_secs: u64::MAX, + objects_pruning_period_secs: 24 * 60 * 60, + objects_pruning_initial_delay_secs: 60 * 60, num_latest_epoch_dbs_to_retain: usize::MAX, epoch_db_pruning_period_secs: u64::MAX, + enable_live_pruner: cfg!(test) || cfg!(msim), } } } @@ -301,6 +303,7 @@ impl AuthorityStorePruningConfig { objects_pruning_initial_delay_secs: 60 * 60, num_latest_epoch_dbs_to_retain: 3, epoch_db_pruning_period_secs: 60 * 60, + enable_live_pruner: cfg!(test) || cfg!(msim), } } pub fn fullnode_config() -> Self { @@ -310,6 +313,7 @@ impl AuthorityStorePruningConfig { objects_pruning_initial_delay_secs: 60 * 60, num_latest_epoch_dbs_to_retain: 3, epoch_db_pruning_period_secs: 60 * 60, + enable_live_pruner: cfg!(test) || cfg!(msim), } } } diff --git a/crates/sui-config/src/swarm.rs b/crates/sui-config/src/swarm.rs index 2403fcba95a39..11b9042d30888 100644 --- a/crates/sui-config/src/swarm.rs +++ b/crates/sui-config/src/swarm.rs @@ -85,6 +85,7 @@ pub struct FullnodeConfigBuilder<'a> { rpc_port: Option, // port for admin interface admin_port: Option, + enable_pruner: bool, } impl<'a> FullnodeConfigBuilder<'a> { @@ -98,6 +99,7 @@ impl<'a> 
FullnodeConfigBuilder<'a> { p2p_port: None, rpc_port: None, admin_port: None, + enable_pruner: true, } } @@ -156,6 +158,11 @@ impl<'a> FullnodeConfigBuilder<'a> { self } + pub fn set_enable_pruner(mut self, status: bool) -> Self { + self.enable_pruner = status; + self + } + pub fn build(self) -> Result { let protocol_key_pair = get_key_pair_from_rng::(&mut OsRng).1; let worker_key_pair = get_key_pair_from_rng::(&mut OsRng).1; @@ -216,6 +223,8 @@ impl<'a> FullnodeConfigBuilder<'a> { let rpc_port = self.rpc_port.unwrap_or_else(|| get_available_port(9000)); let jsonrpc_server_url = format!("{}:{}", listen_ip, rpc_port); let json_rpc_address: SocketAddr = jsonrpc_server_url.parse().unwrap(); + let mut authority_store_pruning_config = AuthorityStorePruningConfig::fullnode_config(); + authority_store_pruning_config.enable_live_pruner = self.enable_pruner; Ok(NodeConfig { protocol_key_pair: AuthorityKeyPairWithPath::new(protocol_key_pair), @@ -237,7 +246,7 @@ impl<'a> FullnodeConfigBuilder<'a> { grpc_load_shed: None, grpc_concurrency_limit: None, p2p_config, - authority_store_pruning_config: AuthorityStorePruningConfig::fullnode_config(), + authority_store_pruning_config, end_of_epoch_broadcast_channel_capacity: default_end_of_epoch_broadcast_channel_capacity(), checkpoint_executor_config: Default::default(), diff --git a/crates/sui-config/tests/snapshots/snapshot_tests__network_config_snapshot_matches.snap b/crates/sui-config/tests/snapshots/snapshot_tests__network_config_snapshot_matches.snap index f412b3d60986b..77038ae1573e8 100644 --- a/crates/sui-config/tests/snapshots/snapshot_tests__network_config_snapshot_matches.snap +++ b/crates/sui-config/tests/snapshots/snapshot_tests__network_config_snapshot_matches.snap @@ -67,6 +67,7 @@ validator_configs: objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: false end-of-epoch-broadcast-channel-capacity: 128 checkpoint-executor-config: checkpoint-execution-max-concurrency: 100 @@ -135,6 +136,7 @@ validator_configs: objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: false end-of-epoch-broadcast-channel-capacity: 128 checkpoint-executor-config: checkpoint-execution-max-concurrency: 100 @@ -203,6 +205,7 @@ validator_configs: objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: false end-of-epoch-broadcast-channel-capacity: 128 checkpoint-executor-config: checkpoint-execution-max-concurrency: 100 @@ -271,6 +274,7 @@ validator_configs: objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: false end-of-epoch-broadcast-channel-capacity: 128 checkpoint-executor-config: checkpoint-execution-max-concurrency: 100 @@ -339,6 +343,7 @@ validator_configs: objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: false end-of-epoch-broadcast-channel-capacity: 128 checkpoint-executor-config: checkpoint-execution-max-concurrency: 100 @@ -407,6 +412,7 @@ validator_configs: objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: false end-of-epoch-broadcast-channel-capacity: 128 checkpoint-executor-config: checkpoint-execution-max-concurrency: 100 @@ -475,6 +481,7 @@ validator_configs: 
objects-pruning-initial-delay-secs: 3600 num-latest-epoch-dbs-to-retain: 3 epoch-db-pruning-period-secs: 3600 + enable-live-pruner: false end-of-epoch-broadcast-channel-capacity: 128 checkpoint-executor-config: checkpoint-execution-max-concurrency: 100 diff --git a/crates/sui-core/src/authority/authority_store.rs b/crates/sui-core/src/authority/authority_store.rs index 11a3de537bd6c..d585a9226214d 100644 --- a/crates/sui-core/src/authority/authority_store.rs +++ b/crates/sui-core/src/authority/authority_store.rs @@ -4,6 +4,7 @@ use super::authority_store_pruner::AuthorityStorePruner; use super::{authority_store_tables::AuthorityPerpetualTables, *}; use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore; +use crate::checkpoints::checkpoint_executor::CheckpointExecutionMessage; use once_cell::sync::OnceCell; use rocksdb::Options; use serde::{Deserialize, Serialize}; @@ -18,7 +19,7 @@ use sui_types::object::Owner; use sui_types::object::PACKAGE_VERSION; use sui_types::storage::{ChildObjectResolver, ObjectKey}; use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure, storage::ParentSync}; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tokio::sync::{mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::{debug, info, trace}; use typed_store::rocks::DBBatch; use typed_store::traits::Map; @@ -60,6 +61,7 @@ impl AuthorityStore { genesis: &Genesis, committee_store: &Arc, pruning_config: &AuthorityStorePruningConfig, + checkpoint_stream: mpsc::Receiver, ) -> SuiResult { let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(path, db_options.clone())); if perpetual_tables.database_is_empty()? { @@ -69,7 +71,14 @@ impl AuthorityStore { let committee = committee_store .get_committee(&cur_epoch)? .expect("Committee of the current epoch must exist"); - Self::open_inner(genesis, perpetual_tables, committee, pruning_config).await + Self::open_inner( + genesis, + perpetual_tables, + committee, + pruning_config, + checkpoint_stream, + ) + .await } pub async fn open_with_committee_for_testing( @@ -83,7 +92,14 @@ impl AuthorityStore { // as the genesis committee. assert_eq!(committee.epoch, 0); let perpetual_tables = Arc::new(AuthorityPerpetualTables::open(path, db_options.clone())); - Self::open_inner(genesis, perpetual_tables, committee.clone(), pruning_config).await + Self::open_inner( + genesis, + perpetual_tables, + committee.clone(), + pruning_config, + mpsc::channel(1).1, + ) + .await } async fn open_inner( @@ -91,10 +107,12 @@ impl AuthorityStore { perpetual_tables: Arc, committee: Committee, pruning_config: &AuthorityStorePruningConfig, + checkpoint_stream: mpsc::Receiver, ) -> SuiResult { let epoch = committee.epoch; - let _store_pruner = AuthorityStorePruner::new(perpetual_tables.clone(), pruning_config); + let _store_pruner = + AuthorityStorePruner::new(perpetual_tables.clone(), pruning_config, checkpoint_stream); let store = Self { mutex_table: MutexTable::new(NUM_SHARDS, SHARD_SIZE), diff --git a/crates/sui-core/src/authority/authority_store_pruner.rs b/crates/sui-core/src/authority/authority_store_pruner.rs index 4ad59c1aa2533..e431e21c56b5a 100644 --- a/crates/sui-core/src/authority/authority_store_pruner.rs +++ b/crates/sui-core/src/authority/authority_store_pruner.rs @@ -1,15 +1,25 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +use crate::checkpoints::checkpoint_executor::{ + CheckpointExecutionMessage, CheckpointExecutionState, +}; +use mysten_metrics::monitored_scope; +use std::cmp::max; +use std::collections::HashMap; use std::{sync::Arc, time::Duration}; use sui_config::node::AuthorityStorePruningConfig; +use sui_types::base_types::SequenceNumber; use sui_types::object::Object; use sui_types::{ base_types::{ObjectID, VersionNumber}, storage::ObjectKey, }; use tokio::{ - sync::oneshot::{self, Sender}, + sync::{ + mpsc, + oneshot::{self, Sender}, + }, time::{self, Instant}, }; use tracing::log::{error, info}; @@ -122,17 +132,47 @@ impl AuthorityStorePruner { total_pruned } + fn handle_checkpoint( + checkpoint_execution_state: CheckpointExecutionState, + objects: &DBMap, + ) -> anyhow::Result { + let _scope = monitored_scope("ObjectsLivePruner"); + let mut pruned = 0; + let mut wb = objects.batch(); + let mut updates = HashMap::new(); + + for effects in checkpoint_execution_state.effects { + for (object_id, seq_number) in effects.modified_at_versions { + updates + .entry(object_id) + .and_modify(|version| *version = max(*version, seq_number)) + .or_insert(seq_number); + } + } + for (object_id, version) in updates { + let object_key = ObjectKey(object_id, version); + let iter = objects.iter().skip_prior_to(&object_key)?.reverse(); + let mut start_range = object_key; + let end_range = ObjectKey(object_key.0, SequenceNumber::from(object_key.1.value() + 1)); + for (key, _) in iter.take_while(|(key, _)| key.0 == object_key.0) { + start_range = key; + pruned += 1; + } + wb = wb.delete_range(objects, &start_range, &end_range)?; + } + wb.write()?; + Ok(pruned) + } + fn setup_objects_pruning( num_versions_to_retain: u64, pruning_timeperiod: Duration, pruning_initial_delay: Duration, perpetual_db: Arc, + enable_live_pruner: bool, + mut checkpoint_stream: mpsc::Receiver, ) -> Sender<()> { let (sender, mut recv) = tokio::sync::oneshot::channel(); - if num_versions_to_retain == u64::MAX { - info!("Skipping pruning of objects table as we want to retain all versions"); - return sender; - } info!( "Starting object pruning service with num_versions_to_retain={num_versions_to_retain}" ); @@ -142,7 +182,7 @@ impl AuthorityStorePruner { tokio::task::spawn(async move { loop { tokio::select! 
{ - _ = prune_interval.tick() => { + _ = prune_interval.tick(), if num_versions_to_retain != u64::MAX => { info!("Starting pruning of objects table"); let num_pruned = Self::prune_objects(num_versions_to_retain, &perpetual_db.objects); info!("Finished pruning with total object versions pruned = {}", num_pruned); @@ -151,7 +191,22 @@ impl AuthorityStorePruner { } else { error!("Failed to flush objects table"); } - } + }, + Some((state, callback)) = checkpoint_stream.recv(), if enable_live_pruner => { + loop { + match Self::handle_checkpoint(state.clone(), &perpetual_db.objects) { + Ok(pruned) => { + info!("Pruned {} objects", pruned); + callback.send(()).expect("failed to notify checkpoint executor"); + break; + } + Err(err) => { + error!("Failed to prune objects {:?}", err); + tokio::time::sleep(Duration::from_millis(50)).await; + } + } + } + }, _ = &mut recv => break, } } @@ -161,6 +216,7 @@ impl AuthorityStorePruner { pub fn new( perpetual_db: Arc, pruning_config: &AuthorityStorePruningConfig, + checkpoint_stream: mpsc::Receiver, ) -> Self { AuthorityStorePruner { _objects_pruner_cancel_handle: Self::setup_objects_pruning( @@ -168,6 +224,8 @@ impl AuthorityStorePruner { Duration::from_secs(pruning_config.objects_pruning_period_secs), Duration::from_secs(pruning_config.objects_pruning_initial_delay_secs), perpetual_db, + pruning_config.enable_live_pruner, + checkpoint_stream, ), } } @@ -183,9 +241,11 @@ mod tests { use tracing::log::{error, info}; use crate::authority::authority_store_tables::AuthorityPerpetualTables; + use crate::checkpoints::checkpoint_executor::CheckpointExecutionState; #[cfg(not(target_env = "msvc"))] use pprof::Symbol; use sui_types::base_types::VersionNumber; + use sui_types::messages::TransactionEffects; use sui_types::{ base_types::{ObjectID, SequenceNumber}, object::Object, @@ -254,46 +314,92 @@ mod tests { Ok(()) } + fn generate_test_data( + db: Arc, + num_versions_per_object: u64, + num_object_versions_to_retain: u64, + total_unique_object_ids: u32, + ) -> Result<(Vec, Vec), anyhow::Error> { + let (mut to_keep, mut to_delete) = (vec![], vec![]); + + let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?; + for id in ids { + for (counter, i) in (0..num_versions_per_object).rev().enumerate() { + let object_key = ObjectKey(id, SequenceNumber::from_u64(i)); + if counter < num_object_versions_to_retain.try_into().unwrap() { + // latest `num_object_versions_to_retain` should not have been pruned + to_keep.push(object_key); + } else { + to_delete.push(object_key); + } + db.objects.insert( + &ObjectKey(id, SequenceNumber::from(i)), + &Object::immutable_with_id_for_testing(id), + )?; + } + } + assert_eq!( + to_keep.len() as u64, + std::cmp::min(num_object_versions_to_retain, num_versions_per_object) + * total_unique_object_ids as u64 + ); + Ok((to_keep, to_delete)) + } + + #[tokio::test] + async fn test_live_pruning() { + let path = tempfile::tempdir().unwrap().into_path(); + + let to_keep = { + let db = Arc::new(AuthorityPerpetualTables::open(&path, None)); + let (to_keep, to_delete) = generate_test_data(db.clone(), 3, 2, 1000).unwrap(); + let effects = TransactionEffects { + modified_at_versions: to_delete.into_iter().map(|o| (o.0, o.1)).collect(), + ..Default::default() + }; + let checkpoint_state = CheckpointExecutionState { + effects: vec![effects], + checkpoint_sequence_number: 0, + }; + let pruned = + AuthorityStorePruner::handle_checkpoint(checkpoint_state, &db.objects).unwrap(); + assert_eq!(pruned, 1000); + to_keep + }; + + 
tokio::time::sleep(Duration::from_secs(3)).await; + assert_eq!( + HashSet::from_iter(to_keep), + get_keys_after_pruning(path).unwrap() + ); + } + async fn test_pruning( primary_path: PathBuf, num_versions_per_object: u64, num_object_versions_to_retain: u64, total_unique_object_ids: u32, ) -> Result { - let mut expected = HashSet::new(); - let total_pruned = { + let (total_pruned, expected) = { // create db - // let primary_path = tempfile::tempdir()?.into_path(); let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None)); // this contains the set of keys that should not have been pruned + let (expected, _) = generate_test_data( + perpetual_db.clone(), + num_versions_per_object, + num_object_versions_to_retain, + total_unique_object_ids, + )?; - let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?; - for id in ids { - for (counter, i) in (0..num_versions_per_object).rev().enumerate() { - let object_key = ObjectKey(id, SequenceNumber::from_u64(i)); - if counter < num_object_versions_to_retain.try_into().unwrap() { - // latest `num_object_versions_to_retain` should not have been pruned - expected.insert(object_key); - } - perpetual_db.objects.insert( - &ObjectKey(id, SequenceNumber::from(i)), - &Object::immutable_with_id_for_testing(id), - )?; - } - } - assert_eq!( - expected.len() as u64, - std::cmp::min(num_object_versions_to_retain, num_versions_per_object) - * total_unique_object_ids as u64 - ); - AuthorityStorePruner::prune_objects( + let total_pruned = AuthorityStorePruner::prune_objects( num_object_versions_to_retain, &perpetual_db.objects, - ) + ); + (total_pruned, expected) }; tokio::time::sleep(Duration::from_secs(3)).await; let after_pruning = get_keys_after_pruning(primary_path)?; - assert_eq!(expected, after_pruning); + assert_eq!(HashSet::from_iter(expected), after_pruning); Ok(total_pruned) } diff --git a/crates/sui-core/src/authority/authority_store_tables.rs b/crates/sui-core/src/authority/authority_store_tables.rs index c7f717964a8e4..43fe8b5946f59 100644 --- a/crates/sui-core/src/authority/authority_store_tables.rs +++ b/crates/sui-core/src/authority/authority_store_tables.rs @@ -212,6 +212,7 @@ fn objects_table_default_config() -> DBOptions { DBOptions { options: db_options.options, rw_options: ReadWriteOptions { + // ignore_range_deletions: !cfg!(msim), ignore_range_deletions: true, }, } diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs index 964692759b699..3692b2426a908 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs @@ -26,19 +26,22 @@ use std::{ use futures::stream::FuturesOrdered; use itertools::izip; -use mysten_metrics::spawn_monitored_task; +use mysten_metrics::{monitored_scope, spawn_monitored_task}; use prometheus::Registry; use sui_config::node::CheckpointExecutorConfig; use sui_types::committee::{Committee, EpochId}; +use sui_types::error::SuiError; use sui_types::{ base_types::{ExecutionDigests, TransactionDigest, TransactionEffectsDigest}, - error::SuiResult, messages::{TransactionEffects, VerifiedCertificate}, messages_checkpoint::{CheckpointSequenceNumber, VerifiedCheckpoint}, }; use tap::TapFallible; use tokio::{ - sync::broadcast::{self, error::RecvError}, + sync::{ + broadcast::{self, error::RecvError}, + mpsc, oneshot, + }, task::JoinHandle, time::timeout, }; @@ -59,7 +62,15 @@ mod metrics; #[cfg(test)] pub(crate) mod tests; -type 
CheckpointExecutionBuffer = FuturesOrdered>; +#[derive(Debug, Clone)] +pub struct CheckpointExecutionState { + pub effects: Vec, + pub checkpoint_sequence_number: CheckpointSequenceNumber, +} +pub type CheckpointExecutionMessage = (CheckpointExecutionState, oneshot::Sender<()>); + +type CheckpointExecutionBuffer = + FuturesOrdered>; pub struct CheckpointExecutor { mailbox: broadcast::Receiver, @@ -68,6 +79,7 @@ pub struct CheckpointExecutor { tx_manager: Arc, config: CheckpointExecutorConfig, metrics: Arc, + pruner_subscriber: mpsc::Sender, } impl CheckpointExecutor { @@ -78,6 +90,7 @@ impl CheckpointExecutor { tx_manager: Arc, config: CheckpointExecutorConfig, prometheus_registry: &Registry, + pruner_subscriber: mpsc::Sender, ) -> Self { Self { mailbox, @@ -86,6 +99,7 @@ impl CheckpointExecutor { tx_manager, config, metrics: CheckpointExecutorMetrics::new(prometheus_registry), + pruner_subscriber, } } @@ -102,6 +116,7 @@ impl CheckpointExecutor { tx_manager, config: Default::default(), metrics: CheckpointExecutorMetrics::new_for_tests(), + pruner_subscriber: mpsc::channel(2).0, } } @@ -154,8 +169,8 @@ impl CheckpointExecutor { // watermark accordingly. Note that given that checkpoints are guaranteed to // be processed (added to FuturesOrdered) in seq_number order, using FuturesOrdered // guarantees that we will also ratchet the watermarks in order. - Some(Ok(checkpoint)) = pending.next() => { - self.process_executed_checkpoint(&checkpoint); + Some(Ok((checkpoint, checkpoint_execution_state))) = pending.next() => { + self.process_executed_checkpoint(&checkpoint, checkpoint_execution_state).await; highest_executed = Some(checkpoint); } // Check for newly synced checkpoints from StateSync. @@ -190,7 +205,12 @@ impl CheckpointExecutor { /// Post processing and plumbing after we executed a checkpoint. This function is guaranteed /// to be called in the order of checkpoint sequence number. 
- fn process_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) { + async fn process_executed_checkpoint( + &self, + checkpoint: &VerifiedCheckpoint, + execution_state: CheckpointExecutionState, + ) { + let _scope = monitored_scope("ProcessExecutedCheckpoint"); // Ensure that we are not skipping checkpoints at any point let seq = checkpoint.sequence_number(); if let Some(prev_highest) = self @@ -204,6 +224,18 @@ impl CheckpointExecutor { } debug!("Bumping highest_executed_checkpoint watermark to {:?}", seq,); + let (callback_sender, callback_receiver) = oneshot::channel(); + match self + .pruner_subscriber + .send((execution_state, callback_sender)) + .await + { + Ok(_) => callback_receiver + .await + .expect("failed to get callback from pruner"), + Err(err) => error!("no active receivers for checkpoint stream: {:?}", err), + } + self.checkpoint_store .update_highest_executed_checkpoint(checkpoint) .unwrap(); @@ -276,26 +308,29 @@ impl CheckpointExecutor { pending.push_back(spawn_monitored_task!(async move { let epoch_store = epoch_store.clone(); - while let Err(err) = execute_checkpoint( - checkpoint.clone(), - authority_store.clone(), - checkpoint_store.clone(), - &epoch_store, - tx_manager.clone(), - local_execution_timeout_sec, - &metrics, - ) - .await - { - error!( - "Error while executing checkpoint, will retry in 1s: {:?}", - err - ); - tokio::time::sleep(Duration::from_secs(1)).await; - metrics.checkpoint_exec_errors.inc(); + loop { + match execute_checkpoint( + checkpoint.clone(), + authority_store.clone(), + checkpoint_store.clone(), + &epoch_store, + tx_manager.clone(), + local_execution_timeout_sec, + &metrics, + ) + .await + { + Ok(execution_state) => return (checkpoint, execution_state), + Err(err) => { + error!( + "Error while executing checkpoint, will retry in 1s: {:?}", + err + ); + tokio::time::sleep(Duration::from_secs(1)).await; + metrics.checkpoint_exec_errors.inc(); + } + } } - - checkpoint })); } } @@ -334,7 +369,7 @@ pub async fn execute_checkpoint( transaction_manager: Arc, local_execution_timeout_sec: u64, metrics: &Arc, -) -> SuiResult { +) -> Result { debug!( "Scheduling checkpoint {:?} for execution", checkpoint.sequence_number(), @@ -376,7 +411,7 @@ async fn execute_transactions( transaction_manager: Arc, log_timeout_sec: u64, checkpoint_sequence: CheckpointSequenceNumber, -) -> SuiResult { +) -> Result { let all_tx_digests: Vec = execution_digests.iter().map(|tx| tx.transaction).collect(); @@ -468,7 +503,12 @@ async fn execute_transactions( epoch_store.epoch(), checkpoint_sequence, )?; - return Ok(()); + + let execution_state = CheckpointExecutionState { + effects: effects.into_iter().map(|fx| fx.data().clone()).collect(), + checkpoint_sequence_number: checkpoint_sequence, + }; + return Ok(execution_state); } } } diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 208dbf447aa80..8967a9d15c13f 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -56,7 +56,7 @@ use sui_storage::{ use sui_types::committee::Committee; use sui_types::crypto::KeypairTraits; use sui_types::quorum_driver_types::QuorumDriverEffectsQueueResult; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use tokio::sync::{watch, Mutex}; use tokio::task::JoinHandle; use tower::ServiceBuilder; @@ -69,6 +69,7 @@ pub use handle::SuiNodeHandle; use narwhal_config::SharedWorkerCache; use narwhal_types::TransactionsClient; use sui_core::authority::authority_per_epoch_store::AuthorityPerEpochStore; +use 
sui_core::checkpoints::checkpoint_executor::CheckpointExecutionMessage; use sui_core::checkpoints::{ CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync, SubmitCheckpointToConsensus, @@ -145,6 +146,7 @@ impl SuiNode { &genesis_committee, None, )); + let (checkpoint_sender, checkpoint_receiver) = mpsc::channel(10); let store = Arc::new( AuthorityStore::open( &config.db_path().join("store"), @@ -152,6 +154,7 @@ impl SuiNode { genesis, &committee_store, &config.authority_store_pruning_config, + checkpoint_receiver, ) .await?, ); @@ -307,7 +310,9 @@ impl SuiNode { info!("SuiNode started!"); let node = Arc::new(node); let node_copy = node.clone(); - spawn_monitored_task!(async move { Self::monitor_reconfiguration(node_copy).await }); + spawn_monitored_task!(async move { + Self::monitor_reconfiguration(node_copy, checkpoint_sender).await + }); Ok(node) } @@ -721,7 +726,10 @@ impl SuiNode { /// This function waits for a signal from the checkpoint executor to indicate that on-chain /// epoch has changed. Upon receiving such signal, we reconfigure the entire system. - pub async fn monitor_reconfiguration(self: Arc) -> Result<()> { + pub async fn monitor_reconfiguration( + self: Arc, + checkpoint_sender: mpsc::Sender, + ) -> Result<()> { let mut checkpoint_executor = CheckpointExecutor::new( self.state_sync.subscribe_to_synced_checkpoints(), self.checkpoint_store.clone(), @@ -729,6 +737,7 @@ impl SuiNode { self.state.transaction_manager().clone(), self.config.checkpoint_executor_config.clone(), &self.registry_service.default_registry(), + checkpoint_sender, ); loop { diff --git a/crates/sui/tests/full_node_tests.rs b/crates/sui/tests/full_node_tests.rs index 8ad55af94cd3a..0d184e60a0030 100644 --- a/crates/sui/tests/full_node_tests.rs +++ b/crates/sui/tests/full_node_tests.rs @@ -1145,7 +1145,7 @@ async fn get_past_obj_read_from_node( #[sim_test] async fn test_get_objects_read() -> Result<(), anyhow::Error> { telemetry_subscribers::init_for_testing(); - let mut test_cluster = TestClusterBuilder::new().build().await?; + let mut test_cluster = TestClusterBuilder::new().disable_pruning().build().await?; let node = test_cluster.fullnode_handle.sui_node.clone(); let context = &mut test_cluster.wallet; diff --git a/crates/test-utils/src/network.rs b/crates/test-utils/src/network.rs index d833e67ec2d78..66d751268f8f3 100644 --- a/crates/test-utils/src/network.rs +++ b/crates/test-utils/src/network.rs @@ -173,6 +173,7 @@ pub struct TestClusterBuilder { fullnode_rpc_port: Option, enable_fullnode_events: bool, epoch_duration_ms: Option, + enable_pruning: bool, } impl TestClusterBuilder { @@ -183,6 +184,7 @@ impl TestClusterBuilder { num_validators: None, enable_fullnode_events: false, epoch_duration_ms: None, + enable_pruning: true, } } @@ -211,6 +213,11 @@ impl TestClusterBuilder { self } + pub fn disable_pruning(mut self) -> Self { + self.enable_pruning = false; + self + } + pub async fn build(self) -> anyhow::Result { let cluster = self.start_test_network_with_customized_ports().await?; Ok(cluster) @@ -230,6 +237,7 @@ impl TestClusterBuilder { .fullnode_config_builder() .set_event_store(self.enable_fullnode_events) .set_rpc_port(self.fullnode_rpc_port) + .set_enable_pruner(self.enable_pruning) .build() .unwrap();
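
---

For readers skimming the diff, here is a minimal, self-contained sketch of the subscription pattern the description above refers to: `CheckpointExecutor` publishes each executed checkpoint's effects over an `mpsc` channel and waits on a `oneshot` acknowledgement before bumping its watermark, while the pruner task consumes the stream and drops stale object versions. The names here (`ExecutionState`, `ExecutionMessage`, `run_pruner`) and the `println!` placeholder are illustrative stand-ins, not code from this patch; the only assumption is `tokio` with the `sync`, `macros`, and `rt-multi-thread` features.

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in for CheckpointExecutionState: the object versions a checkpoint modified.
#[derive(Debug, Clone)]
struct ExecutionState {
    modified_at_versions: Vec<(u64 /* object id */, u64 /* version */)>,
}

// Each message carries the checkpoint's effects plus a callback channel for the ack.
type ExecutionMessage = (ExecutionState, oneshot::Sender<()>);

// Pruner side: receive each executed checkpoint, prune, then ack so the
// executor only ratchets its watermark after pruning has succeeded.
async fn run_pruner(mut rx: mpsc::Receiver<ExecutionMessage>) {
    while let Some((state, ack)) = rx.recv().await {
        for (object_id, version) in &state.modified_at_versions {
            // The real pruner issues a delete_range over the older versions here.
            println!("prune object {object_id} below version {version}");
        }
        let _ = ack.send(());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<ExecutionMessage>(10);
    let pruner = tokio::spawn(run_pruner(rx));

    // Executor side: after executing a checkpoint, publish its effects and
    // wait for the pruner's callback before moving on to the next checkpoint.
    let (ack_tx, ack_rx) = oneshot::channel();
    let state = ExecutionState {
        modified_at_versions: vec![(1, 3), (2, 7)],
    };
    tx.send((state, ack_tx)).await.expect("pruner task is alive");
    ack_rx.await.expect("pruner acked the checkpoint");

    drop(tx); // closing the channel lets the pruner task exit
    pruner.await.unwrap();
}
```

The blocking ack is the point of the design: the executor advances `highest_executed_checkpoint` only after the pruner has processed that checkpoint, which keeps pruning strictly behind execution regardless of whether the deletion method later changes from `delete_range` to point-deletes.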