Skip to content

Commit

Permalink
[NFC] Add InputObjects struct wrapper (MystenLabs#2503)
Browse files Browse the repository at this point in the history
* Add InputObjects struct

* Rename active_inputs to mutable_inputs
  • Loading branch information
lxfind authored Jun 9, 2022
1 parent 7c9d300 commit a131d6b
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 106 deletions.
30 changes: 9 additions & 21 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::{
};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use itertools::Itertools;
use move_binary_format::CompiledModule;
use move_bytecode_utils::module_cache::SyncModuleCache;
use move_core_types::{
Expand Down Expand Up @@ -299,14 +298,14 @@ impl AuthorityState {
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

let (_gas_status, all_objects) = transaction_input_checker::check_transaction_input(
let (_gas_status, input_objects) = transaction_input_checker::check_transaction_input(
&self.database,
&transaction,
&self.metrics.shared_obj_tx,
)
.await?;

let owned_objects = transaction_input_checker::filter_owned_objects(&all_objects);
let owned_objects = input_objects.filter_owned_objects();

let signed_transaction = SignedTransaction::new(
self.committee.load().epoch,
Expand Down Expand Up @@ -461,7 +460,7 @@ impl AuthorityState {
let certificate = confirmation_transaction.certificate;
let transaction_digest = *certificate.digest();

let (gas_status, objects_by_kind) = transaction_input_checker::check_transaction_input(
let (gas_status, input_objects) = transaction_input_checker::check_transaction_input(
&self.database,
&certificate,
&self.metrics.shared_obj_tx,
Expand All @@ -470,12 +469,7 @@ impl AuthorityState {

// At this point we need to check if any shared objects need locks,
// and whether they have them.
let shared_object_refs: Vec<_> = objects_by_kind
.iter()
.filter(|(kind, _)| matches!(kind, InputObjectKind::SharedMoveObject(_)))
.map(|(_, obj)| obj.compute_object_reference())
.sorted()
.collect();
let shared_object_refs = input_objects.filter_shared_objects();
if !shared_object_refs.is_empty() && !certificate.data.kind.is_system_tx() {
// If the transaction contains shared objects, we need to ensure they have been scheduled
// for processing by the consensus protocol.
Expand All @@ -488,27 +482,21 @@ impl AuthorityState {

self.metrics
.num_input_objs
.observe(objects_by_kind.len() as f64);
.observe(input_objects.len() as f64);
self.metrics
.num_shared_objects
.observe(shared_object_refs.len() as f64);
self.metrics
.batch_size
.observe(certificate.data.kind.batch_size() as f64);
debug!(
num_inputs = objects_by_kind.len(),
num_inputs = input_objects.len(),
"Read inputs for transaction from DB"
);

let transaction_dependencies = objects_by_kind
.iter()
.map(|(_, obj)| obj.previous_transaction)
.collect();
let mut temporary_store = AuthorityTemporaryStore::new(
self.database.clone(),
objects_by_kind,
transaction_digest,
);
let transaction_dependencies = input_objects.transaction_dependencies();
let mut temporary_store =
AuthorityTemporaryStore::new(self.database.clone(), input_objects, transaction_digest);
let effects = execution_engine::execute_transaction_to_effects(
shared_object_refs,
&mut temporary_store,
Expand Down
11 changes: 7 additions & 4 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use super::*;
use crate::epoch::EpochInfoLocals;
use crate::gateway_state::GatewayTxSeqNumber;
use crate::transaction_input_checker::InputObjects;
use narwhal_executor::ExecutionIndices;
use rocksdb::Options;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -591,15 +592,15 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
/// need to recreate the temporary store based on the inputs and effects to update it properly.
pub async fn update_gateway_state(
&self,
active_inputs: Vec<(InputObjectKind, Object)>,
input_objects: InputObjects,
mutated_objects: HashMap<ObjectRef, Object>,
certificate: CertifiedTransaction,
effects: TransactionEffectsEnvelope<S>,
update_type: UpdateType,
) -> SuiResult {
let transaction_digest = certificate.digest();
let mut temporary_store =
AuthorityTemporaryStore::new(Arc::new(&self), active_inputs, *transaction_digest);
AuthorityTemporaryStore::new(Arc::new(&self), input_objects, *transaction_digest);
for (_, object) in mutated_objects {
temporary_store.write_object(object);
}
Expand Down Expand Up @@ -1206,7 +1207,8 @@ pub async fn store_package_and_init_modules_for_genesis<
.collect::<Vec<_>>();

debug_assert!(ctx.digest() == TransactionDigest::genesis());
let mut temporary_store = AuthorityTemporaryStore::new(store.clone(), filtered, ctx.digest());
let mut temporary_store =
AuthorityTemporaryStore::new(store.clone(), InputObjects::new(filtered), ctx.digest());
let package_id = ObjectID::from(*modules[0].self_id().address());
let natives = native_functions.clone();
let mut gas_status = SuiGasStatus::new_unmetered();
Expand Down Expand Up @@ -1239,7 +1241,8 @@ pub async fn generate_genesis_system_object<
genesis_ctx: &mut TxContext,
) -> SuiResult {
let genesis_digest = genesis_ctx.digest();
let mut temporary_store = AuthorityTemporaryStore::new(store.clone(), vec![], genesis_digest);
let mut temporary_store =
AuthorityTemporaryStore::new(store.clone(), InputObjects::new(vec![]), genesis_digest);
let pubkeys: Vec<Vec<u8>> = committee
.expanded_keys
.values()
Expand Down
33 changes: 9 additions & 24 deletions crates/sui-core/src/authority/temporary_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::transaction_input_checker::InputObjects;
use move_core_types::account_address::AccountAddress;
use sui_types::{event::Event, gas::SuiGasStatus, object::Owner};

Expand All @@ -20,7 +21,7 @@ pub struct AuthorityTemporaryStore<S> {
package_store: Arc<S>,
tx_digest: TransactionDigest,
objects: BTreeMap<ObjectID, Object>,
active_inputs: Vec<ObjectRef>, // Inputs that are not read only
mutable_inputs: Vec<ObjectRef>, // Inputs that are mutable
written: BTreeMap<ObjectID, (ObjectRef, Object)>, // Objects written
/// Objects actively deleted.
deleted: BTreeMap<ObjectID, (SequenceNumber, DeleteKind)>,
Expand All @@ -36,32 +37,16 @@ impl<S> AuthorityTemporaryStore<S> {
/// initial objects.
pub fn new(
package_store: Arc<S>,
input_objects: Vec<(InputObjectKind, Object)>,
input_objects: InputObjects,
tx_digest: TransactionDigest,
) -> Self {
let active_inputs = input_objects
.iter()
.filter_map(|(kind, object)| match kind {
InputObjectKind::MovePackage(_) => None,
InputObjectKind::ImmOrOwnedMoveObject(object_ref) => {
if object.is_immutable() {
None
} else {
Some(*object_ref)
}
}
InputObjectKind::SharedMoveObject(_) => Some(object.compute_object_reference()),
})
.collect();
let objects = input_objects
.into_iter()
.map(|(_, object)| (object.id(), object))
.collect();
let mutable_inputs = input_objects.mutable_inputs();
let objects = input_objects.into_object_map();
Self {
package_store,
tx_digest,
objects,
active_inputs,
mutable_inputs,
written: BTreeMap::new(),
deleted: BTreeMap::new(),
events: Vec::new(),
Expand Down Expand Up @@ -90,7 +75,7 @@ impl<S> AuthorityTemporaryStore<S> {
}
(
self.objects,
self.active_inputs,
self.mutable_inputs,
self.written,
self.deleted,
self.events,
Expand All @@ -102,7 +87,7 @@ impl<S> AuthorityTemporaryStore<S> {
/// sequence number. This is required to achieve safety.
/// We skip the gas object, because gas object will be updated separately.
pub fn ensure_active_inputs_mutated(&mut self, gas_object_id: &ObjectID) {
for (id, _seq, _) in &self.active_inputs {
for (id, _seq, _) in &self.mutable_inputs {
if id == gas_object_id {
continue;
}
Expand Down Expand Up @@ -268,7 +253,7 @@ impl<S> AuthorityTemporaryStore<S> {
self.written.iter().all(|(elt, _)| used.insert(elt));
self.deleted.iter().all(|elt| used.insert(elt.0));

self.active_inputs.iter().all(|elt| !used.insert(&elt.0))
self.mutable_inputs.iter().all(|elt| !used.insert(&elt.0))
},
"Mutable input neither written nor deleted."
);
Expand Down
17 changes: 10 additions & 7 deletions crates/sui-core/src/epoch/tests/reconfiguration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use sui_types::{
SUI_SYSTEM_STATE_OBJECT_ID,
};

use crate::transaction_input_checker::InputObjects;
use crate::{
authority::AuthorityTemporaryStore, authority_active::ActiveAuthority,
authority_aggregator::authority_aggregator_tests::init_local_authorities,
Expand Down Expand Up @@ -130,13 +131,15 @@ async fn test_start_epoch_change() {
let tx_digest = *transaction.digest();
let mut temporary_store = AuthorityTemporaryStore::new(
state.database.clone(),
transaction
.data
.input_objects()
.unwrap()
.into_iter()
.zip(genesis_objects)
.collect(),
InputObjects::new(
transaction
.data
.input_objects()
.unwrap()
.into_iter()
.zip(genesis_objects)
.collect(),
),
tx_digest,
);
let effects = execution_engine::execute_transaction_to_effects(
Expand Down
11 changes: 6 additions & 5 deletions crates/sui-core/src/gateway_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::{
use sui_json::{resolve_move_function_args, SuiJsonCallArg, SuiJsonValue};

use crate::gateway_types::*;
use crate::transaction_input_checker::InputObjects;

#[cfg(test)]
#[path = "unit_tests/gateway_state_tests.rs"]
Expand Down Expand Up @@ -445,7 +446,7 @@ where

async fn execute_transaction_impl_inner(
&self,
all_objects: Vec<(InputObjectKind, Object)>,
input_objects: InputObjects,
transaction: Transaction,
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
// If execute_transaction ever fails due to panic, we should fix the panic and make sure it doesn't.
Expand Down Expand Up @@ -497,7 +498,7 @@ where
);
self.store
.update_gateway_state(
all_objects,
input_objects,
mutated_objects,
new_certificate.clone(),
effects.clone().to_unsigned_effects(),
Expand All @@ -520,20 +521,20 @@ where
self.sync_input_objects_with_authorities(&transaction)
.await?;

let (_gas_status, all_objects) = transaction_input_checker::check_transaction_input(
let (_gas_status, input_objects) = transaction_input_checker::check_transaction_input(
&self.store,
&transaction,
&self.metrics.shared_obj_tx,
)
.await?;

let owned_objects = transaction_input_checker::filter_owned_objects(&all_objects);
let owned_objects = input_objects.filter_owned_objects();
self.set_transaction_lock(&owned_objects, transaction.clone())
.instrument(tracing::trace_span!("db_set_transaction_lock"))
.await?;

let exec_result = self
.execute_transaction_impl_inner(all_objects, transaction)
.execute_transaction_impl_inner(input_objects, transaction)
.await;
if exec_result.is_err() && is_last_retry {
// If we cannot successfully execute this transaction, even after all the retries,
Expand Down
Loading

0 comments on commit a131d6b

Please sign in to comment.