Skip to content

Commit

Permalink
[executor] lazy fetch of write sets for start up
Browse files Browse the repository at this point in the history
There's an edge case where when a DB is restored from a backup, the
"committed version" is deemed at the last epoch ending version because we
saved the LedgerInfo there, while there is no state snapthot before
that -- a large amount of write sets are required to compose the
StartupInfo struct as it is now.
In practise, on startup state sync will initialize things at the "synced
version" which is the latest transaction version and it should work
fine. We remove the write sets after the state checkpoint in the
StartupInfo struct and make IntoLedgerView fetch them later.

This new structure also allows up to convert the get_write_sets()
interface to get_write_set_iter() more easily -- in case we need it.

Closes: aptos-labs#1035
  • Loading branch information
msmouse authored and aptos-bot committed May 17, 2022
1 parent 3d2b98b commit 733ad85
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 82 deletions.
7 changes: 5 additions & 2 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ impl ExecutedTrees {
frozen_subtrees_in_accumulator: Vec<HashValue>,
num_leaves_in_accumulator: u64,
) -> Self {
let state = InMemoryState::new_at_checkpoint(state_root_hash, num_leaves_in_accumulator);
let state = InMemoryState::new_at_checkpoint(
state_root_hash,
num_leaves_in_accumulator.checked_sub(1),
);
let transaction_accumulator = Arc::new(
InMemoryAccumulator::new(frozen_subtrees_in_accumulator, num_leaves_in_accumulator)
.expect("The startup info read from storage should be valid."),
Expand Down Expand Up @@ -354,7 +357,7 @@ impl ExecutedTrees {
VerifiedStateView::new(
id,
reader.clone(),
persisted_view.state.checkpoint_version(),
persisted_view.state.checkpoint_version,
persisted_view.state.checkpoint_root_hash(),
self.state.current.clone(),
)
Expand Down
86 changes: 60 additions & 26 deletions execution/executor/src/components/in_memory_state_calculator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::components::apply_chunk_output::ParsedTransactionOutput;
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, ensure, Result};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_state_view::{account_with_state_cache::AsAccountWithStateCache, StateViewId};
use aptos_types::{
Expand All @@ -13,7 +13,7 @@ use aptos_types::{
on_chain_config,
proof::{accumulator::InMemoryAccumulator, definition::LeafCount},
state_store::{state_key::StateKey, state_value::StateValue},
transaction::{Transaction, TransactionPayload, Version},
transaction::{Transaction, TransactionPayload, Version, PRE_GENESIS_VERSION},
write_set::{WriteOp, WriteSet},
};
use executor_types::{ExecutedTrees, ProofReader};
Expand All @@ -36,26 +36,45 @@ pub trait IntoLedgerView {

impl IntoLedgerView for TreeState {
fn into_ledger_view(self, db: &Arc<dyn DbReader>) -> Result<ExecutedTrees> {
let checkpoint_num_txns = self
.num_transactions
.checked_sub(self.write_sets_after_checkpoint.len() as LeafCount)
.ok_or_else(|| anyhow!("State checkpoint pre-dates genesis."))?;
let checkpoint_state =
InMemoryState::new_at_checkpoint(self.state_checkpoint_hash, checkpoint_num_txns);

let checkpoint_state_view = VerifiedStateView::new(
StateViewId::Miscellaneous,
db.clone(),
checkpoint_num_txns.checked_sub(1),
let checkpoint_state = InMemoryState::new_at_checkpoint(
self.state_checkpoint_hash,
checkpoint_state.checkpoint.clone(),
self.state_checkpoint_version,
);
let write_sets: Vec<_> = self.write_sets_after_checkpoint.iter().collect();
checkpoint_state_view.prime_cache_by_write_set(&write_sets)?;
let state_cache = checkpoint_state_view.into_state_cache();
let calculator =
InMemoryStateCalculator::new(&checkpoint_state, state_cache, checkpoint_num_txns);
let state = calculator.calculate_for_write_sets_after_checkpoint(&write_sets)?;
let checkpoint_next_version = state_checkpoint_next_version(self.state_checkpoint_version);
ensure!(
checkpoint_next_version <= self.num_transactions,
"checkpoint is after latest version. checkpoint_next_version: {}, num_transactions: {}",
checkpoint_next_version,
self.num_transactions,
);

let state = if self.num_transactions == checkpoint_next_version {
checkpoint_state
} else {
ensure!(
self.num_transactions - checkpoint_next_version <= MAX_WRITE_SETS_AFTER_CHECKPOINT,
"Too many versions after state checkpoint. checkpoint_next_version: {}, num_transactions: {}",
checkpoint_next_version,
self.num_transactions,
);
let checkpoint_state_view = VerifiedStateView::new(
StateViewId::Miscellaneous,
db.clone(),
self.state_checkpoint_version,
self.state_checkpoint_hash,
checkpoint_state.checkpoint.clone(),
);
let write_sets = db.get_write_sets(checkpoint_next_version, self.num_transactions)?;
checkpoint_state_view.prime_cache_by_write_set(&write_sets)?;
let state_cache = checkpoint_state_view.into_state_cache();
let calculator = InMemoryStateCalculator::new(
&checkpoint_state,
state_cache,
checkpoint_next_version,
);
calculator.calculate_for_write_sets_after_checkpoint(&write_sets)?
};

let transaction_accumulator = Arc::new(InMemoryAccumulator::new(
self.ledger_frozen_subtree_hashes,
self.num_transactions,
Expand All @@ -65,6 +84,21 @@ impl IntoLedgerView for TreeState {
}
}

const MAX_WRITE_SETS_AFTER_CHECKPOINT: LeafCount = 200_000;

fn state_checkpoint_next_version(checkpoint_version: Option<Version>) -> Version {
match checkpoint_version {
None => 0,
Some(v) => {
if v == PRE_GENESIS_VERSION {
0
} else {
v + 1
}
}
}
}

pub static NEW_EPOCH_EVENT_KEY: Lazy<EventKey> = Lazy::new(on_chain_config::new_epoch_event_key);

/// Helper class for calculating `InMemState` after a chunk or block of transactions are executed.
Expand All @@ -88,7 +122,7 @@ pub(crate) struct InMemoryStateCalculator {
proof_reader: ProofReader,

checkpoint: SparseMerkleTree<StateValue>,
checkpoint_num_transactions: LeafCount,
checkpoint_version: Option<Version>,
// This doesn't need to be frozen since `_frozen_base` holds a ref to the oldest ancestor
// already, but frozen SMT is used here anyway to avoid exposing the `batch_update()` interface
// on the non-frozen SMT.
Expand All @@ -108,7 +142,7 @@ impl InMemoryStateCalculator {
} = state_cache;
let InMemoryState {
checkpoint,
checkpoint_num_transactions,
checkpoint_version,
current,
updated_since_checkpoint,
} = base.clone();
Expand All @@ -118,7 +152,7 @@ impl InMemoryStateCalculator {
state_cache,
proof_reader: ProofReader::new(proofs),
checkpoint,
checkpoint_num_transactions,
checkpoint_version,
latest: current.freeze(),
next_version,
updated_between_checkpoint_and_latest: updated_since_checkpoint,
Expand Down Expand Up @@ -236,7 +270,7 @@ impl InMemoryStateCalculator {
// Move self to the new checkpoint.
self.latest = new_checkpoint.clone();
self.checkpoint = new_checkpoint.unfreeze();
self.checkpoint_num_transactions = self.next_version;
self.checkpoint_version = self.next_version.checked_sub(1);
self.updated_between_checkpoint_and_latest = HashSet::new();
self.updated_after_latest = HashSet::new();

Expand Down Expand Up @@ -290,7 +324,7 @@ impl InMemoryStateCalculator {

let result_state = InMemoryState::new(
self.checkpoint,
self.checkpoint_num_transactions,
self.checkpoint_version,
latest.unfreeze(),
updated_since_checkpoint,
);
Expand All @@ -300,7 +334,7 @@ impl InMemoryStateCalculator {

pub fn calculate_for_write_sets_after_checkpoint(
mut self,
write_sets: &[&WriteSet],
write_sets: &[WriteSet],
) -> Result<InMemoryState> {
for write_set in write_sets {
let state_updates =
Expand Down
4 changes: 2 additions & 2 deletions storage/aptosdb/src/aptosdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn test_get_latest_tree_state() {
let pre_genesis = db.get_latest_tree_state().unwrap();
assert_eq!(
pre_genesis,
TreeState::new_at_state_checkpoint(0, vec![], hash)
TreeState::new(0, vec![], hash, Some(PRE_GENESIS_VERSION))
);

// bootstrapped db (any transaction info is in)
Expand All @@ -114,7 +114,7 @@ fn test_get_latest_tree_state() {
1,
vec![txn_info.hash()],
txn_info.state_checkpoint_hash().unwrap(),
Vec::new()
Some(0),
),
);
}
Expand Down
38 changes: 18 additions & 20 deletions storage/aptosdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ use aptos_types::{
transaction::{
AccountTransactionsWithProof, Transaction, TransactionInfo, TransactionListWithProof,
TransactionOutput, TransactionOutputListWithProof, TransactionToCommit,
TransactionWithProof, Version, PRE_GENESIS_VERSION,
TransactionWithProof, Version,
},
write_set::WriteSet,
};
use itertools::zip_eq;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -458,32 +459,14 @@ impl AptosDB {
self.state_store.get_root_hash(ver)
})?;

let write_sets_after_checkpoint = self.transaction_store.get_write_sets(
Self::state_checkpoint_next_version(checkpoint_version),
num_transactions,
)?;

Ok(TreeState::new(
num_transactions,
frozen_subtrees,
checkpoint_root_hash,
write_sets_after_checkpoint,
checkpoint_version,
))
}

fn state_checkpoint_next_version(checkpoint_version: Option<Version>) -> Version {
match checkpoint_version {
None => 0,
Some(v) => {
if v == PRE_GENESIS_VERSION {
0
} else {
v + 1
}
}
}
}

// ================================== Backup APIs ===================================

/// Gets an instance of `BackupHandler` for data backup purpose.
Expand Down Expand Up @@ -929,6 +912,21 @@ impl DbReader for AptosDB {
})
}

/// Get write sets for range [begin_version, end_version).
///
/// Used by the executor to build in memory state after a state checkpoint.
/// Any missing write set in the entire range results in error.
fn get_write_sets(
&self,
begin_version: Version,
end_version: Version,
) -> Result<Vec<WriteSet>> {
gauged_api("get_write_sets", || {
self.transaction_store
.get_write_sets(begin_version, end_version)
})
}

fn get_events(
&self,
event_key: &EventKey,
Expand Down
22 changes: 6 additions & 16 deletions storage/storage-interface/src/in_memory_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use aptos_crypto::HashValue;
use aptos_types::{
proof::definition::LeafCount,
state_store::{state_key::StateKey, state_value::StateValue},
transaction::Version,
};
Expand All @@ -20,43 +19,34 @@ use std::collections::HashSet;
#[derive(Clone, Debug)]
pub struct InMemoryState {
pub checkpoint: SparseMerkleTree<StateValue>,
pub checkpoint_num_transactions: LeafCount,
pub checkpoint_version: Option<Version>,
pub current: SparseMerkleTree<StateValue>,
pub updated_since_checkpoint: HashSet<StateKey>,
}

impl InMemoryState {
pub fn new(
checkpoint: SparseMerkleTree<StateValue>,
checkpoint_num_transactions: LeafCount,
checkpoint_version: Option<Version>,
current: SparseMerkleTree<StateValue>,
updated_since_checkpoint: HashSet<StateKey>,
) -> Self {
Self {
checkpoint,
checkpoint_num_transactions,
checkpoint_version,
current,
updated_since_checkpoint,
}
}

pub fn new_empty() -> Self {
let smt = SparseMerkleTree::new_empty();
Self::new(smt.clone(), 0, smt, HashSet::new())
Self::new(smt.clone(), None, smt, HashSet::new())
}

pub fn new_at_checkpoint(root_hash: HashValue, checkpoint_num_transactions: LeafCount) -> Self {
pub fn new_at_checkpoint(root_hash: HashValue, checkpoint_version: Option<Version>) -> Self {
let smt = SparseMerkleTree::new(root_hash);
Self::new(
smt.clone(),
checkpoint_num_transactions,
smt,
HashSet::new(),
)
}

pub fn checkpoint_version(&self) -> Option<Version> {
self.checkpoint_num_transactions.checked_sub(1)
Self::new(smt.clone(), checkpoint_version, smt, HashSet::new())
}

pub fn checkpoint_root_hash(&self) -> HashValue {
Expand Down
27 changes: 17 additions & 10 deletions storage/storage-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl StartupInfo {
num_transactions: 0,
ledger_frozen_subtree_hashes: Vec::new(),
state_checkpoint_hash: *SPARSE_MERKLE_PLACEHOLDER_HASH,
write_sets_after_checkpoint: Vec::new(),
state_checkpoint_version: None,
};
let synced_tree_state = None;

Expand Down Expand Up @@ -113,26 +113,21 @@ pub struct TreeState {
pub ledger_frozen_subtree_hashes: Vec<HashValue>,
/// Root hash of the state checkpoint (global sparse merkle tree).
pub state_checkpoint_hash: HashValue,
/// The state checkpoint can be at or before the latest version, if the latter case, this is
/// the write sets between the state checkpoint and the latest version. Applying this to the
/// state checkpoint results in a in-mem latest world state.
/// N.b the latest version minus `write_sets_after_checkpoint.len()` is the state checkpoint
/// version.
pub write_sets_after_checkpoint: Vec<WriteSet>,
pub state_checkpoint_version: Option<Version>,
}

impl TreeState {
pub fn new(
num_transactions: LeafCount,
ledger_frozen_subtree_hashes: Vec<HashValue>,
state_checkpoint_hash: HashValue,
write_sets_after_checkpoint: Vec<WriteSet>,
state_checkpoint_version: Option<Version>,
) -> Self {
Self {
num_transactions,
ledger_frozen_subtree_hashes,
state_checkpoint_hash,
write_sets_after_checkpoint,
state_checkpoint_version,
}
}

Expand All @@ -145,7 +140,8 @@ impl TreeState {
num_transactions,
ledger_frozen_subtree_hashes,
state_checkpoint_hash: state_root_hash,
write_sets_after_checkpoint: Vec::new(),
// Doesn't consider the possibility of PRE_GENESIS exists
state_checkpoint_version: num_transactions.checked_sub(1),
}
}

Expand Down Expand Up @@ -292,6 +288,17 @@ pub trait DbReader: Send + Sync {
unimplemented!()
}

/// See [`AptosDB::get_write_sets`].
///
/// [`AptosDB::get_write_sets`]: ../aptosdb/struct.AptosDB.html#method.get_write_sets
fn get_write_sets(
&self,
start_version: Version,
end_version: Version,
) -> Result<Vec<WriteSet>> {
unimplemented!()
}

/// Returns events by given event key
fn get_events(
&self,
Expand Down
2 changes: 1 addition & 1 deletion storage/storage-interface/src/verified_state_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl VerifiedStateView {
}
}

pub fn prime_cache_by_write_set(&self, write_sets: &[&WriteSet]) -> Result<()> {
pub fn prime_cache_by_write_set(&self, write_sets: &[WriteSet]) -> Result<()> {
write_sets
.iter()
.flat_map(|write_set| write_set.iter())
Expand Down
Loading

0 comments on commit 733ad85

Please sign in to comment.