Skip to content

Commit

Permalink
[Storage] [Pruner] Add more pruning functionality for ledger store pr…
Browse files Browse the repository at this point in the history
…uner

Closes: aptos-labs#255
  • Loading branch information
sitalkedia authored and aptos-bot committed Mar 22, 2022
1 parent 8aadccc commit 5701004
Show file tree
Hide file tree
Showing 24 changed files with 1,087 additions and 257 deletions.
2 changes: 1 addition & 1 deletion storage/aptosdb/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::collections::HashMap;
/// To be specific it carries a batch of db alterations and counter increases that'll be converted
/// to DB alterations on "sealing". This is required to be converted to `SealedChangeSet` before
/// committing to the DB.
pub(crate) struct ChangeSet {
pub struct ChangeSet {
/// A batch of db alterations.
pub batch: SchemaBatch,
/// Counter bumps to be made on commit.
Expand Down
74 changes: 71 additions & 3 deletions storage/aptosdb/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use aptos_types::{
proof::{position::Position, EventAccumulatorProof, EventProof},
transaction::Version,
};
use schemadb::{schema::ValueCodec, ReadOptions, SchemaIterator, DB};
use schemadb::{schema::ValueCodec, ReadOptions, SchemaBatch, SchemaIterator, DB};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
convert::{TryFrom, TryInto},
iter::Peekable,
sync::Arc,
Expand Down Expand Up @@ -119,15 +120,15 @@ impl EventStore {
Ok((event, proof))
}

fn get_txn_ver_by_seq_num(&self, event_key: &EventKey, seq_num: u64) -> Result<u64> {
pub fn get_txn_ver_by_seq_num(&self, event_key: &EventKey, seq_num: u64) -> Result<u64> {
    // Fetch the index entry stored under (event key, sequence number).
    let index_entry = self.db.get::<EventByKeySchema>(&(*event_key, seq_num))?;
    // A missing entry is reported as an error; only the first component of the stored value
    // (the version) is handed back to the caller.
    let (version, _) = index_entry
        .ok_or_else(|| format_err!("Index entry should exist for seq_num {}", seq_num))?;
    Ok(version)
}

fn get_event_by_key(
pub fn get_event_by_key(
&self,
event_key: &EventKey,
seq_num: u64,
Expand Down Expand Up @@ -360,6 +361,73 @@ impl EventStore {
.checked_sub(1)
.ok_or_else(|| format_err!("A block with non-zero seq num started at version 0."))
}

/// Prunes the events-by-key store for a set of events.
///
/// For each distinct event key found in `candidate_events`, the smallest and largest sequence
/// numbers seen are collected, and a single inclusive range delete over `[min, max]` for that
/// key is added to `db_batch`.
///
/// # Errors
/// Returns an error if adding a range delete to `db_batch` fails.
pub fn prune_events_by_key(
    &self,
    candidate_events: &[ContractEvent],
    db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
    let mut sequence_range_by_event_keys: HashMap<EventKey, (u64, u64)> = HashMap::new();

    for event in candidate_events {
        let seq_num = event.sequence_number();
        match sequence_range_by_event_keys.entry(*event.key()) {
            Entry::Occupied(mut occupied) => {
                // Events are expected to arrive sorted by sequence number, but widen the range
                // in both directions so that an out-of-order slice can never shrink it and
                // leave index entries behind.
                let (min, max) = occupied.get_mut();
                *min = (*min).min(seq_num);
                *max = (*max).max(seq_num);
            }
            Entry::Vacant(vacant) => {
                vacant.insert((seq_num, seq_num));
            }
        }
    }

    for (event_key, (min, max)) in sequence_range_by_event_keys {
        // Propagate failures instead of silently dropping them.
        db_batch
            .delete_range_inclusive::<EventByKeySchema>(&(event_key, min), &(event_key, max))?;
    }
    Ok(())
}

/// Prunes the event accumulator store for versions in the range [begin, end).
///
/// The range delete is added to `db_batch`; its result is returned unchanged.
pub fn prune_event_accumulator(
    &self,
    begin: Version,
    end: Version,
    db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
    // Both bounds use the position with in-order index 0 of their respective version.
    let lower_bound = (begin, Position::from_inorder_index(0));
    let upper_bound = (end, Position::from_inorder_index(0));
    db_batch.delete_range::<EventAccumulatorSchema>(&lower_bound, &upper_bound)
}

/// Prunes the events-by-version store for a set of event keys in version range [begin, end).
///
/// One range delete per key in `event_keys` is added to `db_batch`, spanning
/// `(key, begin, 0)` up to `(key, end, 0)`.
///
/// # Errors
/// Returns the first error reported while adding a range delete to `db_batch`.
pub fn prune_events_by_version(
    &self,
    event_keys: HashSet<EventKey>,
    begin: Version,
    end: Version,
    db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
    for event_key in event_keys {
        // `delete_range` is fallible; surface its error rather than discarding it.
        db_batch
            .delete_range::<EventByVersionSchema>(&(event_key, begin, 0), &(event_key, end, 0))?;
    }
    Ok(())
}

/// Prunes the event schema for versions in the range [begin, end).
///
/// The range delete is added to `db_batch`; its result is returned unchanged.
pub fn prune_event_schema(
    &self,
    begin: Version,
    end: Version,
    db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
    // Both bounds pair their version with index 0.
    let lower_bound = (begin, 0_u64);
    let upper_bound = (end, 0_u64);
    db_batch.delete_range::<EventSchema>(&lower_bound, &upper_bound)
}
}

type Accumulator<'a> = MerkleAccumulator<EventHashReader<'a>, EventAccumulatorHasher>;
Expand Down
4 changes: 2 additions & 2 deletions storage/aptosdb/src/ledger_counters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::collections::BTreeMap;
/// Types of ledger counters.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, ToPrimitive, NumVariants)]
#[cfg_attr(test, derive(Arbitrary))]
pub(crate) enum LedgerCounter {
pub enum LedgerCounter {
EventsCreated = 101,

NewStateLeaves = 201,
Expand Down Expand Up @@ -89,7 +89,7 @@ impl InnerLedgerCounters {
}

/// Represents `LedgerCounter` bumps yielded by saving a batch of transactions.
pub(crate) struct LedgerCounterBumps {
pub struct LedgerCounterBumps {
/// The bump values themselves, held in an `InnerLedgerCounters`.
bumps: InnerLedgerCounters,
}

Expand Down
60 changes: 1 addition & 59 deletions storage/aptosdb/src/ledger_store/ledger_info_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,8 @@
use super::*;
use crate::{change_set::ChangeSet, AptosDB};
use aptos_temppath::TempPath;
use aptos_types::{
proptest_types::{AccountInfoUniverse, LedgerInfoWithSignaturesGen},
transaction::Version,
};
use ledger_info_test_utils::*;
use proptest::{collection::vec, prelude::*};
use std::path::Path;

/// Proptest strategy yielding a non-empty list of ledger infos with signatures.
///
/// Each generator is materialized, in order, against one shared `AccountInfoUniverse`; the first
/// resulting ledger info is asserted to be in epoch 0.
fn arb_ledger_infos_with_sigs() -> impl Strategy<Value = Vec<LedgerInfoWithSignatures>> {
    let universe_strategy = any_with::<AccountInfoUniverse>(3);
    // Between 1 and 9 generators, each paired with a block size between 1 and 9.
    let gens_strategy = vec((any::<LedgerInfoWithSignaturesGen>(), 1..10usize), 1..10);

    (universe_strategy, gens_strategy).prop_map(|(mut universe, gens)| {
        let mut ledger_infos_with_sigs = Vec::with_capacity(gens.len());
        for (ledger_info_gen, block_size) in gens {
            ledger_infos_with_sigs.push(ledger_info_gen.materialize(&mut universe, block_size));
        }
        assert_eq!(get_first_epoch(&ledger_infos_with_sigs), 0);
        ledger_infos_with_sigs
    })
}

/// Returns the epoch of the first ledger info in the slice.
///
/// Panics if the slice is empty.
fn get_first_epoch(ledger_infos_with_sigs: &[LedgerInfoWithSignatures]) -> u64 {
    let first = ledger_infos_with_sigs.first().unwrap();
    first.ledger_info().epoch()
}

/// Returns the epoch of the last ledger info in the slice.
///
/// Panics if the slice is empty.
fn get_last_epoch(ledger_infos_with_sigs: &[LedgerInfoWithSignatures]) -> u64 {
    let last = ledger_infos_with_sigs.last().unwrap();
    last.ledger_info().epoch()
}

/// Returns the version of the last ledger info in the slice.
///
/// Panics if the slice is empty.
fn get_last_version(ledger_infos_with_sigs: &[LedgerInfoWithSignatures]) -> Version {
    let last = ledger_infos_with_sigs.last().unwrap();
    last.ledger_info().version()
}

proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
Expand Down Expand Up @@ -176,23 +135,6 @@ proptest! {
}
}

/// Creates a test `AptosDB` at `path` and stores the given ledger infos in it.
///
/// The last ledger info is also set as the store's latest ledger info. Panics if any write
/// fails or if `ledger_infos_with_sigs` is empty.
fn set_up(path: &impl AsRef<Path>, ledger_infos_with_sigs: &[LedgerInfoWithSignatures]) -> AptosDB {
    let db = AptosDB::new_for_test(path);
    let store = &db.ledger_store;

    // Stage every ledger info in one change set, stopping at the first failure.
    let mut cs = ChangeSet::new();
    for info in ledger_infos_with_sigs {
        store.put_ledger_info(info, &mut cs).unwrap();
    }
    store.db.write_schemas(cs.batch).unwrap();

    let latest = ledger_infos_with_sigs.last().unwrap().clone();
    store.set_latest_ledger_info(latest);

    db
}

fn put_transaction_infos(db: &AptosDB, txn_infos: &[TransactionInfo]) {
let mut cs = ChangeSet::new();
db.ledger_store
Expand Down
70 changes: 70 additions & 0 deletions storage/aptosdb/src/ledger_store/ledger_info_test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0
use crate::{AptosDB, ChangeSet};
use aptos_types::{
ledger_info::LedgerInfoWithSignatures,
proptest_types::{AccountInfoUniverse, LedgerInfoWithSignaturesGen},
transaction::Version,
};
use proptest::{
arbitrary::{any, any_with},
collection::vec,
prelude::Strategy,
};
use std::path::Path;

/// Proptest strategy yielding a non-empty list of ledger infos with signatures.
///
/// Each generator is materialized, in order, against one shared `AccountInfoUniverse`; the first
/// resulting ledger info is asserted to be in epoch 0.
pub fn arb_ledger_infos_with_sigs() -> impl Strategy<Value = Vec<LedgerInfoWithSignatures>> {
    let universe_strategy = any_with::<AccountInfoUniverse>(3);
    // Between 1 and 49 generators, each paired with a block size between 1 and 49.
    let gens_strategy = vec((any::<LedgerInfoWithSignaturesGen>(), 1..50usize), 1..50);

    (universe_strategy, gens_strategy).prop_map(|(mut universe, gens)| {
        let mut ledger_infos_with_sigs = Vec::with_capacity(gens.len());
        for (ledger_info_gen, block_size) in gens {
            ledger_infos_with_sigs.push(ledger_info_gen.materialize(&mut universe, block_size));
        }
        assert_eq!(get_first_epoch(&ledger_infos_with_sigs), 0);
        ledger_infos_with_sigs
    })
}

/// Returns the epoch of the first ledger info in the slice.
///
/// Panics if the slice is empty.
pub fn get_first_epoch(ledger_infos_with_sigs: &[LedgerInfoWithSignatures]) -> u64 {
    let first = ledger_infos_with_sigs.first().unwrap();
    first.ledger_info().epoch()
}

/// Returns the epoch of the last ledger info in the slice.
///
/// Panics if the slice is empty.
pub fn get_last_epoch(ledger_infos_with_sigs: &[LedgerInfoWithSignatures]) -> u64 {
    let last = ledger_infos_with_sigs.last().unwrap();
    last.ledger_info().epoch()
}

/// Returns the version of the last ledger info in the slice.
///
/// Panics if the slice is empty.
pub fn get_last_version(ledger_infos_with_sigs: &[LedgerInfoWithSignatures]) -> Version {
    let last = ledger_infos_with_sigs.last().unwrap();
    last.ledger_info().version()
}

/// Creates a test `AptosDB` at `path` and stores the given ledger infos in it.
///
/// The last ledger info is also set as the store's latest ledger info. Panics if any write
/// fails or if `ledger_infos_with_sigs` is empty.
pub fn set_up(
    path: &impl AsRef<Path>,
    ledger_infos_with_sigs: &[LedgerInfoWithSignatures],
) -> AptosDB {
    let db = AptosDB::new_for_test(path);
    let store = &db.ledger_store;

    // Stage every ledger info in one change set, stopping at the first failure.
    let mut cs = ChangeSet::new();
    for info in ledger_infos_with_sigs {
        store.put_ledger_info(info, &mut cs).unwrap();
    }
    store.db.write_schemas(cs.batch).unwrap();

    let latest = ledger_infos_with_sigs.last().unwrap().clone();
    store.set_latest_ledger_info(latest);
    db
}
34 changes: 29 additions & 5 deletions storage/aptosdb/src/ledger_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
change_set::ChangeSet,
errors::AptosDbError,
schema::{
epoch_by_version::EpochByVersionSchema, ledger_info::LedgerInfoSchema,
transaction_accumulator::TransactionAccumulatorSchema,
epoch_by_version::EpochByVersionSchema, ledger_counters::LedgerCountersSchema,
ledger_info::LedgerInfoSchema, transaction_accumulator::TransactionAccumulatorSchema,
transaction_info::TransactionInfoSchema,
},
};
Expand All @@ -30,12 +30,12 @@ use aptos_types::{
};
use arc_swap::ArcSwap;
use itertools::Itertools;
use schemadb::{ReadOptions, SchemaIterator, DB};
use schemadb::{ReadOptions, SchemaBatch, SchemaIterator, DB};
use std::{ops::Deref, sync::Arc};
use storage_interface::{StartupInfo, TreeState};

#[derive(Debug)]
pub(crate) struct LedgerStore {
pub struct LedgerStore {
db: Arc<DB>,

/// We almost always need the latest ledger info and signatures to serve read requests, so we
Expand Down Expand Up @@ -140,7 +140,7 @@ impl LedgerStore {
})
}

fn get_epoch_state(&self, epoch: u64) -> Result<EpochState> {
pub fn get_epoch_state(&self, epoch: u64) -> Result<EpochState> {
ensure!(epoch > 0, "EpochState only queryable for epoch >= 1.",);

let ledger_info_with_sigs =
Expand Down Expand Up @@ -368,6 +368,28 @@ impl LedgerStore {
pub fn get_root_hash(&self, version: Version) -> Result<HashValue> {
    // The accumulator is queried with `version + 1`
    // (presumably the leaf count at `version` — confirm against `MerkleAccumulator`).
    let accumulator_arg = version + 1;
    Accumulator::get_root_hash(self, accumulator_arg)
}

/// Prunes the ledger counters stored in DB in the range [begin, end).
///
/// The range delete is added to `db_batch`; any error it reports is returned to the caller.
pub fn prune_ledger_couners(
    &self,
    begin: Version,
    end: Version,
    db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
    db_batch.delete_range::<LedgerCountersSchema>(&begin, &end)
}

/// Prunes the epoch-by-version entries stored in DB in the range [begin, end).
///
/// The range delete is added to `db_batch`; any error it reports is returned to the caller.
pub fn prune_epoch_by_version(
    &self,
    begin: Version,
    end: Version,
    db_batch: &mut SchemaBatch,
) -> anyhow::Result<()> {
    db_batch.delete_range::<EpochByVersionSchema>(&begin, &end)
}
}

pub(crate) type Accumulator = MerkleAccumulator<LedgerStore, TransactionAccumulatorHasher>;
Expand Down Expand Up @@ -456,4 +478,6 @@ impl<'a> Iterator for EpochEndingLedgerInfoIter<'a> {
#[cfg(test)]
mod ledger_info_test;
#[cfg(test)]
pub(crate) mod ledger_info_test_utils;
#[cfg(test)]
mod transaction_info_test;
15 changes: 10 additions & 5 deletions storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub struct AptosDB {
transaction_store: Arc<TransactionStore>,
state_store: Arc<StateStore>,
event_store: Arc<EventStore>,
system_store: SystemStore,
system_store: Arc<SystemStore>,
rocksdb_property_reporter: RocksdbPropertyReporter,
pruner: Option<Pruner>,
}
Expand Down Expand Up @@ -256,21 +256,26 @@ impl AptosDB {
) -> Self {
let db = Arc::new(db);
let transaction_store = Arc::new(TransactionStore::new(Arc::clone(&db)));
let event_store = Arc::new(EventStore::new(Arc::clone(&db)));
let ledger_store = Arc::new(LedgerStore::new(Arc::clone(&db)));
let system_store = Arc::new(SystemStore::new(Arc::clone(&db)));

AptosDB {
db: Arc::clone(&db),
event_store: Arc::new(EventStore::new(Arc::clone(&db))),
ledger_store: Arc::new(LedgerStore::new(Arc::clone(&db))),
event_store: Arc::clone(&event_store),
ledger_store: Arc::clone(&ledger_store),
state_store: Arc::new(StateStore::new(Arc::clone(&db), account_count_migration)),
transaction_store: Arc::clone(&transaction_store),
system_store: SystemStore::new(Arc::clone(&db)),
system_store: Arc::clone(&system_store),
rocksdb_property_reporter: RocksdbPropertyReporter::new(Arc::clone(&db)),
pruner: match storage_pruner_config {
NO_OP_STORAGE_PRUNER_CONFIG => None,
_ => Some(Pruner::new(
Arc::clone(&db),
storage_pruner_config,
Arc::clone(&transaction_store),
transaction_store,
ledger_store,
event_store,
)),
},
}
Expand Down
Loading

0 comments on commit 5701004

Please sign in to comment.