Skip to content

Commit

Permalink
[Storage][Pruner] Batch pruning support
Browse files Browse the repository at this point in the history
  • Loading branch information
sitalkedia authored and gregnazario committed Apr 27, 2022
1 parent 9ca378b commit 203c831
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 339 deletions.
25 changes: 12 additions & 13 deletions config/src/config/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ pub struct StorageConfig {

pub const NO_OP_STORAGE_PRUNER_CONFIG: StoragePrunerConfig = StoragePrunerConfig {
state_store_prune_window: None,
default_prune_window: None,
max_version_to_prune_per_batch: Some(100),
ledger_prune_window: None,
pruning_batch_size: 10_000,
};

#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
Expand All @@ -63,23 +63,22 @@ pub struct StoragePrunerConfig {
/// This is the default pruning window for any other store except for state store. State store
/// being big in size, we might want to configure a smaller window for state store vs other
/// store.
pub default_prune_window: Option<u64>,

/// Maximum version to prune per batch, should not be too large to avoid spike in disk IO caused
/// by large batches in the pruner.
pub max_version_to_prune_per_batch: Option<u64>,
pub ledger_prune_window: Option<u64>,
/// Batch size of the versions to be sent to the pruner - this is to avoid slowdown due to
/// issuing too many DB calls and batch prune instead.
pub pruning_batch_size: usize,
}

impl StoragePrunerConfig {
pub fn new(
state_store_prune_window: Option<u64>,
default_store_prune_window: Option<u64>,
max_version_to_prune_per_batch: Option<u64>,
ledger_store_prune_window: Option<u64>,
pruning_batch_size: usize,
) -> Self {
StoragePrunerConfig {
state_store_prune_window,
default_prune_window: default_store_prune_window,
max_version_to_prune_per_batch,
ledger_prune_window: ledger_store_prune_window,
pruning_batch_size,
}
}
}
Expand All @@ -99,8 +98,8 @@ impl Default for StorageConfig {
// depending on the size of an average account blob.
storage_pruner_config: StoragePrunerConfig {
state_store_prune_window: Some(1_000_000),
default_prune_window: Some(10_000_000),
max_version_to_prune_per_batch: Some(100),
ledger_prune_window: Some(10_000_000),
pruning_batch_size: 10_000,
},
data_dir: PathBuf::from("/opt/aptos/data"),
// Default read/write/connection timeout, in milliseconds
Expand Down
6 changes: 1 addition & 5 deletions execution/executor-benchmark/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ enum Command {

#[structopt(long)]
default_store_prune_window: Option<u64>,

#[structopt(long)]
max_version_to_prune_per_batch: Option<u64>,
},
RunExecutor {
#[structopt(
Expand Down Expand Up @@ -77,7 +74,6 @@ fn main() {
init_account_balance,
state_store_prune_window,
default_store_prune_window,
max_version_to_prune_per_batch,
} => {
executor_benchmark::db_generator::run(
num_accounts,
Expand All @@ -87,7 +83,7 @@ fn main() {
StoragePrunerConfig::new(
Some(state_store_prune_window.unwrap_or(1_000_000)),
Some(default_store_prune_window.unwrap_or(10_000_000)),
Some(max_version_to_prune_per_batch.unwrap_or(100)),
10_000,
),
);
}
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ impl AptosDB {

fn wake_pruner(&self, latest_version: Version) {
if let Some(pruner) = self.pruner.as_ref() {
pruner.wake(latest_version)
pruner.maybe_wake_pruner(latest_version)
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions storage/aptosdb/src/pruner/db_sub_pruner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use schemadb::SchemaBatch;

/// Defines the trait for sub-pruner of a parent DB pruner
pub trait DBSubPruner {
/// Performs the actual pruning, a target version is passed, which is the target the pruner
/// tries to prune.
fn prune(
&self,
db_batch: &mut SchemaBatch,
least_readable_version: u64,
target_version: u64,
) -> anyhow::Result<()>;
}
98 changes: 22 additions & 76 deletions storage/aptosdb/src/pruner/event_store/event_store_pruner.rs
Original file line number Diff line number Diff line change
@@ -1,108 +1,54 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0
use crate::{
event::EventSchema, metrics::APTOS_PRUNER_LEAST_READABLE_VERSION, pruner::db_pruner::DBPruner,
EventStore,
};
use aptos_types::{
contract_event::ContractEvent,
event::EventKey,
transaction::{AtomicVersion, Version},
};
use crate::{pruner::db_sub_pruner::DBSubPruner, EventStore};
use aptos_types::{contract_event::ContractEvent, event::EventKey, transaction::Version};
use itertools::Itertools;
use schemadb::{ReadOptions, SchemaBatch, DB};
use std::{
collections::HashSet,
sync::{atomic::Ordering, Arc},
};

pub const EVENT_STORE_PRUNER_NAME: &str = "event store pruner";
use schemadb::SchemaBatch;
use std::{collections::HashSet, sync::Arc};

pub struct EventStorePruner {
db: Arc<DB>,
event_store: Arc<EventStore>,
/// Keeps track of the target version that the pruner needs to achieve.
target_version: AtomicVersion,
least_readable_version: AtomicVersion,
}

impl DBPruner for EventStorePruner {
fn name(&self) -> &'static str {
EVENT_STORE_PRUNER_NAME
}

fn prune(&self, db_batch: &mut SchemaBatch, max_versions: u64) -> anyhow::Result<Version> {
// Current target version might be less than the target version to ensure we don't prune
// more than max_version in one go.
let current_target_version = self.get_currrent_batch_target(max_versions);
let candidate_events = self
.get_pruning_candidate_events(self.least_readable_version(), current_target_version)?;
impl DBSubPruner for EventStorePruner {
fn prune(
&self,
db_batch: &mut SchemaBatch,
least_readable_version: u64,
target_version: u64,
) -> anyhow::Result<()> {
let candidate_events =
self.get_pruning_candidate_events(least_readable_version, target_version)?;

let event_keys: HashSet<EventKey> =
candidate_events.iter().map(|event| *event.key()).collect();

self.event_store.prune_events_by_version(
event_keys,
self.least_readable_version(),
current_target_version,
least_readable_version,
target_version,
db_batch,
)?;

self.event_store
.prune_events_by_key(&candidate_events, db_batch)?;

self.event_store.prune_event_accumulator(
self.least_readable_version(),
current_target_version,
least_readable_version,
target_version,
db_batch,
)?;

self.event_store.prune_event_schema(
self.least_readable_version(),
current_target_version,
db_batch,
)?;

self.record_progress(current_target_version);
Ok(current_target_version)
}

fn initialize_least_readable_version(&self) -> anyhow::Result<Version> {
let mut iter = self.db.iter::<EventSchema>(ReadOptions::default())?;
iter.seek_to_first();
let version = iter.next().transpose()?.map_or(0, |(key, _)| key.0);
Ok(version)
}

fn least_readable_version(&self) -> Version {
self.least_readable_version.load(Ordering::Relaxed)
}

fn set_target_version(&self, target_version: Version) {
self.target_version.store(target_version, Ordering::Relaxed)
}

fn target_version(&self) -> Version {
self.target_version.load(Ordering::Relaxed)
}
self.event_store
.prune_event_schema(least_readable_version, target_version, db_batch)?;

fn record_progress(&self, least_readable_version: Version) {
self.least_readable_version
.store(least_readable_version, Ordering::Relaxed);
APTOS_PRUNER_LEAST_READABLE_VERSION
.with_label_values(&["event_store"])
.set(least_readable_version as i64);
Ok(())
}
}

impl EventStorePruner {
pub(in crate::pruner) fn new(db: Arc<DB>, event_store: Arc<EventStore>) -> Self {
EventStorePruner {
db,
event_store,
target_version: AtomicVersion::new(0),
least_readable_version: AtomicVersion::new(0),
}
pub(in crate::pruner) fn new(event_store: Arc<EventStore>) -> Self {
EventStorePruner { event_store }
}

fn get_pruning_candidate_events(
Expand Down
6 changes: 3 additions & 3 deletions storage/aptosdb/src/pruner/event_store/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ fn verify_event_store_pruner(events: Vec<Vec<ContractEvent>>) {
Arc::clone(&aptos_db.db),
StoragePrunerConfig {
state_store_prune_window: Some(0),
default_prune_window: Some(0),
max_version_to_prune_per_batch: Some(100),
ledger_prune_window: Some(0),
pruning_batch_size: 1,
},
Arc::clone(&aptos_db.transaction_store),
Arc::clone(&aptos_db.ledger_store),
Expand All @@ -63,7 +63,7 @@ fn verify_event_store_pruner(events: Vec<Vec<ContractEvent>>) {
pruner
.wake_and_wait(
i as u64, /* latest_version */
PrunerIndex::EventStorePrunerIndex as usize,
PrunerIndex::LedgerPrunerIndex as usize,
)
.unwrap();
// ensure that all events up to i * 2 has been pruned
Expand Down
29 changes: 29 additions & 0 deletions storage/aptosdb/src/pruner/ledger_store/ledger_counter_pruner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0
use crate::{pruner::db_sub_pruner::DBSubPruner, LedgerStore};
use schemadb::SchemaBatch;
use std::sync::Arc;

pub struct LedgerCounterPruner {
/// Keeps track of the target version that the pruner needs to achieve.
ledger_store: Arc<LedgerStore>,
}

impl DBSubPruner for LedgerCounterPruner {
fn prune(
&self,
db_batch: &mut SchemaBatch,
least_readable_version: u64,
target_version: u64,
) -> anyhow::Result<()> {
self.ledger_store
.prune_ledger_couners(least_readable_version, target_version, db_batch)?;
Ok(())
}
}

impl LedgerCounterPruner {
pub fn new(ledger_store: Arc<LedgerStore>) -> Self {
LedgerCounterPruner { ledger_store }
}
}
Loading

0 comments on commit 203c831

Please sign in to comment.