Skip to content

Commit

Permalink
[Storage] Add support for getting state key, value mapping for a stat…
Browse files Browse the repository at this point in the history
…e key prefix

Closes: aptos-labs#585
  • Loading branch information
sitalkedia authored and aptos-bot committed Apr 21, 2022
1 parent 1cd21e6 commit f63c0a1
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 28 deletions.
13 changes: 11 additions & 2 deletions storage/aptosdb/src/schema/state_value_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
use crate::schema::{ensure_slice_len_eq, ensure_slice_len_gt, STATE_VALUE_INDEX_CF_NAME};
use anyhow::Result;
use aptos_types::{state_store::state_key::StateKey, transaction::Version};
use aptos_types::{
state_store::{state_key::StateKey, state_key_prefix::StateKeyPrefix},
transaction::Version,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use schemadb::{
define_schema,
schema::{KeyCodec, ValueCodec},
schema::{KeyCodec, SeekKeyCodec, ValueCodec},
};
use std::{io::Write, mem::size_of};

Expand Down Expand Up @@ -62,5 +65,11 @@ impl ValueCodec<StateValueIndexSchema> for u8 {
}
}

/// Lets a borrowed `StateKeyPrefix` be used as the seek key of a `StateValueIndexSchema`
/// iterator.
impl SeekKeyCodec<StateValueIndexSchema> for &StateKeyPrefix {
/// Returns the bytes produced by the prefix's own `encode()` as the seek key.
fn encode_seek_key(&self) -> Result<Vec<u8>> {
self.encode()
}
}

#[cfg(test)]
mod test;
116 changes: 104 additions & 12 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ use crate::{
state_value_index::StateValueIndexSchema,
AptosDbError,
};
#[cfg(test)]
use anyhow::anyhow;
use anyhow::{ensure, Result};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_jellyfish_merkle::{
iterator::JellyfishMerkleIterator, node_type::NodeKey, restore::JellyfishMerkleRestore,
JellyfishMerkleTree, TreeReader, TreeWriter,
};
#[cfg(test)]
use aptos_types::state_store::state_key_prefix::StateKeyPrefix;
use aptos_types::{
nibble::{nibble_path::NibblePath, ROOT_NIBBLE_HEIGHT},
proof::{SparseMerkleProof, SparseMerkleRangeProof},
Expand All @@ -32,13 +36,18 @@ use aptos_types::{
};
use itertools::process_results;
use schemadb::{SchemaBatch, DB};
#[cfg(test)]
use std::cmp::Ordering;
use std::{collections::HashMap, sync::Arc};
use storage_interface::StateSnapshotReceiver;

// Shorthand for the `aptos_jellyfish_merkle` node and batch types, specialised to
// `StateKeyAndValue`.
type LeafNode = aptos_jellyfish_merkle::node_type::LeafNode<StateKeyAndValue>;
type Node = aptos_jellyfish_merkle::node_type::Node<StateKeyAndValue>;
type NodeBatch = aptos_jellyfish_merkle::NodeBatch<StateKeyAndValue>;

/// Cap on how many entries `get_node_keys_by_key_prefix` will gather for one key prefix; the
/// lookup returns an error as soon as its result holds more entries than this.
#[cfg(test)]
pub const MAX_VALUES_TO_FETCH_FOR_KEY_PREFIX: usize = 10_000;

#[derive(Debug)]
pub(crate) struct StateStore {
db: Arc<DB>,
Expand All @@ -63,6 +72,88 @@ impl StateStore {
))
}

