diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 0f544b110ca3e..8f7edeeeb7605 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -57,7 +57,8 @@ use sui_types::{ storage::{BackingPackageStore, DeleteKind, Storage}, MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS, SUI_SYSTEM_STATE_OBJECT_ID, }; -use tracing::{debug, error, instrument}; +use tokio::sync::broadcast::error::RecvError; +use tracing::{debug, error, instrument, warn}; use typed_store::Map; #[cfg(test)] @@ -533,11 +534,6 @@ impl AuthorityState { self.update_state(temporary_store, &certificate, &signed_effects) .await?; - // Each certificate only reaches here once - if let Some(event_handler) = &self.event_handler { - event_handler.process_events(&signed_effects.effects).await; - } - Ok(TransactionInfoResponse { signed_transaction: self.database.get_transaction(&transaction_digest)?, certified_transaction: Some(certificate), @@ -545,6 +541,97 @@ impl AuthorityState { }) } + fn index_tx( + &self, + indexes: &IndexStore, + seq: TxSequenceNumber, + digest: &TransactionDigest, + cert: &CertifiedTransaction, + effects: &SignedTransactionEffects, + ) -> SuiResult { + indexes.index_tx( + cert.sender_address(), + cert.data.input_objects()?.iter().map(|o| o.object_id()), + effects.effects.mutated_and_created(), + seq, + digest, + ) + } + + async fn process_one_tx(&self, seq: TxSequenceNumber, digest: &TransactionDigest) -> SuiResult { + // Load cert and effects. + let info = self.make_transaction_info(digest).await?; + let (cert, effects) = match info { + TransactionInfoResponse { + certified_transaction: Some(cert), + signed_effects: Some(effects), + .. + } => (cert, effects), + _ => { + return Err(SuiError::CertificateNotfound { + certificate_digest: *digest, + }) + } + }; + + // Index tx + if let Some(indexes) = &self.indexes { + if let Err(e) = self.index_tx(indexes.as_ref(), seq, digest, &cert, &effects) { + warn!(?digest, "Couldn't index tx: {}", e); + } + } + + // Emit events + if let Some(event_handler) = &self.event_handler { + event_handler.process_events(&effects.effects).await; + } + + Ok(()) + } + + // TODO: This should persist the last successfully-processed sequence to disk, and upon + // starting up, look for any sequences in the store since then and process them. + pub async fn run_tx_post_processing_process(&self) -> SuiResult { + let mut subscriber = self.subscribe_batch(); + + loop { + match subscriber.recv().await { + Ok(item) => { + if let UpdateItem::Transaction(( + seq, + ExecutionDigests { + transaction: digest, + .. + }, + )) = item + { + if let Err(e) = self.process_one_tx(seq, &digest).await { + warn!(?digest, "Couldn't process tx: {}", e); + } + } + } + + // For both the error cases, we exit the loop which ends this task. + // TODO: Automatically restart the task, which in combination with the todo above, + // will process any skipped txes and then begin listening for new ones. + Err(RecvError::Closed) => { + // The service closed the channel. + error!("run_tx_post_processing_process receiver channel closed"); + break; + } + Err(RecvError::Lagged(number_skipped)) => { + error!( + "run_tx_post_processing_process too slow, skipped {} txes", + number_skipped + ); + break; + } + } + } + + Ok(()) + } + /// Check if we need to submit this transaction to consensus. We usually do, unless (i) we already /// processed the transaction and we can immediately return the effects, or (ii) we already locked /// all shared-objects of the transaction and can (re-)attempt execution. @@ -1121,52 +1208,11 @@ impl AuthorityState { let notifier_ticket = self.batch_notifier.ticket()?; let seq = notifier_ticket.seq(); - // We want to call update_state before updating the indexes, however this is extremely - // awkward because update_state takes temporary_store by value. - // TODO: Move indexing either into update_state, or make it a batch consumer to clean this - // up. - let (inputs, outputs) = if self.indexes.is_some() { - let inputs: Vec<_> = temporary_store - .objects() - .iter() - .map(|(_, o)| o.clone()) - .collect(); - let outputs: Vec<_> = temporary_store - .written() - .iter() - .map(|(_, (_, o))| o.clone()) - .collect(); - (Some(inputs), Some(outputs)) - } else { - (None, None) - }; - let update_type = UpdateType::Transaction(seq, signed_effects.effects.digest()); - let res = self - .database + self.database .update_state(temporary_store, certificate, signed_effects, update_type) - .await; - - if let Some(indexes) = &self.indexes { - // unwrap ok because of previous if stmt. - let inputs = inputs.unwrap(); - let outputs = outputs.unwrap(); - // turn into vectors of references... - let inputs: Vec<_> = inputs.iter().collect(); - let outputs: Vec<_> = outputs.iter().collect(); - if let Err(e) = indexes.index_tx( - certificate.sender_address(), - &inputs, - &outputs, - seq, - certificate.digest(), - ) { - error!("Error indexing certificate: {}", e); - } - } - - res + .await // implicitly we drop the ticket here and that notifies the batch manager } diff --git a/crates/sui-core/src/authority/authority_store.rs b/crates/sui-core/src/authority/authority_store.rs index 6bd026c3c8f10..b35befb4f68c4 100644 --- a/crates/sui-core/src/authority/authority_store.rs +++ b/crates/sui-core/src/authority/authority_store.rs @@ -791,6 +791,11 @@ impl Deserialize<'de>> // sequence_transaction call above assigns a sequence number to the transaction // the first time it is called and will return that same sequence on subsequent // calls. + trace!( + "assigning seq {:?} -> {:?}", + transaction_digest, + effects_digest + ); self.executed_sequence.insert( &assigned_seq, &ExecutionDigests::new(transaction_digest, effects_digest), diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index e7e343f29c9d3..8967c531cd789 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -24,6 +24,7 @@ pub struct SuiNode { grpc_server: tokio::task::JoinHandle>, _json_rpc_service: Option, _batch_subsystem_handle: tokio::task::JoinHandle>, + _post_processing_subsystem_handle: Option>>, _gossip_handle: Option>, state: Arc, } @@ -66,7 +67,7 @@ impl SuiNode { config.public_key(), secret, store, - index_store, + index_store.clone(), checkpoint_store, genesis, config.enable_event_processing, @@ -118,6 +119,19 @@ impl SuiNode { }) }; + let post_processing_subsystem_handle = + if index_store.is_some() || config.enable_event_processing { + let indexing_state = state.clone(); + Some(tokio::task::spawn(async move { + indexing_state + .run_tx_post_processing_process() + .await + .map_err(Into::into) + })) + } else { + None + }; + let validator_service = if config.consensus_config().is_some() { Some(ValidatorService::new(config, state.clone()).await?) } else { @@ -155,6 +169,7 @@ impl SuiNode { _json_rpc_service: json_rpc_service, _gossip_handle: gossip_handle, _batch_subsystem_handle: batch_subsystem_handle, + _post_processing_subsystem_handle: post_processing_subsystem_handle, state, }; diff --git a/crates/sui-storage/src/indexes.rs b/crates/sui-storage/src/indexes.rs index 5be70fb590094..59c68ad0be667 100644 --- a/crates/sui-storage/src/indexes.rs +++ b/crates/sui-storage/src/indexes.rs @@ -13,7 +13,8 @@ use sui_types::batch::TxSequenceNumber; use sui_types::error::SuiResult; -use sui_types::object::Object; +use sui_types::base_types::ObjectRef; +use sui_types::object::Owner; use typed_store::rocks::DBMap; use typed_store::{reopen, traits::Map}; @@ -77,11 +78,11 @@ impl IndexStore { } } - pub fn index_tx( + pub fn index_tx<'a>( &self, sender: SuiAddress, - active_inputs: &[&Object], - mutated_objects: &[&Object], + active_inputs: impl Iterator, + mutated_objects: impl Iterator + Clone, sequence: TxSequenceNumber, digest: &TransactionDigest, ) -> SuiResult { @@ -94,23 +95,22 @@ impl IndexStore { let batch = batch.insert_batch( &self.transactions_by_input_object_id, - active_inputs - .iter() - .map(|object| ((object.id(), sequence), *digest)), + active_inputs.map(|id| ((id, sequence), *digest)), )?; let batch = batch.insert_batch( &self.transactions_by_mutated_object_id, mutated_objects - .iter() - .map(|object| ((object.id(), sequence), *digest)), + .clone() + .map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)), )?; let batch = batch.insert_batch( &self.transactions_to_addr, - mutated_objects.iter().filter_map(|object| { - object - .get_single_owner() + mutated_objects.filter_map(|(_, owner)| { + owner + .get_owner_address() + .ok() .map(|addr| ((addr, sequence), digest)) }), )?; diff --git a/crates/sui-types/src/messages.rs b/crates/sui-types/src/messages.rs index e371eb236908f..81ff63c69dafd 100644 --- a/crates/sui-types/src/messages.rs +++ b/crates/sui-types/src/messages.rs @@ -967,7 +967,7 @@ impl TransactionEffects { /// Return an iterator that iterates through both mutated and /// created objects. /// It doesn't include deleted objects. - pub fn mutated_and_created(&self) -> impl Iterator { + pub fn mutated_and_created(&self) -> impl Iterator + Clone { self.mutated.iter().chain(self.created.iter()) } diff --git a/crates/sui/tests/full_node_tests.rs b/crates/sui/tests/full_node_tests.rs index 40b48430929dd..27d507e3f412c 100644 --- a/crates/sui/tests/full_node_tests.rs +++ b/crates/sui/tests/full_node_tests.rs @@ -106,6 +106,8 @@ async fn wait_for_tx(wait_digest: TransactionDigest, state: Arc) #[tokio::test] async fn test_full_node_follows_txes() -> Result<(), anyhow::Error> { + telemetry_subscribers::init_for_testing(); + let (swarm, mut context, _) = setup_network_and_wallet().await?; let config = swarm.config().generate_fullnode_config(); @@ -138,23 +140,29 @@ async fn test_full_node_indexes() -> Result<(), anyhow::Error> { .state() .get_transactions_by_input_object(transfered_object) .await?; + + assert_eq!(txes.len(), 1); assert_eq!(txes[0].1, digest); let txes = node .state() .get_transactions_by_mutated_object(transfered_object) .await?; + assert_eq!(txes.len(), 1); assert_eq!(txes[0].1, digest); let txes = node.state().get_transactions_from_addr(sender).await?; + assert_eq!(txes.len(), 1); assert_eq!(txes[0].1, digest); let txes = node.state().get_transactions_to_addr(receiver).await?; + assert_eq!(txes.len(), 1); assert_eq!(txes[0].1, digest); // Note that this is also considered a tx to the sender, because it mutated // one or more of the sender's objects. let txes = node.state().get_transactions_to_addr(sender).await?; + assert_eq!(txes.len(), 1); assert_eq!(txes[0].1, digest); // No transactions have originated from the receiver