Skip to content

Commit

Permalink
[storage] cache latest persisted state version
Browse files Browse the repository at this point in the history
in preparation for per-block and less frequent state updating

Closes: aptos-labs#863
  • Loading branch information
msmouse authored and aptos-bot committed May 10, 2022
1 parent e253610 commit 56b1f33
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 396 deletions.
7 changes: 0 additions & 7 deletions execution/executor/proptest-regressions/executor_test.txt

This file was deleted.

17 changes: 9 additions & 8 deletions storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,18 +1287,11 @@ impl DbWriter for AptosDB {
self.commit(sealed_cs)?;
}

// Once everything is successfully persisted, update the latest in-memory ledger info.
if let Some(x) = ledger_info_with_sigs {
self.ledger_store.set_latest_ledger_info(x.clone());

APTOS_STORAGE_LEDGER_VERSION.set(x.ledger_info().version() as i64);
APTOS_STORAGE_NEXT_BLOCK_EPOCH.set(x.ledger_info().next_block_epoch() as i64);
}

// Only increment counter if commit succeeds and there are at least one transaction written
// to the storage. That's also when we'd inform the pruner thread to work.
if num_txns > 0 {
let last_version = first_version + num_txns - 1;
self.state_store.set_latest_version(last_version);
APTOS_STORAGE_COMMITTED_TXNS.inc_by(num_txns);
APTOS_STORAGE_LATEST_TXN_VERSION.set(last_version as i64);
counters
Expand All @@ -1314,6 +1307,14 @@ impl DbWriter for AptosDB {
self.wake_pruner(last_version);
}

// Once everything is successfully persisted, update the latest in-memory ledger info.
if let Some(x) = ledger_info_with_sigs {
self.ledger_store.set_latest_ledger_info(x.clone());

APTOS_STORAGE_LEDGER_VERSION.set(x.ledger_info().version() as i64);
APTOS_STORAGE_NEXT_BLOCK_EPOCH.set(x.ledger_info().next_block_epoch() as i64);
}

Ok(())
})
}
Expand Down
1 change: 1 addition & 0 deletions storage/aptosdb/src/pruner/state_store/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ fn put_value_set(
.put_value_sets(vec![&value_set], None, version, &mut cs)
.unwrap()[0];
db.write_schemas(cs.batch).unwrap();
state_store.set_latest_version(version);

root
}
Expand Down
62 changes: 59 additions & 3 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
};
use anyhow::{anyhow, ensure, Result};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_infallible::Mutex;
use aptos_jellyfish_merkle::{
iterator::JellyfishMerkleIterator, node_type::NodeKey, restore::JellyfishMerkleRestore,
JellyfishMerkleTree, TreeReader, TreeWriter,
Expand All @@ -29,7 +30,7 @@ use aptos_types::{
state_key_prefix::StateKeyPrefix,
state_value::{StateKeyAndValue, StateValue, StateValueChunkWithProof},
},
transaction::Version,
transaction::{Version, PRE_GENESIS_VERSION},
};
use itertools::process_results;
use schemadb::{SchemaBatch, DB};
Expand All @@ -45,11 +46,57 @@ pub const MAX_VALUES_TO_FETCH_FOR_KEY_PREFIX: usize = 10_000;
#[derive(Debug)]
pub(crate) struct StateStore {
db: Arc<DB>,
latest_version: Mutex<Option<Version>>,
}

