Skip to content

Commit

Permalink
[storage] TreeState includes state checkpoint and write sets after it
Browse files Browse the repository at this point in the history
Startup embeds TreeState hence changed accordingly -- for one thing,
StateStore can no longer serve StartupInfo alone -- AptosDB needs to do
it by leveraging multiple stores.

in preparation for per-block SMT update

Closes: aptos-labs#925
  • Loading branch information
msmouse authored and aptos-bot committed May 15, 2022
1 parent f9e7243 commit 9ed81ba
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 140 deletions.
35 changes: 26 additions & 9 deletions execution/executor/src/components/in_memory_state_calculator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::components::apply_chunk_output::ParsedTransactionOutput;
use anyhow::{anyhow, bail, Result};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_state_view::account_with_state_cache::AsAccountWithStateCache;
use aptos_state_view::{account_with_state_cache::AsAccountWithStateCache, StateViewId};
use aptos_types::{
account_view::AccountView,
epoch_state::EpochState,
Expand All @@ -25,25 +25,43 @@ use std::{
sync::Arc,
};
use storage_interface::{
in_memory_state::InMemoryState, verified_state_view::StateCache, DbReader, TreeState,
in_memory_state::InMemoryState,
verified_state_view::{StateCache, VerifiedStateView},
DbReader, TreeState,
};

pub trait IntoLedgerView {
fn into_ledger_view(self, db: &Arc<dyn DbReader>) -> Result<ExecutedTrees>;
}

impl IntoLedgerView for TreeState {
fn into_ledger_view(self, _db: &Arc<dyn DbReader>) -> Result<ExecutedTrees> {
let checkpoint_num_txns = self.num_transactions;
let checkpoint =
InMemoryState::new_at_checkpoint(self.state_root_hash, checkpoint_num_txns);

fn into_ledger_view(self, db: &Arc<dyn DbReader>) -> Result<ExecutedTrees> {
let checkpoint_num_txns = self
.num_transactions
.checked_sub(self.write_sets_after_checkpoint.len() as LeafCount)
.ok_or_else(|| anyhow!("State checkpoint pre-dates genesis."))?;
let checkpoint_state =
InMemoryState::new_at_checkpoint(self.state_checkpoint_hash, checkpoint_num_txns);

let checkpoint_state_view = VerifiedStateView::new(
StateViewId::Miscellaneous,
db.clone(),
checkpoint_num_txns.checked_sub(1),
self.state_checkpoint_hash,
checkpoint_state.checkpoint.clone(),
);
let write_sets: Vec<_> = self.write_sets_after_checkpoint.iter().collect();
checkpoint_state_view.prime_cache_by_write_set(&write_sets)?;
let state_cache = checkpoint_state_view.into_state_cache();
let calculator =
InMemoryStateCalculator::new(&checkpoint_state, state_cache, checkpoint_num_txns);
let state = calculator.calculate_for_write_sets_after_checkpoint(&write_sets)?;
let transaction_accumulator = Arc::new(InMemoryAccumulator::new(
self.ledger_frozen_subtree_hashes,
self.num_transactions,
)?);

Ok(ExecutedTrees::new(checkpoint, transaction_accumulator))
Ok(ExecutedTrees::new(state, transaction_accumulator))
}
}

Expand Down Expand Up @@ -280,7 +298,6 @@ impl InMemoryStateCalculator {
Ok((result_state, self.state_cache))
}

#[allow(dead_code)]
pub fn calculate_for_write_sets_after_checkpoint(
mut self,
write_sets: &[&WriteSet],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub fn create_startup_info() -> StartupInfo {
StartupInfo::new(
create_epoch_ending_ledger_info(),
Some(EpochState::empty()),
TreeState::new(0, vec![], HashValue::random()),
TreeState::new_at_state_checkpoint(0, vec![], HashValue::random()),
None,
)
}
Expand Down
49 changes: 21 additions & 28 deletions storage/aptosdb/src/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,15 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
get_first_seq_num_and_limit, schema::jellyfish_merkle_node::JellyfishMerkleNodeSchema,
test_helper, test_helper::arb_blocks_to_commit, AptosDB, ROCKSDB_PROPERTIES,
get_first_seq_num_and_limit, test_helper,
test_helper::{arb_blocks_to_commit, put_as_state_root, put_transaction_info},
AptosDB, ROCKSDB_PROPERTIES,
};
use aptos_crypto::{
hash::{CryptoHash, SPARSE_MERKLE_PLACEHOLDER_HASH},
HashValue,
};
use aptos_jellyfish_merkle::node_type::{Node, NodeKey};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_temppath::TempPath;
use aptos_types::{
proof::SparseMerkleLeafNode,
state_store::{
state_key::StateKey,
state_value::{StateKeyAndValue, StateValue},
},
state_store::{state_key::StateKey, state_value::StateValue},
transaction::{ExecutionStatus, TransactionInfo, PRE_GENESIS_VERSION},
};
use proptest::prelude::*;
Expand Down Expand Up @@ -87,42 +81,41 @@ fn test_get_latest_tree_state() {

// entirely emtpy db
let empty = db.get_latest_tree_state().unwrap();
assert_eq!(
empty,
TreeState::new(0, vec![], *SPARSE_MERKLE_PLACEHOLDER_HASH,)
);
assert_eq!(empty, TreeState::new_empty(),);

// unbootstrapped db with pre-genesis state
let key = StateKey::Raw(String::from("test_key").into_bytes());
let value = StateValue::from(String::from("test_val").into_bytes());

db.db
.put::<JellyfishMerkleNodeSchema>(
&NodeKey::new_empty_path(PRE_GENESIS_VERSION),
&Node::new_leaf(
key.hash(),
StateKeyAndValue::new(key.clone(), value.clone()),
),
)
.unwrap();
put_as_state_root(&db, PRE_GENESIS_VERSION, key.clone(), value.clone());
let hash = SparseMerkleLeafNode::new(key.hash(), value.hash()).hash();
let pre_genesis = db.get_latest_tree_state().unwrap();
assert_eq!(pre_genesis, TreeState::new(0, vec![], hash));
assert_eq!(
pre_genesis,
TreeState::new_at_state_checkpoint(0, vec![], hash)
);

// bootstrapped db (any transaction info is in)
put_as_state_root(&db, 0, key, value);
let txn_info = TransactionInfo::new(
HashValue::random(),
HashValue::random(),
HashValue::random(),
Some(HashValue::random()),
Some(hash),
0,
ExecutionStatus::MiscellaneousError(None),
);
test_helper::put_transaction_info(&db, 0, &txn_info);
put_transaction_info(&db, 0, &txn_info);

let bootstrapped = db.get_latest_tree_state().unwrap();
assert_eq!(
bootstrapped,
TreeState::new(1, vec![txn_info.hash()], txn_info.state_change_hash(),)
TreeState::new(
1,
vec![txn_info.hash()],
txn_info.state_checkpoint_hash().unwrap(),
Vec::new()
),
);
}

Expand Down
37 changes: 19 additions & 18 deletions storage/aptosdb/src/backup/backup_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
state_store::StateStore,
transaction_store::TransactionStore,
};
use anyhow::{anyhow, ensure, Result};
use anyhow::{ensure, Result};
use aptos_crypto::hash::HashValue;
use aptos_jellyfish_merkle::iterator::JellyfishMerkleIterator;
use aptos_types::{
Expand Down Expand Up @@ -132,23 +132,24 @@ impl BackupHandler {
pub fn get_db_state(&self) -> Result<Option<DbState>> {
self.ledger_store
.get_startup_info()?
.map(|s| {
Ok(DbState {
epoch: s.get_epoch_state().epoch,
committed_version: s
.committed_tree_state
.num_transactions
.checked_sub(1)
.ok_or_else(|| anyhow!("Bootstrapped DB has no transactions."))?,
synced_version: s
.synced_tree_state
.as_ref()
.unwrap_or(&s.committed_tree_state)
.num_transactions
.checked_sub(1)
.ok_or_else(|| anyhow!("Bootstrapped DB has no transactions."))?,
})
})
.map(
|(latest_li, epoch_state_if_not_in_li, synced_version_opt)| {
Ok(DbState {
epoch: latest_li
.ledger_info()
.next_epoch_state()
.unwrap_or_else(|| {
epoch_state_if_not_in_li
.as_ref()
.expect("EpochState must exist")
})
.epoch,
committed_version: latest_li.ledger_info().version(),
synced_version: synced_version_opt
.unwrap_or_else(|| latest_li.ledger_info().version()),
})
},
)
.transpose()
}

Expand Down
16 changes: 8 additions & 8 deletions storage/aptosdb/src/backup/restore_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,19 @@ impl RestoreHandler {
)
}

pub fn get_tree_state(&self, num_transactions: LeafCount) -> Result<TreeState> {
pub fn get_tree_state(&self, version: Option<Version>) -> Result<TreeState> {
let num_transactions: LeafCount = version.map_or(0, |v| v + 1);
let frozen_subtrees = self
.ledger_store
.get_frozen_subtree_hashes(num_transactions)?;
let state_root_hash = if num_transactions == 0 {
self.state_store
let state_root_hash = match version {
None => self
.state_store
.get_root_hash_option(PRE_GENESIS_VERSION)?
.unwrap_or(*SPARSE_MERKLE_PLACEHOLDER_HASH)
} else {
self.state_store.get_root_hash(num_transactions - 1)?
.unwrap_or(*SPARSE_MERKLE_PLACEHOLDER_HASH),
Some(ver) => self.state_store.get_root_hash(ver)?,
};

Ok(TreeState::new(
Ok(TreeState::new_at_state_checkpoint(
num_transactions,
frozen_subtrees,
state_root_hash,
Expand Down
31 changes: 12 additions & 19 deletions storage/aptosdb/src/ledger_store/ledger_info_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::*;
use crate::{change_set::ChangeSet, AptosDB};
use crate::AptosDB;
use aptos_temppath::TempPath;
use ledger_info_test_utils::*;
use proptest::{collection::vec, prelude::*};
Expand Down Expand Up @@ -109,29 +109,22 @@ proptest! {
let db = set_up(&tmp_dir, &ledger_infos_with_sigs);
put_transaction_infos(&db, &txn_infos);

let startup_info = db.ledger_store.get_startup_info().unwrap().unwrap();
let latest_li = ledger_infos_with_sigs.last().unwrap().ledger_info();
assert_eq!(startup_info.latest_ledger_info, *ledger_infos_with_sigs.last().unwrap());
let expected_epoch_state = if latest_li.next_epoch_state().is_none() {
Some(db.ledger_store.get_epoch_state(latest_li.epoch()).unwrap())
let (latest_li, epoch_state, synced_version_opt) = db.ledger_store.get_startup_info().unwrap().unwrap();
assert_eq!(latest_li, *ledger_infos_with_sigs.last().unwrap());
let li = latest_li.ledger_info();
let expected_epoch_state = if li.next_epoch_state().is_none() {
Some(db.ledger_store.get_epoch_state(li.epoch()).unwrap())
} else {
None
};
assert_eq!(startup_info.latest_epoch_state, expected_epoch_state);
let committed_version = get_last_version(&ledger_infos_with_sigs);
assert_eq!(
startup_info.committed_tree_state.state_root_hash,
txn_infos[committed_version as usize].state_change_hash(),
);
assert_eq!(epoch_state, expected_epoch_state);
let synced_version = (txn_infos.len() - 1) as u64;
if synced_version > committed_version {
assert_eq!(
startup_info.synced_tree_state.unwrap().state_root_hash,
txn_infos.last().unwrap().state_change_hash(),
);
let expected_synced_version = if synced_version > li.version() {
Some(synced_version)
} else {
assert!(startup_info.synced_tree_state.is_none());
}
None
};
assert_eq!(synced_version_opt, expected_synced_version);
}
}

Expand Down
43 changes: 15 additions & 28 deletions storage/aptosdb/src/ledger_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use arc_swap::ArcSwap;
use itertools::Itertools;
use schemadb::{ReadOptions, SchemaBatch, SchemaIterator, DB};
use std::{ops::Deref, sync::Arc};
use storage_interface::{StartupInfo, TreeState};

#[derive(Debug)]
pub struct LedgerStore {
Expand Down Expand Up @@ -157,23 +156,19 @@ impl LedgerStore {
Ok(latest_epoch_state.clone())
}

pub fn get_tree_state(
&self,
num_transactions: LeafCount,
transaction_info: TransactionInfo,
) -> Result<TreeState> {
Ok(TreeState::new(
num_transactions,
self.get_frozen_subtree_hashes(num_transactions)?,
transaction_info.state_change_hash(),
))
}

pub fn get_frozen_subtree_hashes(&self, num_transactions: LeafCount) -> Result<Vec<HashValue>> {
Accumulator::get_frozen_subtree_hashes(self, num_transactions)
}

pub fn get_startup_info(&self) -> Result<Option<StartupInfo>> {
pub fn get_startup_info(
&self,
) -> Result<
Option<(
LedgerInfoWithSignatures, // latest ledger info
Option<EpochState>, // latest epoch state if not in the above ledger info
Option<Version>, // synced version if newer than the ledger info
)>,
> {
// Get the latest ledger info. Return None if not bootstrapped.
let latest_ledger_info = match self.get_latest_ledger_info_option() {
Some(x) => x,
Expand All @@ -188,26 +183,18 @@ impl LedgerStore {
};

let li_version = latest_ledger_info.ledger_info().version();
let (latest_version, latest_txn_info) = self.get_latest_transaction_info()?;
let (latest_version, _) = self.get_latest_transaction_info()?;
assert!(latest_version >= li_version);
let (commited_tree_state, synced_tree_state) = if latest_version == li_version {
(
self.get_tree_state(latest_version + 1, latest_txn_info)?,
None,
)
let synced_version_opt = if latest_version == li_version {
None
} else {
let commited_txn_info = self.get_transaction_info(li_version)?;
(
self.get_tree_state(li_version + 1, commited_txn_info)?,
Some(self.get_tree_state(latest_version + 1, latest_txn_info)?),
)
Some(latest_version)
};

Ok(Some(StartupInfo::new(
Ok(Some((
latest_ledger_info,
latest_epoch_state_if_not_in_li,
commited_tree_state,
synced_tree_state,
synced_version_opt,
)))
}

Expand Down
Loading

0 comments on commit 9ed81ba

Please sign in to comment.