/// Walks the state value index starting at `key_prefix` and, for every state key that carries
/// the prefix, records the JMT node key of the index entry selected for `desired_version`.
///
/// A state key whose first indexed version is newer than `desired_version` is skipped. The
/// walk stops at the first state key that does not carry the prefix, and it fails once more
/// than `MAX_VALUES_TO_FETCH_FOR_KEY_PREFIX` entries have been gathered.
#[cfg(test)]
fn get_node_keys_by_key_prefix(
    &self,
    key_prefix: &StateKeyPrefix,
    desired_version: Version,
) -> Result<HashMap<StateKey, NodeKey>> {
    let mut node_keys = HashMap::new();
    let mut cursor = self.db.iter::<StateValueIndexSchema>(Default::default())?;
    cursor.seek(&key_prefix)?;

    loop {
        // Every pass begins with the cursor on the first available version of a state key.
        let ((state_key, first_version), first_num_nibbles) = match cursor.next().transpose()? {
            Some(entry) => entry,
            None => break,
        };

        // The first key outside the prefix marks the end of the matching range.
        if !key_prefix.is_prefix(&state_key)? {
            break;
        }

        match first_version.cmp(&desired_version) {
            // Nothing is indexed for this key at or before the desired version.
            Ordering::Greater => {}
            Ordering::Equal => {
                let nibble_path =
                    NibblePath::new_from_state_key(&state_key, first_num_nibbles as usize);
                node_keys.insert(state_key.clone(), NodeKey::new(first_version, nibble_path));
            }
            Ordering::Less => {
                // Older entries exist: reposition the cursor using the desired version itself
                // and take whatever index entry it lands on.
                cursor.seek_for_prev(&(state_key.clone(), desired_version))?;
                let ((found_key, found_version), found_num_nibbles) =
                    match cursor.next().transpose()? {
                        Some(entry) => entry,
                        None => {
                            return Err(anyhow!(
                                "Failure seeking to desired version {:?} for state key {:?}",
                                desired_version,
                                state_key
                            ))
                        }
                    };
                let nibble_path =
                    NibblePath::new_from_state_key(&found_key, found_num_nibbles as usize);
                node_keys.insert(found_key, NodeKey::new(found_version, nibble_path));
            }
        }

        // Refuse unbounded scans: an arbitrarily large result could slow the DB down.
        ensure!(
            node_keys.len() <= MAX_VALUES_TO_FETCH_FOR_KEY_PREFIX,
            "Too many values requested for key_prefix {:?} - maximum allowed {:?}",
            key_prefix,
            MAX_VALUES_TO_FETCH_FOR_KEY_PREFIX
        );

        // Jump past every remaining version of the current key by seeking to it with the
        // maximum version.
        cursor.seek(&(state_key, u64::MAX))?;
    }

    Ok(node_keys)
}

/// Returns every (state key, state value) pair whose key starts with `key_prefix`, as seen at
/// the given `version`. Passing an account address as the prefix yields all resources of that
/// account.
///
/// Fails if the node key lookup fails or if a value cannot be read for one of the node keys.
#[cfg(test)]
pub fn get_values_by_key_prefix(
    &self,
    key_prefix: &StateKeyPrefix,
    version: Version,
) -> Result<HashMap<StateKey, StateValue>> {
    self.get_node_keys_by_key_prefix(key_prefix, version)?
        .into_iter()
        .map(|(state_key, node_key)| -> Result<(StateKey, StateValue)> {
            let state_value = self
                .get_value_by_node_key(&node_key)?
                .ok_or_else(|| anyhow!("Failure reading value for node_key {:?}", node_key))?;
            Ok((state_key, state_value))
        })
        .collect()
}

/// Get the state value given the state key and root hash of state Merkle tree by using the
/// state value index. Only used for testing for now but should replace the
/// `get_value_with_proof_by_version` call for VM execution to fetch the value without proof.
Expand All @@ -73,22 +164,23 @@ impl StateStore {
version: Version,
) -> Result<Option<StateValue>> {
match self.get_jmt_leaf_node_key(state_key, version)? {
Some(node_key) => {
if let Some(Node::Leaf(leaf)) =
self.db.get::<JellyfishMerkleNodeSchema>(&node_key)?
{
Ok(Some(leaf.value().value.clone()))
} else {
Err(anyhow::anyhow!(
"Can't find value in JMT for state key {:?}",
state_key
))
}
}
Some(node_key) => self.get_value_by_node_key(&node_key),
None => Ok(None),
}
}

/// Reads the JMT node stored under `node_key` and returns a clone of its state value.
///
/// Anything other than a leaf node (including a missing node) is reported as an error.
#[cfg(test)]
fn get_value_by_node_key(&self, node_key: &NodeKey) -> Result<Option<StateValue>> {
    match self.db.get::<JellyfishMerkleNodeSchema>(node_key)? {
        Some(Node::Leaf(leaf)) => Ok(Some(leaf.value().value.clone())),
        _ => Err(anyhow::anyhow!(
            "Can't find value in JMT for node key {:?}",
            node_key
        )),
    }
}

