Skip to content

Commit

Permalink
[storage][pruner] Separete ledger and state pruners (aptos-labs#2360)
Browse files Browse the repository at this point in the history
  • Loading branch information
zcchahaha authored Aug 2, 2022
1 parent abda78c commit cf590a0
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 475 deletions.
36 changes: 21 additions & 15 deletions storage/aptosdb/src/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use proptest::prelude::*;

use crate::{
error_if_version_is_pruned, get_first_seq_num_and_limit,
pruner::{Pruner, PrunerIndex},
pruner::{
ledger_pruner_manager::LedgerPrunerManager, state_pruner_manager::StatePrunerManager,
},
test_helper,
test_helper::{arb_blocks_to_commit, put_as_state_root, put_transaction_info},
AptosDB, ROCKSDB_PROPERTIES,
AptosDB, PrunerManager, ROCKSDB_PROPERTIES,
};
use aptos_config::config::StoragePrunerConfig;
use aptos_crypto::{hash::CryptoHash, HashValue};
Expand Down Expand Up @@ -83,8 +85,7 @@ fn test_too_many_requested() {
fn test_error_if_version_is_pruned() {
let tmp_dir = TempPath::new();
let aptos_db = AptosDB::new_for_test(&tmp_dir);
let mut pruner = Pruner::new(
Arc::clone(&aptos_db.ledger_db),
let mut state_pruner = StatePrunerManager::new(
Arc::clone(&aptos_db.state_merkle_db),
StoragePrunerConfig {
state_store_prune_window: Some(0),
Expand All @@ -93,27 +94,32 @@ fn test_error_if_version_is_pruned() {
state_store_pruning_batch_size: 1,
},
);
pruner.testonly_update_min_version(&[Some(5), Some(10)]);
let pruner = Some(pruner);

let mut ledger_pruner = LedgerPrunerManager::new(
Arc::clone(&aptos_db.ledger_db),
StoragePrunerConfig {
state_store_prune_window: Some(0),
ledger_prune_window: Some(0),
ledger_pruning_batch_size: 1,
state_store_pruning_batch_size: 1,
},
);
state_pruner.testonly_update_min_version(Some(5));
ledger_pruner.testonly_update_min_version(Some(10));
assert_eq!(
error_if_version_is_pruned(&pruner, PrunerIndex::StateStorePrunerIndex, "State", 4)
error_if_version_is_pruned(&state_pruner, "State", 4)
.unwrap_err()
.to_string(),
"State version 4 is pruned, min available version is 5."
);
assert!(
error_if_version_is_pruned(&pruner, PrunerIndex::StateStorePrunerIndex, "State", 5).is_ok()
);
assert!(error_if_version_is_pruned(&state_pruner, "State", 5).is_ok());
assert_eq!(
error_if_version_is_pruned(&pruner, PrunerIndex::LedgerPrunerIndex, "Transaction", 9)
error_if_version_is_pruned(&ledger_pruner, "Transaction", 9)
.unwrap_err()
.to_string(),
"Transaction version 9 is pruned, min available version is 10."
);
assert!(
error_if_version_is_pruned(&pruner, PrunerIndex::LedgerPrunerIndex, "Transaction", 10)
.is_ok()
);
assert!(error_if_version_is_pruned(&ledger_pruner, "Transaction", 10).is_ok());
}

#[test]
Expand Down
135 changes: 61 additions & 74 deletions storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ use crate::{
API_LATENCY_SECONDS, COMMITTED_TXNS, LATEST_TXN_VERSION, LEDGER_VERSION, NEXT_BLOCK_EPOCH,
OTHER_TIMERS_SECONDS, ROCKSDB_PROPERTIES, STATE_ITEM_COUNT,
},
pruner::{utils, Pruner, PrunerIndex},
pruner::pruner_manager::PrunerManager,
pruner::utils,
schema::*,
state_store::StateStore,
system_store::SystemStore,
Expand Down Expand Up @@ -104,6 +105,9 @@ use std::{
thread::JoinHandle,
time::{Duration, Instant},
};

use crate::pruner::ledger_pruner_manager::LedgerPrunerManager;
use crate::pruner::state_pruner_manager::StatePrunerManager;
use storage_interface::state_view::DbStateView;
use storage_interface::{
state_delta::StateDelta, DbReader, DbWriter, ExecutedTrees, Order, StartupInfo,
Expand Down Expand Up @@ -167,23 +171,18 @@ fn error_if_too_many_requested(num_requested: u64, max_allowed: u64) -> Result<(
}

fn error_if_version_is_pruned(
pruner: &Option<Pruner>,
pruner_index: PrunerIndex,
pruner: &(dyn PrunerManager),
data_type: &str,
version: Version,
) -> Result<()> {
if let Some(pruner) = pruner.as_ref() {
if let Some(min_readable_version) =
pruner.get_min_readable_version_by_pruner_index(pruner_index)
{
ensure!(
version >= min_readable_version,
"{} version {} is pruned, min available version is {}.",
data_type,
version,
min_readable_version
);
}
if let Some(min_readable_version) = pruner.get_min_readable_version() {
ensure!(
version >= min_readable_version,
"{} version {} is pruned, min available version is {}.",
data_type,
version,
min_readable_version
);
}
Ok(())
}
Expand Down Expand Up @@ -265,7 +264,8 @@ pub struct AptosDB {
system_store: Arc<SystemStore>,
transaction_store: Arc<TransactionStore>,
pruner_config: StoragePrunerConfig,
pruner: Option<Pruner>,
state_pruner: Option<StatePrunerManager>,
ledger_pruner: Option<LedgerPrunerManager>,
_rocksdb_property_reporter: RocksdbPropertyReporter,
ledger_commit_lock: std::sync::Mutex<()>,
indexer: Option<Indexer>,
Expand All @@ -282,17 +282,22 @@ impl AptosDB {
let arc_ledger_rocksdb = Arc::new(ledger_rocksdb);
let arc_state_merkle_rocksdb = Arc::new(state_merkle_rocksdb);
let pruner_config = storage_pruner_config;
let pruner = if pruner_config.ledger_prune_window.is_none()
&& pruner_config.state_store_prune_window.is_none()
{
let state_pruner = if pruner_config.state_store_prune_window.is_none() {
None
} else {
Some(Pruner::new(
Arc::clone(&arc_ledger_rocksdb),
Some(StatePrunerManager::new(
Arc::clone(&arc_state_merkle_rocksdb),
pruner_config,
))
};
let ledger_pruner = if pruner_config.ledger_prune_window.is_none() {
None
} else {
Some(LedgerPrunerManager::new(
Arc::clone(&arc_ledger_rocksdb),
pruner_config,
))
};

AptosDB {
ledger_db: Arc::clone(&arc_ledger_rocksdb),
Expand All @@ -308,7 +313,8 @@ impl AptosDB {
system_store: Arc::new(SystemStore::new(Arc::clone(&arc_ledger_rocksdb))),
transaction_store: Arc::new(TransactionStore::new(Arc::clone(&arc_ledger_rocksdb))),
pruner_config,
pruner,
state_pruner,
ledger_pruner,
_rocksdb_property_reporter: RocksdbPropertyReporter::new(
Arc::clone(&arc_ledger_rocksdb),
Arc::clone(&arc_state_merkle_rocksdb),
Expand Down Expand Up @@ -584,12 +590,9 @@ impl AptosDB {
ledger_version: Version,
fetch_events: bool,
) -> Result<TransactionWithProof> {
error_if_version_is_pruned(
&self.pruner,
PrunerIndex::LedgerPrunerIndex,
"Transaction",
version,
)?;
if let Some(ledger_pruner) = &self.ledger_pruner {
error_if_version_is_pruned(ledger_pruner, "Transaction", version)?;
}
let proof = self
.ledger_store
.get_transaction_info_with_proof(version, ledger_version)?;
Expand Down Expand Up @@ -797,7 +800,10 @@ impl AptosDB {
}

fn wake_pruner(&self, latest_version: Version) {
if let Some(pruner) = self.pruner.as_ref() {
if let Some(pruner) = self.state_pruner.as_ref() {
pruner.maybe_wake_pruner(latest_version)
}
if let Some(pruner) = self.ledger_pruner.as_ref() {
pruner.maybe_wake_pruner(latest_version)
}
}
Expand Down Expand Up @@ -947,13 +953,9 @@ impl DbReader for AptosDB {
if start_version > ledger_version || limit == 0 {
return Ok(TransactionListWithProof::new_empty());
}

error_if_version_is_pruned(
&self.pruner,
PrunerIndex::LedgerPrunerIndex,
"Transaction",
start_version,
)?;
if let Some(ledger_pruner) = &self.ledger_pruner {
error_if_version_is_pruned(ledger_pruner, "Transaction", start_version)?;
}

let limit = std::cmp::min(limit, ledger_version - start_version + 1);

Expand Down Expand Up @@ -993,9 +995,9 @@ impl DbReader for AptosDB {
/// Get the first version that txn starts existent.
fn get_first_txn_version(&self) -> Result<Option<Version>> {
gauged_api("get_first_txn_version", || {
if let Some(pruner) = self.pruner.as_ref() {
if let Some(pruner) = self.ledger_pruner.as_ref() {
// If pruning is enabled, we can get the min readable version from the pruner.
Ok(pruner.get_min_readable_ledger_version())
Ok(pruner.get_min_readable_version())
} else {
self.transaction_store.get_first_txn_version()
}
Expand All @@ -1005,9 +1007,9 @@ impl DbReader for AptosDB {
/// Get the first version that write set starts existent.
fn get_first_write_set_version(&self) -> Result<Option<Version>> {
gauged_api("get_first_write_set_version", || {
if let Some(pruner) = self.pruner.as_ref() {
if let Some(pruner) = self.ledger_pruner.as_ref() {
// If pruning is enabled, we can get the min readable version from the pruner.
Ok(pruner.get_min_readable_ledger_version())
Ok(pruner.get_min_readable_version())
} else {
self.transaction_store.get_first_write_set_version()
}
Expand All @@ -1033,12 +1035,9 @@ impl DbReader for AptosDB {
return Ok(TransactionOutputListWithProof::new_empty());
}

error_if_version_is_pruned(
&self.pruner,
PrunerIndex::LedgerPrunerIndex,
"Transaction",
start_version,
)?;
if let Some(ledger_pruner) = &self.ledger_pruner {
error_if_version_is_pruned(ledger_pruner, "Transaction", start_version)?;
}

let limit = std::cmp::min(limit, ledger_version - start_version + 1);

Expand Down Expand Up @@ -1086,12 +1085,9 @@ impl DbReader for AptosDB {
end_version: Version,
) -> Result<Vec<WriteSet>> {
gauged_api("get_write_sets", || {
error_if_version_is_pruned(
&self.pruner,
PrunerIndex::LedgerPrunerIndex,
"Write set",
begin_version,
)?;
if let Some(ledger_pruner) = &self.ledger_pruner {
error_if_version_is_pruned(ledger_pruner, "Write set", begin_version)?;
}

self.transaction_store
.get_write_sets(begin_version, end_version)
Expand Down Expand Up @@ -1157,12 +1153,9 @@ impl DbReader for AptosDB {
version: Version,
) -> Result<Option<StateValue>> {
gauged_api("get_state_value_by_version", || {
error_if_version_is_pruned(
&self.pruner,
PrunerIndex::StateStorePrunerIndex,
"State",
version,
)?;
if let Some(state_pruner) = &self.state_pruner {
error_if_version_is_pruned(state_pruner, "State", version)?;
}

self.state_store
.get_state_value_by_version(state_store_key, version)
Expand All @@ -1176,12 +1169,9 @@ impl DbReader for AptosDB {
version: Version,
) -> Result<SparseMerkleProof> {
gauged_api("get_proof_by_version", || {
error_if_version_is_pruned(
&self.pruner,
PrunerIndex::StateStorePrunerIndex,
"State",
version,
)?;
if let Some(state_pruner) = &self.state_pruner {
error_if_version_is_pruned(state_pruner, "State", version)?;
}

self.state_store
.get_state_proof_by_version(state_key, version)
Expand Down Expand Up @@ -1244,12 +1234,9 @@ impl DbReader for AptosDB {
version: Version,
) -> Result<(Option<StateValue>, SparseMerkleProof)> {
gauged_api("get_state_value_with_proof_by_version", || {
error_if_version_is_pruned(
&self.pruner,
PrunerIndex::StateStorePrunerIndex,
"State",
version,
)?;
if let Some(state_pruner) = &self.state_pruner {
error_if_version_is_pruned(state_pruner, "State", version)?;
}

self.state_store
.get_state_value_with_proof_by_version(state_store_key, version)
Expand Down Expand Up @@ -1379,8 +1366,8 @@ impl DbReader for AptosDB {
fn get_state_prune_window(&self) -> Result<Option<usize>> {
gauged_api("get_state_prune_window", || {
let mut pruner_window = None;
if let Some(pruner) = self.pruner.as_ref() {
if let Some(window) = pruner.get_state_store_pruner_window() {
if let Some(pruner) = self.state_pruner.as_ref() {
if let Some(window) = pruner.get_pruner_window() {
pruner_window = Some(window as usize);
}
}
Expand All @@ -1391,8 +1378,8 @@ impl DbReader for AptosDB {
fn get_ledger_prune_window(&self) -> Result<Option<usize>> {
gauged_api("get_ledger_prune_window", || {
let mut pruner_window = None;
if let Some(pruner) = self.pruner.as_ref() {
if let Some(window) = pruner.get_ledger_pruner_window() {
if let Some(pruner) = self.ledger_pruner.as_ref() {
if let Some(window) = pruner.get_pruner_window() {
pruner_window = Some(window as usize);
}
}
Expand Down
Loading

0 comments on commit cf590a0

Please sign in to comment.