Skip to content

Commit

Permalink
[Storage][Pruner] Various fixes to ledger pruning to enable it in production

Browse files Browse the repository at this point in the history

Closes: aptos-labs#778
  • Loading branch information
sitalkedia authored and aptos-bot committed May 13, 2022
1 parent 748b823 commit 61b7665
Show file tree
Hide file tree
Showing 19 changed files with 151 additions and 112 deletions.
2 changes: 1 addition & 1 deletion config/src/config/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Default for StorageConfig {
storage_pruner_config: StoragePrunerConfig {
state_store_prune_window: Some(1_000_000),
ledger_prune_window: Some(10_000_000),
pruning_batch_size: 10_000,
pruning_batch_size: 500,
},
data_dir: PathBuf::from("/opt/aptos/data"),
// Default read/write/connection timeout, in milliseconds
Expand Down
8 changes: 3 additions & 5 deletions execution/executor-benchmark/src/db_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use aptos_jellyfish_merkle::metrics::{
APTOS_JELLYFISH_STORAGE_READS,
};
use aptos_vm::AptosVM;
use aptosdb::{
metrics::APTOS_STORAGE_ROCKSDB_PROPERTIES, schema::JELLYFISH_MERKLE_NODE_CF_NAME, AptosDB,
};
use aptosdb::{metrics::ROCKSDB_PROPERTIES, schema::JELLYFISH_MERKLE_NODE_CF_NAME, AptosDB};
use executor::{
block_executor::BlockExecutor,
db_bootstrapper::{generate_waypoint, maybe_bootstrap},
Expand Down Expand Up @@ -113,13 +111,13 @@ pub fn run(
generator.write_meta(&db_dir);

db.update_rocksdb_properties().unwrap();
let db_size = APTOS_STORAGE_ROCKSDB_PROPERTIES
let db_size = ROCKSDB_PROPERTIES
.with_label_values(&[
JELLYFISH_MERKLE_NODE_CF_NAME,
"aptos_rocksdb_live_sst_files_size_bytes",
])
.get();
let data_size = APTOS_STORAGE_ROCKSDB_PROPERTIES
let data_size = ROCKSDB_PROPERTIES
.with_label_values(&[
JELLYFISH_MERKLE_NODE_CF_NAME,
"aptos_rocksdb_total-sst-files-size",
Expand Down
6 changes: 3 additions & 3 deletions execution/executor-benchmark/src/transaction_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use aptos_types::{
transaction::Version,
};
use aptos_vm::AptosVM;
use aptosdb::metrics::APTOS_STORAGE_API_LATENCY_SECONDS;
use aptosdb::metrics::API_LATENCY_SECONDS;
use executor::{
block_executor::BlockExecutor,
metrics::{
Expand Down Expand Up @@ -119,7 +119,7 @@ fn report_block(
APTOS_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum(),
APTOS_EXECUTOR_EXECUTE_BLOCK_SECONDS.get_sample_sum() - APTOS_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.get_sample_sum(),
APTOS_EXECUTOR_COMMIT_BLOCKS_SECONDS.get_sample_sum(),
APTOS_STORAGE_API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum(),
API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum(),
);
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
info!(
Expand All @@ -130,7 +130,7 @@ fn report_block(
/ total_versions,
APTOS_EXECUTOR_COMMIT_BLOCKS_SECONDS.get_sample_sum() * NANOS_PER_SEC
/ total_versions,
APTOS_STORAGE_API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum() * NANOS_PER_SEC
API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum() * NANOS_PER_SEC
/ total_versions,
);
}
4 changes: 0 additions & 4 deletions state-sync/state-sync-v2/state-sync-driver/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,6 @@ mock! {
fetch_events: bool,
) -> Result<TransactionWithProof>;

fn get_first_txn_version(&self) -> Result<Option<Version>>;

fn get_first_write_set_version(&self) -> Result<Option<Version>>;

fn get_transaction_outputs(
&self,
start_version: Version,
Expand Down
4 changes: 2 additions & 2 deletions storage/aptosdb/src/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
get_first_seq_num_and_limit, schema::jellyfish_merkle_node::JellyfishMerkleNodeSchema,
test_helper, test_helper::arb_blocks_to_commit, AptosDB, APTOS_STORAGE_ROCKSDB_PROPERTIES,
test_helper, test_helper::arb_blocks_to_commit, AptosDB, ROCKSDB_PROPERTIES,
};
use aptos_crypto::{
hash::{CryptoHash, SPARSE_MERKLE_PLACEHOLDER_HASH},
Expand Down Expand Up @@ -129,7 +129,7 @@ fn test_get_latest_tree_state() {
#[test]
fn test_rocksdb_properties_reporter() {
fn get_metric() -> i64 {
APTOS_STORAGE_ROCKSDB_PROPERTIES
ROCKSDB_PROPERTIES
.get_metric_with_label_values(&[
"transaction_info",
"aptos_rocksdb_is-file-deletions-enabled",
Expand Down
40 changes: 34 additions & 6 deletions storage/aptosdb/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,19 +392,47 @@ impl EventStore {
}

/// Prunes events by accumulator store for a range of version in [begin, end)
pub fn prune_event_accumulator(
fn prune_event_accumulator(
&self,
begin: Version,
end: Version,
db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
db_batch.delete_range::<EventAccumulatorSchema>(
&(begin, Position::from_inorder_index(0)),
&(end, Position::from_inorder_index(0)),
)
let mut iter = self.db.iter::<EventAccumulatorSchema>(Default::default())?;
iter.seek(&(begin, Position::from_inorder_index(0)))?;
while let Some(((version, position), _)) = iter.next().transpose()? {
if version >= end {
return Ok(());
}
db_batch.delete::<EventAccumulatorSchema>(&(version, position))?;
}
Ok(())
}

/// Prunes all events in the version range `[start, end)` together with every
/// index that points at them (by-version, by-key, and the raw event rows),
/// then prunes the matching event accumulator entries for the same range.
///
/// Deletions are staged into `db_batch`; nothing is removed from the DB until
/// the caller commits the batch.
pub fn prune_events(
    &self,
    start: Version,
    end: Version,
    db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
    let num_versions = (end - start) as usize;
    // Walk each version's event list; `offset` recovers the absolute version.
    for (offset, events) in self
        .get_events_by_version_iter(start, num_versions)?
        .enumerate()
    {
        let version = start + offset as u64;
        for (event_index, event) in events?.into_iter().enumerate() {
            // Drop the (key, version, seq) index entry.
            db_batch.delete::<EventByVersionSchema>(&(
                *event.key(),
                version,
                event.sequence_number(),
            ))?;
            // Drop the (key, seq) index entry.
            db_batch.delete::<EventByKeySchema>(&(*event.key(), event.sequence_number()))?;
            // Drop the event row itself, addressed by (version, index-in-version).
            db_batch.delete::<EventSchema>(&(version, event_index as u64))?;
        }
    }
    // Finally stage deletion of the per-version event accumulator nodes.
    self.prune_event_accumulator(start, end, db_batch)
}

/// Prunes events by version store for a set of events in version range [begin, end)
pub fn prune_events_by_version(
&self,
event_keys: HashSet<EventKey>,
Expand Down
4 changes: 2 additions & 2 deletions storage/aptosdb/src/ledger_counters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::APTOS_STORAGE_LEDGER;
use crate::metrics::LEDGER_COUNTER;
use num_derive::ToPrimitive;
use num_traits::ToPrimitive;
use num_variants::NumVariants;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl LedgerCounters {
/// Bump Prometheus counters.
pub fn bump_op_counters(&self) {
for counter in &LedgerCounter::VARIANTS {
APTOS_STORAGE_LEDGER
LEDGER_COUNTER
.with_label_values(&[counter.name()])
.set(self.get(*counter) as i64);
}
Expand Down
6 changes: 4 additions & 2 deletions storage/aptosdb/src/ledger_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,15 @@ impl LedgerStore {
}

/// Prune the ledger counters stored in DB in the range [begin, end)
pub fn prune_ledger_couners(
pub fn prune_ledger_counters(
&self,
begin: Version,
end: Version,
db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
db_batch.delete_range::<LedgerCountersSchema>(&begin, &end)?;
for version in begin..end {
db_batch.delete::<LedgerCountersSchema>(&version)?;
}
Ok(())
}
}
Expand Down
39 changes: 23 additions & 16 deletions storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ use crate::{
ledger_counters::LedgerCounters,
ledger_store::LedgerStore,
metrics::{
APTOS_STORAGE_API_LATENCY_SECONDS, APTOS_STORAGE_COMMITTED_TXNS,
APTOS_STORAGE_LATEST_ACCOUNT_COUNT, APTOS_STORAGE_LATEST_TXN_VERSION,
APTOS_STORAGE_LEDGER_VERSION, APTOS_STORAGE_NEXT_BLOCK_EPOCH,
APTOS_STORAGE_OTHER_TIMERS_SECONDS, APTOS_STORAGE_ROCKSDB_PROPERTIES,
API_LATENCY_SECONDS, COMMITTED_TXNS, LATEST_TXN_VERSION, LEDGER_VERSION, NEXT_BLOCK_EPOCH,
OTHER_TIMERS_SECONDS, ROCKSDB_PROPERTIES, STATE_ITEM_COUNT,
},
pruner::{utils, Pruner},
schema::*,
Expand Down Expand Up @@ -153,12 +151,12 @@ fn gen_rocksdb_options(config: &RocksdbConfig) -> Options {
}

fn update_rocksdb_properties(db: &DB) -> Result<()> {
let _timer = APTOS_STORAGE_OTHER_TIMERS_SECONDS
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["update_rocksdb_properties"])
.start_timer();
for cf_name in AptosDB::column_families() {
for (rockdb_property_name, aptos_rocksdb_property_name) in &*ROCKSDB_PROPERTY_MAP {
APTOS_STORAGE_ROCKSDB_PROPERTIES
ROCKSDB_PROPERTIES
.with_label_values(&[cf_name, aptos_rocksdb_property_name])
.set(db.get_property(cf_name, rockdb_property_name)? as i64);
}
Expand Down Expand Up @@ -576,7 +574,7 @@ impl AptosDB {

// Account state updates. Gather account state root hashes
{
let _timer = APTOS_STORAGE_OTHER_TIMERS_SECONDS
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["save_transactions_state"])
.start_timer();

Expand All @@ -595,7 +593,7 @@ impl AptosDB {

// Event updates. Gather event accumulator root hashes.
{
let _timer = APTOS_STORAGE_OTHER_TIMERS_SECONDS
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["save_transactions_events"])
.start_timer();
zip_eq(first_version..=last_version, txns_to_commit)
Expand All @@ -606,7 +604,7 @@ impl AptosDB {
}

let new_root_hash = {
let _timer = APTOS_STORAGE_OTHER_TIMERS_SECONDS
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["save_transactions_txn_infos"])
.start_timer();
zip_eq(first_version..=last_version, txns_to_commit).try_for_each(
Expand Down Expand Up @@ -1211,6 +1209,15 @@ impl DbReader for AptosDB {
.map(|x| x.get_state_store_pruner_window() as usize))
})
}

/// Returns the configured ledger prune window, or `None` when no pruner is
/// running on this DB instance. Wrapped in `gauged_api` so latency and
/// success/failure are recorded under the "get_ledger_prune_window" label.
fn get_ledger_prune_window(&self) -> Result<Option<usize>> {
    gauged_api("get_ledger_prune_window", || {
        let window = self
            .pruner
            .as_ref()
            .map(|pruner| pruner.get_ledger_pruner_window() as usize);
        Ok(window)
    })
}
}

impl DbWriter for AptosDB {
Expand Down Expand Up @@ -1277,7 +1284,7 @@ impl DbWriter for AptosDB {
// Persist.
let (sealed_cs, counters) = self.seal_change_set(first_version, num_txns, cs)?;
{
let _timer = APTOS_STORAGE_OTHER_TIMERS_SECONDS
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["save_transactions_commit"])
.start_timer();
self.commit(sealed_cs)?;
Expand All @@ -1288,13 +1295,13 @@ impl DbWriter for AptosDB {
if num_txns > 0 {
let last_version = first_version + num_txns - 1;
self.state_store.set_latest_version(last_version);
APTOS_STORAGE_COMMITTED_TXNS.inc_by(num_txns);
APTOS_STORAGE_LATEST_TXN_VERSION.set(last_version as i64);
COMMITTED_TXNS.inc_by(num_txns);
LATEST_TXN_VERSION.set(last_version as i64);
counters
.expect("Counters should be bumped with transactions being saved.")
.bump_op_counters();
// -1 for "not fully migrated", -2 for "error on get_account_count()"
APTOS_STORAGE_LATEST_ACCOUNT_COUNT.set(
STATE_ITEM_COUNT.set(
self.state_store
.get_value_count(last_version)
.map_or(-1, |c| c as i64),
Expand All @@ -1307,8 +1314,8 @@ impl DbWriter for AptosDB {
if let Some(x) = ledger_info_with_sigs {
self.ledger_store.set_latest_ledger_info(x.clone());

APTOS_STORAGE_LEDGER_VERSION.set(x.ledger_info().version() as i64);
APTOS_STORAGE_NEXT_BLOCK_EPOCH.set(x.ledger_info().next_block_epoch() as i64);
LEDGER_VERSION.set(x.ledger_info().version() as i64);
NEXT_BLOCK_EPOCH.set(x.ledger_info().next_block_epoch() as i64);
}

Ok(())
Expand Down Expand Up @@ -1461,7 +1468,7 @@ where
"Err"
}
};
APTOS_STORAGE_API_LATENCY_SECONDS
API_LATENCY_SECONDS
.with_label_values(&[api_name, res_type])
.observe(timer.elapsed().as_secs_f64());

Expand Down
Loading

0 comments on commit 61b7665

Please sign in to comment.