/// Returns the value index in the form of number of nibbles for given pair of state key and version
/// which can be used to index into the JMT leaf.
#[cfg(test)]
Expand Down
112 changes: 111 additions & 1 deletion storage/aptosdb/src/state_store/state_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use proptest::{

use aptos_jellyfish_merkle::restore::JellyfishMerkleRestore;
use aptos_temppath::TempPath;
use aptos_types::{account_address::AccountAddress, account_state_blob::AccountStateBlob};
use aptos_types::{
access_path::AccessPath, account_address::AccountAddress, account_state_blob::AccountStateBlob,
state_store::state_key::StateKeyTag,
};
use storage_interface::StateSnapshotReceiver;

use crate::{pruner, AptosDB};
Expand Down Expand Up @@ -58,6 +61,24 @@ fn put_account_state_set(
root
}

/// Test helper: writes `value_set` to `state_store` at `version` as a single value set,
/// commits the resulting batch to the DB and returns the first root hash reported by
/// `put_value_sets`.
fn put_value_set(
    state_store: &StateStore,
    value_set: Vec<(StateKey, StateValue)>,
    version: Version,
) -> HashValue {
    let values_by_key: HashMap<StateKey, StateValue> = value_set.into_iter().collect();

    let mut change_set = ChangeSet::new();
    let roots = state_store
        .put_value_sets(vec![&values_by_key], None, version, &mut change_set)
        .unwrap();
    let root = roots[0];

    state_store.db.write_schemas(change_set.batch).unwrap();
    root
}

fn prune_stale_indices(
store: &StateStore,
least_readable_version: Version,
Expand Down Expand Up @@ -186,6 +207,95 @@ fn test_state_store_reader_writer() {
verify_value_and_proof(store, address3, Some(&value3), 1, root);
}

/// Exercises `get_values_by_key_prefix` across several versions and two accounts: values are
/// resolved per version, keys added later are invisible at earlier versions, and a prefix
/// only matches keys of its own account.
#[test]
fn test_get_values_by_key_prefix() {
    /// Builds an access-path state key for `account` with the given raw path bytes.
    fn access_path_key(account: AccountAddress, path: &[u8]) -> StateKey {
        StateKey::AccessPath(AccessPath::new(account, path.to_vec()))
    }

    /// Builds a state value holding the UTF-8 bytes of `text`.
    fn text_value(text: &str) -> StateValue {
        StateValue::from(text.as_bytes().to_vec())
    }

    let tmp_dir = TempPath::new();
    let db = AptosDB::new_for_test(&tmp_dir);
    let store = &db.state_store;

    let address = AccountAddress::new([12u8; AccountAddress::LENGTH]);
    let account_key_prefix = StateKeyPrefix::new(StateKeyTag::AccessPath, address.to_vec());

    // Version 0: two keys under the first account.
    let key1 = access_path_key(address, b"state_key1");
    let key2 = access_path_key(address, b"state_key2");
    let value1_v0 = text_value("value1_v0");
    let value2_v0 = text_value("value2_v0");

    put_value_set(
        store,
        vec![
            (key1.clone(), value1_v0.clone()),
            (key2.clone(), value2_v0.clone()),
        ],
        0,
    );

    let values_at_v0 = store
        .get_values_by_key_prefix(&account_key_prefix, 0)
        .unwrap();
    assert_eq!(values_at_v0.len(), 2);
    assert_eq!(values_at_v0.get(&key1), Some(&value1_v0));
    assert_eq!(values_at_v0.get(&key2), Some(&value2_v0));

    // Version 1: key2 is overwritten and key4 is introduced.
    let key4 = access_path_key(address, b"state_key4");
    let value2_v1 = text_value("value2_v1");
    let value4_v1 = text_value("value4_v1");

    put_value_set(
        store,
        vec![
            (key2.clone(), value2_v1.clone()),
            (key4.clone(), value4_v1.clone()),
        ],
        1,
    );

    // Version 0 must still expose only key1 and key2 with their original values.
    let values_at_v0 = store
        .get_values_by_key_prefix(&account_key_prefix, 0)
        .unwrap();
    assert_eq!(values_at_v0.len(), 2);
    assert_eq!(values_at_v0.get(&key1), Some(&value1_v0));
    assert_eq!(values_at_v0.get(&key2), Some(&value2_v0));

    // Version 1 must fall back to the version 0 value for key1, which was not rewritten.
    let values_at_v1 = store
        .get_values_by_key_prefix(&account_key_prefix, 1)
        .unwrap();
    assert_eq!(values_at_v1.len(), 3);
    assert_eq!(values_at_v1.get(&key1), Some(&value1_v0));
    assert_eq!(values_at_v1.get(&key2), Some(&value2_v1));
    assert_eq!(values_at_v1.get(&key4), Some(&value4_v1));

    // Version 2: a second account appears with a single key.
    let address1 = AccountAddress::new([22u8; AccountAddress::LENGTH]);
    let account1_key_prefix = StateKeyPrefix::new(StateKeyTag::AccessPath, address1.to_vec());
    let key5 = access_path_key(address1, b"state_key5");
    let value5_v2 = text_value("value5_v2");

    put_value_set(store, vec![(key5.clone(), value5_v2.clone())], 2);

    // The second account did not exist at versions 0 and 1.
    for version in [0, 1] {
        let values = store
            .get_values_by_key_prefix(&account1_key_prefix, version)
            .unwrap();
        assert_eq!(values.len(), 0);
    }

    let values_at_v2 = store
        .get_values_by_key_prefix(&account1_key_prefix, 2)
        .unwrap();
    assert_eq!(values_at_v2.len(), 1);
    assert_eq!(values_at_v2.get(&key5), Some(&value5_v2));
}

#[test]
fn test_retired_records() {
let address1 = AccountAddress::new([1u8; AccountAddress::LENGTH]);
Expand Down
1 change: 1 addition & 0 deletions types/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
// SPDX-License-Identifier: Apache-2.0

pub mod state_key;
pub mod state_key_prefix;
pub mod state_value;
25 changes: 12 additions & 13 deletions types/src/state_store/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub enum StateKey {
}

#[repr(u8)]
#[derive(FromPrimitive, ToPrimitive)]
enum StateKeyPrefix {
#[derive(Clone, Debug, FromPrimitive, ToPrimitive)]
pub enum StateKeyTag {
AccountAddress,
AccessPath,
Raw = 255,
Expand All @@ -39,14 +39,13 @@ impl StateKey {
let mut out = vec![];

let (prefix, raw_key) = match self {
StateKey::AccountAddressKey(account_address) => (
StateKeyPrefix::AccountAddress,
bcs::to_bytes(account_address)?,
),
StateKey::AccountAddressKey(account_address) => {
(StateKeyTag::AccountAddress, bcs::to_bytes(account_address)?)
}
StateKey::AccessPath(access_path) => {
(StateKeyPrefix::AccessPath, bcs::to_bytes(access_path)?)
(StateKeyTag::AccessPath, bcs::to_bytes(access_path)?)
}
StateKey::Raw(raw_bytes) => (StateKeyPrefix::Raw, raw_bytes.to_vec()),
StateKey::Raw(raw_bytes) => (StateKeyTag::Raw, raw_bytes.to_vec()),
};
out.push(prefix as u8);
out.extend(raw_key);
Expand All @@ -59,14 +58,14 @@ impl StateKey {
return Err(StateKeyDecodeErr::EmptyInput.into());
}
let tag = val[0];
let state_key_tag = StateKeyPrefix::from_u8(tag)
.ok_or(StateKeyDecodeErr::UnknownTag { unknown_tag: tag })?;
let state_key_tag =
StateKeyTag::from_u8(tag).ok_or(StateKeyDecodeErr::UnknownTag { unknown_tag: tag })?;
match state_key_tag {
StateKeyPrefix::AccountAddress => {
StateKeyTag::AccountAddress => {
Ok(StateKey::AccountAddressKey(bcs::from_bytes(&val[1..])?))
}
StateKeyPrefix::AccessPath => Ok(StateKey::AccessPath(bcs::from_bytes(&val[1..])?)),
StateKeyPrefix::Raw => Ok(StateKey::Raw(val[1..].to_vec())),
StateKeyTag::AccessPath => Ok(StateKey::AccessPath(bcs::from_bytes(&val[1..])?)),
StateKeyTag::Raw => Ok(StateKey::Raw(val[1..].to_vec())),
}
}
}
Expand Down
Loading

0 comments on commit f63c0a1

Please sign in to comment.