Skip to content

Commit

Permalink
rest_index: add index for dynamic fields
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill committed Jun 12, 2024
1 parent fb2e4b7 commit 7fad5b3
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 21 deletions.
6 changes: 6 additions & 0 deletions crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,18 @@ impl<'a> TestAuthorityBuilder<'a> {
let rest_index = if self.disable_indexer {
None
} else {
let mut resolver = epoch_store
.executor()
.type_layout_resolver(Box::new(&cache_traits.backing_package_store));

Some(Arc::new(RestIndexStore::new(
path.join("rest_index"),
&authority_store,
&checkpoint_store,
resolver.as_mut(),
)))
};

let transaction_deny_config = self.transaction_deny_config.unwrap_or_default();
let certificate_deny_config = self.certificate_deny_config.unwrap_or_default();
let authority_overload_config = self.authority_overload_config.unwrap_or_default();
Expand Down
13 changes: 9 additions & 4 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use sui_types::accumulator::Accumulator;
use sui_types::crypto::RandomnessRound;
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::inner_temporary_store::PackageStoreWithFallback;
use sui_types::message_envelope::Message;
use sui_types::transaction::TransactionKind;
use sui_types::{
Expand Down Expand Up @@ -669,7 +670,7 @@ impl CheckpointExecutor {
self.transaction_cache_reader.as_ref(),
self.checkpoint_store.clone(),
&all_tx_digests,
epoch_store.clone(),
&epoch_store,
checkpoint.clone(),
self.accumulator.clone(),
effects,
Expand Down Expand Up @@ -891,7 +892,7 @@ async fn handle_execution_effects(
transaction_cache_reader,
checkpoint_store.clone(),
&all_tx_digests,
epoch_store.clone(),
&epoch_store,
checkpoint.clone(),
accumulator.clone(),
effects,
Expand Down Expand Up @@ -1285,7 +1286,7 @@ async fn finalize_checkpoint(
transaction_cache_reader: &dyn TransactionCacheRead,
checkpoint_store: Arc<CheckpointStore>,
tx_digests: &[TransactionDigest],
epoch_store: Arc<AuthorityPerEpochStore>,
epoch_store: &Arc<AuthorityPerEpochStore>,
checkpoint: VerifiedCheckpoint,
accumulator: Arc<StateAccumulator>,
effects: Vec<TransactionEffects>,
Expand Down Expand Up @@ -1319,7 +1320,11 @@ async fn finalize_checkpoint(
// TODO(bmwill) discuss with team a better location for this indexing so that it isn't on
// the critical path and the writes to the DB are done in checkpoint order
if let Some(rest_index) = &state.rest_index {
rest_index.index_checkpoint(&checkpoint_data)?;
let mut layout_resolver = epoch_store.executor().type_layout_resolver(Box::new(
PackageStoreWithFallback::new(state.get_backing_package_store(), &checkpoint_data),
));

rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut())?;
}

if let Some(path) = data_ingestion_dir {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ impl CheckpointBuilder {
let acc = self.accumulator.accumulate_checkpoint(
effects.clone(),
sequence_number,
self.epoch_store.clone(),
&self.epoch_store,
)?;
self.accumulator
.accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
Expand Down
176 changes: 161 additions & 15 deletions crates/sui-core/src/rest_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::authority::authority_store_tables::LiveObject;
use crate::authority::AuthorityStore;
use crate::checkpoints::CheckpointStore;
use crate::state_accumulator::AccumulatorStore;
use move_core_types::language_storage::TypeTag;
use serde::Deserialize;
use serde::Serialize;
use std::path::PathBuf;
Expand All @@ -14,9 +15,12 @@ use sui_types::base_types::ObjectID;
use sui_types::base_types::SequenceNumber;
use sui_types::base_types::SuiAddress;
use sui_types::digests::TransactionDigest;
use sui_types::dynamic_field::{DynamicFieldInfo, DynamicFieldType};
use sui_types::messages_checkpoint::CheckpointContents;
use sui_types::object::Object;
use sui_types::object::Owner;
use sui_types::storage::error::Error as StorageError;
use sui_types::type_resolver::LayoutResolver;
use tracing::{debug, info};
use typed_store::rocks::{DBMap, MetricConf};
use typed_store::traits::Map;
Expand Down Expand Up @@ -52,6 +56,37 @@ impl OwnerIndexInfo {
}
}

#[derive(Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct DynamicFieldKey {
parent: ObjectID,
field_id: ObjectID,
}

impl DynamicFieldKey {
fn new<P: Into<ObjectID>>(parent: P, field_id: ObjectID) -> Self {
Self {
parent: parent.into(),
field_id,
}
}
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub struct DynamicFieldIndexInfo {
// field_id of this dynamic field is a part of the Key
pub dynamic_field_type: DynamicFieldType,
pub name_type: TypeTag,
pub name_value: Vec<u8>,
// TODO do we want to also store the type of the value? We can get this for free for
// DynamicFields, but for DynamicObjects it would require a lookup in the DB on init, or
// scanning the transaction's output objects for the coorisponding Object to retreive its type
// information.
//
// pub value_type: TypeTag,
/// ObjectId of the child object when `dynamic_field_type == DynamicFieldType::DynamicObject`
pub dynamic_object_id: Option<ObjectID>,
}

#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct TransactionInfo {
checkpoint: u64,
Expand All @@ -74,6 +109,12 @@ struct IndexStoreTables {
/// Allows an efficient iterator to list all objects currently owned by a specific user
/// account.
owner: DBMap<OwnerIndexKey, OwnerIndexInfo>,

/// An index of dynamic fields (children objects).
///
/// Allows an efficient iterator to list all of the dynamic fields owned by a particular
/// ObjectID.
dynamic_field: DBMap<DynamicFieldKey, DynamicFieldIndexInfo>,
// NOTE: Authors and Reviewers before adding any new tables ensure that they are either:
// - bounded in size by the live object set
// - are prune-able and have corresponding logic in the `prune` function
Expand All @@ -88,6 +129,7 @@ impl IndexStoreTables {
&mut self,
authority_store: &AuthorityStore,
checkpoint_store: &CheckpointStore,
resolver: &mut dyn LayoutResolver,
) -> Result<(), StorageError> {
info!("Initializing REST indexes");

Expand Down Expand Up @@ -127,17 +169,27 @@ impl IndexStoreTables {
.iter_live_object_set(false)
.filter_map(LiveObject::to_normal)
{
let Owner::AddressOwner(owner) = object.owner else {
continue;
};

let mut batch = self.owner.batch();

// Owner Index
let owner_key = OwnerIndexKey::new(owner, object.id());
let owner_info = OwnerIndexInfo::new(&object);
match object.owner {
// Owner Index
Owner::AddressOwner(owner) => {
let owner_key = OwnerIndexKey::new(owner, object.id());
let owner_info = OwnerIndexInfo::new(&object);
batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
}

// Dynamic Field Index
Owner::ObjectOwner(parent) => {
if let Some(field_info) = try_create_dynamic_field_info(&object, resolver)? {
let field_key = DynamicFieldKey::new(parent, object.id());

batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
batch.insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
}
}

Owner::Shared { .. } | Owner::Immutable => continue,
}

batch.write()?;
}
Expand All @@ -164,7 +216,11 @@ impl IndexStoreTables {
}

/// Index a Checkpoint
fn index_checkpoint(&self, checkpoint: &CheckpointData) -> Result<(), TypedStoreError> {
fn index_checkpoint(
&self,
checkpoint: &CheckpointData,
resolver: &mut dyn LayoutResolver,
) -> Result<(), StorageError> {
debug!(
checkpoint = checkpoint.checkpoint_summary.sequence_number,
"indexing checkpoint"
Expand Down Expand Up @@ -197,7 +253,13 @@ impl IndexStoreTables {
let owner_key = OwnerIndexKey::new(*address, removed_object.id());
batch.delete_batch(&self.owner, [owner_key])?;
}
Owner::ObjectOwner(_) | Owner::Shared { .. } | Owner::Immutable => {}
Owner::ObjectOwner(object_id) => {
batch.delete_batch(
&self.dynamic_field,
[DynamicFieldKey::new(*object_id, removed_object.id())],
)?;
}
Owner::Shared { .. } | Owner::Immutable => {}
}
}

Expand All @@ -211,8 +273,14 @@ impl IndexStoreTables {
batch.delete_batch(&self.owner, [owner_key])?;
}

Owner::ObjectOwner(_) | Owner::Shared { .. } | Owner::Immutable => {
Owner::ObjectOwner(object_id) => {
batch.delete_batch(
&self.dynamic_field,
[DynamicFieldKey::new(*object_id, old_object.id())],
)?;
}

Owner::Shared { .. } | Owner::Immutable => {}
}
}
}
Expand All @@ -223,7 +291,17 @@ impl IndexStoreTables {
let owner_info = OwnerIndexInfo::new(object);
batch.insert_batch(&self.owner, [(owner_key, owner_info)])?;
}
Owner::ObjectOwner(_) | Owner::Shared { .. } | Owner::Immutable => {}
Owner::ObjectOwner(parent) => {
if let Some(field_info) =
try_create_dynamic_field_info(object, resolver)?
{
let field_key = DynamicFieldKey::new(*parent, object.id());

batch
.insert_batch(&self.dynamic_field, [(field_key, field_info)])?;
}
}
Owner::Shared { .. } | Owner::Immutable => {}
}
}
}
Expand All @@ -248,6 +326,7 @@ impl RestIndexStore {
path: PathBuf,
authority_store: &AuthorityStore,
checkpoint_store: &CheckpointStore,
resolver: &mut dyn LayoutResolver,
) -> Self {
let mut tables = IndexStoreTables::open_tables_read_write(
path,
Expand All @@ -258,7 +337,9 @@ impl RestIndexStore {

// If the index tables are empty then we need to populate them
if tables.is_empty() {
tables.init(authority_store, checkpoint_store).unwrap();
tables
.init(authority_store, checkpoint_store, resolver)
.unwrap();
}

Self { tables }
Expand Down Expand Up @@ -286,7 +367,72 @@ impl RestIndexStore {
self.tables.prune(checkpoint_contents_to_prune)
}

pub fn index_checkpoint(&self, checkpoint: &CheckpointData) -> Result<(), TypedStoreError> {
self.tables.index_checkpoint(checkpoint)
pub fn index_checkpoint(
&self,
checkpoint: &CheckpointData,
resolver: &mut dyn LayoutResolver,
) -> Result<(), StorageError> {
self.tables.index_checkpoint(checkpoint, resolver)
}
}

fn try_create_dynamic_field_info(
object: &Object,
resolver: &mut dyn LayoutResolver,
) -> Result<Option<DynamicFieldIndexInfo>, StorageError> {
// Skip if not a move object
let Some(move_object) = object.data.try_as_move() else {
return Ok(None);
};

// Skip any objects that aren't of type `Field<Name, Value>`
//
// All dynamic fields are of type:
// - Field<Name, Value> for dynamic fields
// - Field<Wrapper<Name, ID>> for dynamic field objects where the ID is the id of the pointed
// to object
//
if !move_object.type_().is_dynamic_field() {
return Ok(None);
}

let (name_value, dynamic_field_type, object_id) = {
let layout = sui_types::type_resolver::into_struct_layout(
resolver
.get_annotated_layout(&move_object.type_().clone().into())
.map_err(StorageError::custom)?,
)
.map_err(StorageError::custom)?;

let move_struct = move_object
.to_move_struct(&layout)
.map_err(StorageError::serialization)?;

// SAFETY: move struct has already been validated to be of type DynamicField
DynamicFieldInfo::parse_move_object(&move_struct).unwrap()
};

let name_type = move_object
.type_()
.try_extract_field_name(&dynamic_field_type)
.expect("object is of type Field");

let name_value = name_value
.undecorate()
.simple_serialize()
.expect("serialization cannot fail");

let dynamic_object_id = match dynamic_field_type {
DynamicFieldType::DynamicObject => Some(object_id),
DynamicFieldType::DynamicField => None,
};

let field_info = DynamicFieldIndexInfo {
name_type,
name_value,
dynamic_field_type,
dynamic_object_id,
};

Ok(Some(field_info))
}
2 changes: 1 addition & 1 deletion crates/sui-core/src/state_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl StateAccumulator {
&self,
effects: Vec<TransactionEffects>,
checkpoint_seq_num: CheckpointSequenceNumber,
epoch_store: Arc<AuthorityPerEpochStore>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<Accumulator> {
let _scope = monitored_scope("AccumulateCheckpoint");
if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
Expand Down
5 changes: 5 additions & 0 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,10 +547,15 @@ impl SuiNode {
&& config.enable_experimental_rest_api
&& config.enable_index_processing
{
let mut resolver = epoch_store
.executor()
.type_layout_resolver(Box::new(&cache_traits.backing_package_store));

Some(Arc::new(RestIndexStore::new(
config.db_path().join("rest_index"),
&store,
&checkpoint_store,
resolver.as_mut(),
)))
} else {
None
Expand Down

0 comments on commit 7fad5b3

Please sign in to comment.