Skip to content

Commit

Permalink
Shared Object support in full node + tests. (Its mostly tests) (Myste…
Browse files Browse the repository at this point in the history
…nLabs#2620)

* Move halted check and certificate verification lower (fixes hole in try_skip_consensus)

* Shared Object support in full node + tests.

* Fix test flakiness and speed.

* flakiness: remove dumb gas selection code which could pick move
  packages
* speed: don't rely on gateway automatic gas selection except for the
  first request (since it enumerates all objects)
  • Loading branch information
mystenmark authored Jun 17, 2022
1 parent b647bbe commit 8037786
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 43 deletions.
63 changes: 48 additions & 15 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,29 +360,47 @@ impl AuthorityState {
}
}

pub async fn handle_node_sync_transaction(
&self,
certificate: CertifiedTransaction,
// Signed effects is signed by only one validator, it is not a
// CertifiedTransactionEffects. The caller of this (node_sync) must promise to
// wait until it has seen at least f+1 identifical effects digests matching this
// SignedTransactionEffects before calling this function, in order to prevent a
// byzantine validator from giving us incorrect effects.
signed_effects: SignedTransactionEffects,
) -> SuiResult {
let transaction_digest = *certificate.digest();
fp_ensure!(
signed_effects.effects.transaction_digest == transaction_digest,
// NOTE: the error message here will say 'Error acquiring lock' but what it means is
// 'error checking lock'.
SuiError::ErrorWhileProcessingConfirmationTransaction {
err: "effects/tx digest mismatch".to_string()
}
);

let tx_guard = self
.acquire_tx_guard(&transaction_digest, &certificate)
.await?;

if certificate.contains_shared_object() {
self.database
.acquire_shared_locks_from_effects(&certificate, &signed_effects.effects)?;
}

self.process_certificate(tx_guard, certificate).await?;
Ok(())
}

/// Confirm a transfer.
pub async fn handle_confirmation_transaction(
&self,
confirmation_transaction: ConfirmationTransaction,
) -> SuiResult<TransactionInfoResponse> {
self.metrics.total_certs.inc();
let certificate = confirmation_transaction.certificate;
let transaction_digest = *certificate.digest();

if self.halted.load(Ordering::SeqCst) && !certificate.data.kind.is_system_tx() {
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

// Check the certificate and retrieve the transfer data.
let committee = &self.committee.load();
tracing::trace_span!("cert_check_signature")
.in_scope(|| certificate.verify(committee))
.map_err(|e| {
self.metrics.signature_errors.inc();
e
})?;

// This acquires a lock on the tx digest to prevent multiple concurrent executions of the
// same tx. While we don't need this for safety (tx sequencing is ultimately atomic), it is
// very common to receive the same tx multiple times simultaneously due to gossip, so we
Expand Down Expand Up @@ -487,8 +505,23 @@ impl AuthorityState {
tx_guard: CertTxGuard<'_>,
certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
self.metrics.total_certs.inc();
let transaction_digest = *certificate.digest();

if self.halted.load(Ordering::SeqCst) && !certificate.data.kind.is_system_tx() {
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

// Check the certificate and retrieve the transfer data.
let committee = &self.committee.load();
tracing::trace_span!("cert_check_signature")
.in_scope(|| certificate.verify(committee))
.map_err(|e| {
self.metrics.signature_errors.inc();
e
})?;

// The cert could have been processed by a concurrent attempt of the same cert, so check if
// the effects have already been written.
if self.database.effects_exists(&transaction_digest)? {
Expand Down
23 changes: 23 additions & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,29 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
Ok(())
}

/// Lock a sequence number for the shared objects of the input transaction based on the effects
/// of that transaction. Used by the nodes, which don't listen to consensus.
pub fn acquire_shared_locks_from_effects(
&self,
certificate: &CertifiedTransaction,
effects: &TransactionEffects,
) -> SuiResult {
let digest = *certificate.digest();

let sequenced: Vec<_> = effects
.shared_objects
.iter()
.map(|(id, version, _)| ((digest, *id), *version))
.collect();
info!(?sequenced, "locking");

let mut write_batch = self.sequenced.batch();
write_batch = write_batch.insert_batch(&self.sequenced, sequenced)?;
write_batch.write()?;

Ok(())
}

/// Lock a sequence number for the shared objects of the input transaction. Also update the
/// last consensus index.
pub fn persist_certificate_and_lock_shared_objects(
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority_active/gossip/node_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sui_types::{
base_types::{AuthorityName, ExecutionDigests, TransactionDigest, TransactionEffectsDigest},
committee::{Committee, StakeUnit},
error::{SuiError, SuiResult},
messages::{CertifiedTransaction, ConfirmationTransaction, SignedTransactionEffects},
messages::{CertifiedTransaction, SignedTransactionEffects},
};

use std::ops::Deref;
Expand Down Expand Up @@ -257,7 +257,7 @@ where
// TODO: support shared object TXes via something like:
// self.state.sequence_shared_locks_from_effects(effects).await
self.state
.handle_confirmation_transaction(ConfirmationTransaction { certificate: cert })
.handle_node_sync_transaction(cert, effects)
.await?;

// Garbage collect data for this tx.
Expand Down
Loading

0 comments on commit 8037786

Please sign in to comment.