impl StateStore {
pub fn new(db: Arc<DB>) -> Self {
Self { db }
let latest_version = Self::find_latest_persisted_version_from_db(&db, Version::MAX)
.expect("Failed to query latest node on initialization.");

Self {
db,
latest_version: Mutex::new(latest_version),
}
}

pub fn latest_version(&self) -> Option<Version> {
*self.latest_version.lock()
}

pub fn set_latest_version(&self, version: Version) {
*self.latest_version.lock() = Some(version)
}

pub fn find_latest_persisted_version_less_than(
&self,
next_version: Version,
) -> Result<Option<Version>> {
let latest_version = self.latest_version();
if let Some(version) = &latest_version {
if *version != PRE_GENESIS_VERSION && *version >= next_version {
if next_version == 0 {
return Ok(None);
} else {
return Self::find_latest_persisted_version_from_db(&self.db, next_version - 1);
}
}
}
Ok(latest_version)
}

pub fn find_latest_persisted_version_from_db(
db: &Arc<DB>,
max_version: Version,
) -> Result<Option<Version>> {
let mut iter = db
.iter::<JellyfishMerkleNodeSchema>(Default::default())
.expect("Failed to create DB iterator.");
// TODO: If we break up a single update batch to multiple commits, we would need to
// deal with a partial version, which hasn't got the root committed.
iter.seek_for_prev(&NodeKey::new_empty_path(max_version))?;
let latest_node = iter.next().transpose()?;
Ok(latest_node.map(|(key, _node)| key.version()))
}

/// Get the state value with proof given the state key and root hash of state Merkle tree
Expand Down Expand Up @@ -238,7 +285,12 @@ impl StateStore {
.collect::<Vec<_>>();

let (new_root_hash_vec, tree_update_batch) = JellyfishMerkleTree::new(self)
.batch_put_value_sets(value_sets_ref, node_hashes, first_version)?;
.batch_put_value_sets(
value_sets_ref,
node_hashes,
self.find_latest_persisted_version_less_than(first_version)?,
first_version,
)?;

let num_versions = new_root_hash_vec.len();
assert_eq!(num_versions, tree_update_batch.node_stats.len());
Expand Down Expand Up @@ -421,6 +473,10 @@ impl TreeWriter<StateKeyAndValue> for StateStore {
add_node_batch_and_index(&mut batch, node_batch)?;
self.db.write_schemas(batch)
}

fn finish_version(&self, version: Version) {
self.set_latest_version(version)
}
}

fn add_node_batch_and_index(batch: &mut SchemaBatch, node_batch: &NodeBatch) -> Result<()> {
Expand Down
10 changes: 4 additions & 6 deletions storage/aptosdb/src/state_store/state_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ fn put_value_set(
.put_value_sets(vec![&value_set], None, version, &mut cs)
.unwrap()[0];
state_store.db.write_schemas(cs.batch).unwrap();
state_store.set_latest_version(version);
root
}

Expand Down Expand Up @@ -517,14 +518,11 @@ fn update_store(
for (i, (key, value)) in input.enumerate() {
let mut cs = ChangeSet::new();
let value_state_set: HashMap<_, _> = std::iter::once((key, value)).collect();
let version = first_version + i as Version;
store
.put_value_sets(
vec![&value_state_set],
None,
first_version + i as Version,
&mut cs,
)
.put_value_sets(vec![&value_state_set], None, version, &mut cs)
.unwrap();
store.db.write_schemas(cs.batch).unwrap();
store.set_latest_version(version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ impl StateSnapshotRestoreController {
leaf_idx.set(chunk.last_idx as i64);
}

receiver.finish()?;
Ok(())
receiver.finish()
}

async fn read_state_value(
Expand Down
2 changes: 2 additions & 0 deletions storage/backup/backup-cli/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ impl TreeWriter<StateKeyAndValue> for MockTreeWriter {
fn write_node_batch(&self, _node_batch: &NodeBatch<StateKeyAndValue>) -> Result<()> {
Ok(())
}

fn finish_version(&self, _version: Version) {}
}

impl RestoreRunMode {
Expand Down
6 changes: 3 additions & 3 deletions storage/jellyfish-merkle/src/iterator/iterator_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn test_n_leaves_same_version(n: usize) {
}

let (_root_hash, batch) = tree
.put_value_set(btree.clone().into_iter().collect(), 0 /* version */)
.put_value_set_test(btree.clone().into_iter().collect(), 0 /* version */)
.unwrap();
db.write_tree_update_batch(batch).unwrap();
let btree: BTreeMap<_, _> = btree
Expand All @@ -70,7 +70,7 @@ fn test_n_leaves_multiple_versions(n: usize) {
let value = &ValueBlob::from(i.to_be_bytes().to_vec());
assert_eq!(btree.insert(key, value.clone()), None);
let (_root_hash, batch) = tree
.put_value_set(vec![(key, value)], i as Version)
.put_value_set_test(vec![(key, value)], i as Version)
.unwrap();
db.write_tree_update_batch(batch).unwrap();
run_tests(Arc::clone(&db), &btree, i as Version);
Expand All @@ -89,7 +89,7 @@ fn test_n_consecutive_addresses(n: usize) {
.collect();

let (_root_hash, batch) = tree
.put_value_set(btree.clone().into_iter().collect(), 0 /* version */)
.put_value_set_test(btree.clone().into_iter().collect(), 0 /* version */)
.unwrap();
db.write_tree_update_batch(batch).unwrap();
let btree: BTreeMap<_, _> = btree
Expand Down
35 changes: 20 additions & 15 deletions storage/jellyfish-merkle/src/jellyfish_merkle_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn test_insert_to_empty_tree() {

// batch version
let (_new_root_hash, batch) = tree
.batch_put_value_sets(vec![vec![(key, &value)]], None, 0 /* version */)
.batch_put_value_sets_test(vec![vec![(key, &value)]], None, 0 /* version */)
.unwrap();
assert!(batch.stale_node_index_batch.is_empty());

Expand All @@ -62,7 +62,12 @@ fn test_insert_to_pre_genesis() {
let value2 = ValueBlob::from(vec![3u8, 4u8]);
// batch version
let (_root_hash, batch) = tree
.batch_put_value_sets(vec![vec![(key2, &value2)]], None, 0 /* version */)
.batch_put_value_sets(
vec![vec![(key2, &value2)]],
None,
Some(PRE_GENESIS_VERSION),
0, /* version */
)
.unwrap();

// Check pre-genesis node prunes okay.
Expand All @@ -86,7 +91,7 @@ fn test_insert_at_leaf_with_internal_created() {
let value1 = ValueBlob::from(vec![1u8, 2u8]);

let (_root0_hash, batch) = tree
.batch_put_value_sets(vec![vec![(key1, &value1)]], None, 0 /* version */)
.batch_put_value_sets_test(vec![vec![(key1, &value1)]], None, 0 /* version */)
.unwrap();

assert!(batch.stale_node_index_batch.is_empty());
Expand All @@ -99,7 +104,7 @@ fn test_insert_at_leaf_with_internal_created() {
let value2 = ValueBlob::from(vec![3u8, 4u8]);

let (_root1_hash, batch) = tree
.batch_put_value_sets(vec![vec![(key2, &value2)]], None, 1 /* version */)
.batch_put_value_sets_test(vec![vec![(key2, &value2)]], None, 1 /* version */)
.unwrap();
assert_eq!(batch.stale_node_index_batch.len(), 1);
db.write_tree_update_batch(batch).unwrap();
Expand Down Expand Up @@ -149,7 +154,7 @@ fn test_insert_at_leaf_with_multiple_internals_created() {
let value1 = ValueBlob::from(vec![1u8, 2u8]);

let (_root0_hash, batch) = tree
.batch_put_value_sets(vec![vec![(key1, &value1)]], None, 0 /* version */)
.batch_put_value_sets_test(vec![vec![(key1, &value1)]], None, 0 /* version */)
.unwrap();
db.write_tree_update_batch(batch).unwrap();
assert_eq!(tree.get(key1, 0).unwrap().unwrap(), value1);
Expand All @@ -160,7 +165,7 @@ fn test_insert_at_leaf_with_multiple_internals_created() {
let value2 = ValueBlob::from(vec![3u8, 4u8]);

let (_root1_hash, batch) = tree
.batch_put_value_sets(vec![vec![(key2, &value2)]], None, 1 /* version */)
.batch_put_value_sets_test(vec![vec![(key2, &value2)]], None, 1 /* version */)
.unwrap();
db.write_tree_update_batch(batch).unwrap();
assert_eq!(tree.get(key1, 0).unwrap().unwrap(), value1);
Expand Down Expand Up @@ -219,7 +224,7 @@ fn test_insert_at_leaf_with_multiple_internals_created() {
// 3. Update leaf2 with new value
let value2_update = ValueBlob::from(vec![5u8, 6u8]);
let (_root2_hash, batch) = tree
.batch_put_value_sets(
.batch_put_value_sets_test(
vec![vec![(key2, &value2_update)]],
None,
2, /* version */
Expand Down Expand Up @@ -306,7 +311,7 @@ fn test_batch_insertion() {
let db = MockTreeStore::default();
let tree = JellyfishMerkleTree::new(&db);

let (_root, batch) = tree.put_value_set(one_batch, 0 /* version */).unwrap();
let (_root, batch) = tree.put_value_set_test(one_batch, 0 /* version */).unwrap();
db.write_tree_update_batch(batch).unwrap();
verify_fn(&tree, 0);

Expand All @@ -320,7 +325,7 @@ fn test_batch_insertion() {
let tree = JellyfishMerkleTree::new(&db);

let (_roots, batch) = tree
.batch_put_value_sets(batches, None, 0 /* first_version */)
.batch_put_value_sets_test(batches, None, 0 /* first_version */)
.unwrap();
db.write_tree_update_batch(batch).unwrap();
verify_fn(&tree, 6);
Expand Down Expand Up @@ -443,7 +448,7 @@ fn test_non_existence() {
let value3 = ValueBlob::from(vec![3u8]);

let (roots, batch) = tree
.batch_put_value_sets(
.batch_put_value_sets_test(
vec![vec![(key1, &value1), (key2, &value2), (key3, &value3)]],
None,
0, /* version */
Expand Down Expand Up @@ -517,7 +522,7 @@ fn test_put_value_sets() {
index += 1;
}
let (root, batch) = tree
.put_value_set(keyed_value_set, version as Version)
.put_value_set_test(keyed_value_set, version as Version)
.unwrap();
db.write_tree_update_batch(batch.clone()).unwrap();
root_hashes_one_by_one.push(root);
Expand All @@ -542,7 +547,7 @@ fn test_put_value_sets() {
value_sets.push(keyed_value_set);
}
let (root_hashes, batch) = tree
.batch_put_value_sets(value_sets, None, 0 /* version */)
.batch_put_value_sets_test(value_sets, None, 0 /* version */)
.unwrap();
assert_eq!(root_hashes, root_hashes_one_by_one);
assert_eq!(batch, batch_one_by_one);
Expand Down Expand Up @@ -570,7 +575,7 @@ fn many_keys_get_proof_and_verify_tree_root(seed: &[u8], num_keys: usize) {
}

let (roots, batch) = tree
.batch_put_value_sets(vec![kvs.clone()], None, 0 /* version */)
.batch_put_value_sets_test(vec![kvs.clone()], None, 0 /* version */)
.unwrap();
db.write_tree_update_batch(batch).unwrap();

Expand Down Expand Up @@ -614,7 +619,7 @@ fn many_versions_get_proof_and_verify_tree_root(seed: &[u8], num_versions: usize

for (idx, kvs) in kvs.iter().enumerate() {
let (root, batch) = tree
.batch_put_value_sets(vec![vec![(kvs.0, kvs.1)]], None, idx as Version)
.batch_put_value_sets_test(vec![vec![(kvs.0, kvs.1)]], None, idx as Version)
.unwrap();
roots.push(root[0]);
db.write_tree_update_batch(batch).unwrap();
Expand All @@ -624,7 +629,7 @@ fn many_versions_get_proof_and_verify_tree_root(seed: &[u8], num_versions: usize
for (idx, kvs) in kvs.iter().enumerate() {
let version = (num_versions + idx) as Version;
let (root, batch) = tree
.batch_put_value_sets(vec![vec![(kvs.0, kvs.2)]], None, version)
.batch_put_value_sets_test(vec![vec![(kvs.0, kvs.2)]], None, version)
.unwrap();
roots.push(root[0]);
db.write_tree_update_batch(batch).unwrap();
Expand Down
Loading

0 comments on commit 56b1f33

Please sign in to comment.