Skip to content

Commit

Permalink
Fix fullnode indexing (MystenLabs#2494)
Browse files Browse the repository at this point in the history
* Make TX indexing a client of the batch system.

- ensures we only process each tx once.
- removes work from the critical path.

* Check that we get only one instance of the tx

* Move event processing to batch subscriber
  • Loading branch information
mystenmark authored Jun 9, 2022
1 parent 8a6569d commit 1759919
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 63 deletions.
144 changes: 95 additions & 49 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ use sui_types::{
storage::{BackingPackageStore, DeleteKind, Storage},
MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS, SUI_SYSTEM_STATE_OBJECT_ID,
};
use tracing::{debug, error, instrument};
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error, instrument, warn};
use typed_store::Map;

#[cfg(test)]
Expand Down Expand Up @@ -533,18 +534,104 @@ impl AuthorityState {
self.update_state(temporary_store, &certificate, &signed_effects)
.await?;

// Each certificate only reaches here once
if let Some(event_handler) = &self.event_handler {
event_handler.process_events(&signed_effects.effects).await;
}

Ok(TransactionInfoResponse {
signed_transaction: self.database.get_transaction(&transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
})
}

fn index_tx(
&self,
indexes: &IndexStore,
seq: TxSequenceNumber,
digest: &TransactionDigest,
cert: &CertifiedTransaction,
effects: &SignedTransactionEffects,
) -> SuiResult {
indexes.index_tx(
cert.sender_address(),
cert.data.input_objects()?.iter().map(|o| o.object_id()),
effects.effects.mutated_and_created(),
seq,
digest,
)
}

async fn process_one_tx(&self, seq: TxSequenceNumber, digest: &TransactionDigest) -> SuiResult {
// Load cert and effects.
let info = self.make_transaction_info(digest).await?;
let (cert, effects) = match info {
TransactionInfoResponse {
certified_transaction: Some(cert),
signed_effects: Some(effects),
..
} => (cert, effects),
_ => {
return Err(SuiError::CertificateNotfound {
certificate_digest: *digest,
})
}
};

// Index tx
if let Some(indexes) = &self.indexes {
if let Err(e) = self.index_tx(indexes.as_ref(), seq, digest, &cert, &effects) {
warn!(?digest, "Couldn't index tx: {}", e);
}
}

// Emit events
if let Some(event_handler) = &self.event_handler {
event_handler.process_events(&effects.effects).await;
}

Ok(())
}

// TODO: This should persist the last successfully-processed sequence to disk, and upon
// starting up, look for any sequences in the store since then and process them.
pub async fn run_tx_post_processing_process(&self) -> SuiResult {
let mut subscriber = self.subscribe_batch();

loop {
match subscriber.recv().await {
Ok(item) => {
if let UpdateItem::Transaction((
seq,
ExecutionDigests {
transaction: digest,
..
},
)) = item
{
if let Err(e) = self.process_one_tx(seq, &digest).await {
warn!(?digest, "Couldn't process tx: {}", e);
}
}
}

// For both the error cases, we exit the loop which ends this task.
// TODO: Automatically restart the task, which in combination with the todo above,
// will process any skipped txes and then begin listening for new ones.
Err(RecvError::Closed) => {
// The service closed the channel.
error!("run_tx_post_processing_process receiver channel closed");
break;
}
Err(RecvError::Lagged(number_skipped)) => {
error!(
"run_tx_post_processing_process too slow, skipped {} txes",
number_skipped
);
break;
}
}
}

Ok(())
}

/// Check if we need to submit this transaction to consensus. We usually do, unless (i) we already
/// processed the transaction and we can immediately return the effects, or (ii) we already locked
/// all shared-objects of the transaction and can (re-)attempt execution.
Expand Down Expand Up @@ -1121,52 +1208,11 @@ impl AuthorityState {
let notifier_ticket = self.batch_notifier.ticket()?;
let seq = notifier_ticket.seq();

// We want to call update_state before updating the indexes, however this is extremely
// awkward because update_state takes temporary_store by value.
// TODO: Move indexing either into update_state, or make it a batch consumer to clean this
// up.
let (inputs, outputs) = if self.indexes.is_some() {
let inputs: Vec<_> = temporary_store
.objects()
.iter()
.map(|(_, o)| o.clone())
.collect();
let outputs: Vec<_> = temporary_store
.written()
.iter()
.map(|(_, (_, o))| o.clone())
.collect();
(Some(inputs), Some(outputs))
} else {
(None, None)
};

let update_type = UpdateType::Transaction(seq, signed_effects.effects.digest());

let res = self
.database
self.database
.update_state(temporary_store, certificate, signed_effects, update_type)
.await;

if let Some(indexes) = &self.indexes {
// unwrap ok because of previous if stmt.
let inputs = inputs.unwrap();
let outputs = outputs.unwrap();
// turn into vectors of references...
let inputs: Vec<_> = inputs.iter().collect();
let outputs: Vec<_> = outputs.iter().collect();
if let Err(e) = indexes.index_tx(
certificate.sender_address(),
&inputs,
&outputs,
seq,
certificate.digest(),
) {
error!("Error indexing certificate: {}", e);
}
}

res
.await

// implicitly we drop the ticket here and that notifies the batch manager
}
Expand Down
5 changes: 5 additions & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,11 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
// sequence_transaction call above assigns a sequence number to the transaction
// the first time it is called and will return that same sequence on subsequent
// calls.
trace!(
"assigning seq {:?} -> {:?}",
transaction_digest,
effects_digest
);
self.executed_sequence.insert(
&assigned_seq,
&ExecutionDigests::new(transaction_digest, effects_digest),
Expand Down
17 changes: 16 additions & 1 deletion crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct SuiNode {
grpc_server: tokio::task::JoinHandle<Result<()>>,
_json_rpc_service: Option<jsonrpsee::http_server::HttpServerHandle>,
_batch_subsystem_handle: tokio::task::JoinHandle<Result<()>>,
_post_processing_subsystem_handle: Option<tokio::task::JoinHandle<Result<()>>>,
_gossip_handle: Option<tokio::task::JoinHandle<()>>,
state: Arc<AuthorityState>,
}
Expand Down Expand Up @@ -66,7 +67,7 @@ impl SuiNode {
config.public_key(),
secret,
store,
index_store,
index_store.clone(),
checkpoint_store,
genesis,
config.enable_event_processing,
Expand Down Expand Up @@ -118,6 +119,19 @@ impl SuiNode {
})
};

let post_processing_subsystem_handle =
if index_store.is_some() || config.enable_event_processing {
let indexing_state = state.clone();
Some(tokio::task::spawn(async move {
indexing_state
.run_tx_post_processing_process()
.await
.map_err(Into::into)
}))
} else {
None
};

let validator_service = if config.consensus_config().is_some() {
Some(ValidatorService::new(config, state.clone()).await?)
} else {
Expand Down Expand Up @@ -155,6 +169,7 @@ impl SuiNode {
_json_rpc_service: json_rpc_service,
_gossip_handle: gossip_handle,
_batch_subsystem_handle: batch_subsystem_handle,
_post_processing_subsystem_handle: post_processing_subsystem_handle,
state,
};

Expand Down
24 changes: 12 additions & 12 deletions crates/sui-storage/src/indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use sui_types::batch::TxSequenceNumber;

use sui_types::error::SuiResult;

use sui_types::object::Object;
use sui_types::base_types::ObjectRef;
use sui_types::object::Owner;

use typed_store::rocks::DBMap;
use typed_store::{reopen, traits::Map};
Expand Down Expand Up @@ -77,11 +78,11 @@ impl IndexStore {
}
}

pub fn index_tx(
pub fn index_tx<'a>(
&self,
sender: SuiAddress,
active_inputs: &[&Object],
mutated_objects: &[&Object],
active_inputs: impl Iterator<Item = ObjectID>,
mutated_objects: impl Iterator<Item = &'a (ObjectRef, Owner)> + Clone,
sequence: TxSequenceNumber,
digest: &TransactionDigest,
) -> SuiResult {
Expand All @@ -94,23 +95,22 @@ impl IndexStore {

let batch = batch.insert_batch(
&self.transactions_by_input_object_id,
active_inputs
.iter()
.map(|object| ((object.id(), sequence), *digest)),
active_inputs.map(|id| ((id, sequence), *digest)),
)?;

let batch = batch.insert_batch(
&self.transactions_by_mutated_object_id,
mutated_objects
.iter()
.map(|object| ((object.id(), sequence), *digest)),
.clone()
.map(|(obj_ref, _)| ((obj_ref.0, sequence), *digest)),
)?;

let batch = batch.insert_batch(
&self.transactions_to_addr,
mutated_objects.iter().filter_map(|object| {
object
.get_single_owner()
mutated_objects.filter_map(|(_, owner)| {
owner
.get_owner_address()
.ok()
.map(|addr| ((addr, sequence), digest))
}),
)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ impl TransactionEffects {
/// Return an iterator that iterates through both mutated and
/// created objects.
/// It doesn't include deleted objects.
pub fn mutated_and_created(&self) -> impl Iterator<Item = &(ObjectRef, Owner)> {
pub fn mutated_and_created(&self) -> impl Iterator<Item = &(ObjectRef, Owner)> + Clone {
self.mutated.iter().chain(self.created.iter())
}

Expand Down
8 changes: 8 additions & 0 deletions crates/sui/tests/full_node_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ async fn wait_for_tx(wait_digest: TransactionDigest, state: Arc<AuthorityState>)

#[tokio::test]
async fn test_full_node_follows_txes() -> Result<(), anyhow::Error> {
telemetry_subscribers::init_for_testing();

let (swarm, mut context, _) = setup_network_and_wallet().await?;

let config = swarm.config().generate_fullnode_config();
Expand Down Expand Up @@ -138,23 +140,29 @@ async fn test_full_node_indexes() -> Result<(), anyhow::Error> {
.state()
.get_transactions_by_input_object(transfered_object)
.await?;

assert_eq!(txes.len(), 1);
assert_eq!(txes[0].1, digest);

let txes = node
.state()
.get_transactions_by_mutated_object(transfered_object)
.await?;
assert_eq!(txes.len(), 1);
assert_eq!(txes[0].1, digest);

let txes = node.state().get_transactions_from_addr(sender).await?;
assert_eq!(txes.len(), 1);
assert_eq!(txes[0].1, digest);

let txes = node.state().get_transactions_to_addr(receiver).await?;
assert_eq!(txes.len(), 1);
assert_eq!(txes[0].1, digest);

// Note that this is also considered a tx to the sender, because it mutated
// one or more of the sender's objects.
let txes = node.state().get_transactions_to_addr(sender).await?;
assert_eq!(txes.len(), 1);
assert_eq!(txes[0].1, digest);

// No transactions have originated from the receiver
Expand Down

0 comments on commit 1759919

Please sign in to comment.