From bb0e158c6a973c894ca5fe745ddcced2ce026569 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Tue, 14 Jun 2022 08:42:06 -0700 Subject: [PATCH] Add CertifiedTransactionEffects (#2539) * Add CertifiedTransactionEffects * feedback --- .../checkpoint_driver/tests.rs | 5 ++- crates/sui-core/src/authority_aggregator.rs | 37 ++++++++++++++----- .../src/checkpoints/tests/checkpoint_tests.rs | 5 ++- crates/sui-core/src/gateway_state.rs | 8 ++-- crates/sui-quorum-driver/src/lib.rs | 16 ++++---- crates/sui-types/src/messages.rs | 31 ++++++++++++---- crates/sui/tests/checkpoints_tests.rs | 17 +++++++-- crates/sui/tests/quorum_driver_tests.rs | 8 ++-- 8 files changed, 90 insertions(+), 37 deletions(-) diff --git a/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs b/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs index 8dd1cb3dd254a..ac4952695dba0 100644 --- a/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs +++ b/crates/sui-core/src/authority_active/checkpoint_driver/tests.rs @@ -49,7 +49,10 @@ async fn checkpoint_active_flow_happy_path() { .expect("All ok."); // Check whether this is a success? - assert!(matches!(effects.status, ExecutionStatus::Success { .. })); + assert!(matches!( + effects.effects.status, + ExecutionStatus::Success { .. } + )); println!("Execute at {:?}", tokio::time::Instant::now()); // Add some delay between transactions diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs index 5e83c39d1f669..f7ec3011b6fb4 100644 --- a/crates/sui-core/src/authority_aggregator.rs +++ b/crates/sui-core/src/authority_aggregator.rs @@ -1069,12 +1069,17 @@ where pub async fn process_certificate( &self, certificate: CertifiedTransaction, - ) -> Result { + ) -> Result { + struct EffectsStakeInfo { + stake: StakeUnit, + effects: TransactionEffects, + signatures: Vec<(AuthorityName, AuthoritySignature)>, + } struct ProcessCertificateState { // Different authorities could return different effects. We want at least one effect to come // from 2f+1 authorities, which meets quorum and can be considered the approved effect. // The map here allows us to count the stake for each unique effect. - effects_map: HashMap<[u8; 32], (StakeUnit, TransactionEffects)>, + effects_map: HashMap<[u8; 32], EffectsStakeInfo>, bad_stake: StakeUnit, errors: Vec, } @@ -1169,10 +1174,15 @@ where let entry = state .effects_map .entry(inner_effects.digest()) - .or_insert((0, inner_effects.effects)); - entry.0 += weight; + .or_insert(EffectsStakeInfo { + stake: 0, + effects: inner_effects.effects, + signatures: vec![], + }); + entry.stake += weight; + entry.signatures.push((name, inner_effects.auth_signature.signature)); - if entry.0 >= threshold { + if entry.stake >= threshold { // It will set the timeout quite high. return Ok(ReduceOutput::ContinueWithTimeout( state, @@ -1213,13 +1223,22 @@ where // Check that one effects structure has more than 2f votes, // and return it. - for (stake, effects) in state.effects_map.into_values() { + for stake_info in state.effects_map.into_values() { + let EffectsStakeInfo { + stake, + effects, + signatures, + } = stake_info; if stake >= threshold { debug!( good_stake = stake, "Found an effect with good stake over threshold" ); - return Ok(effects); + return Ok(CertifiedTransactionEffects::new( + certificate.auth_sign_info.epoch, + effects, + signatures, + )); } } @@ -1241,7 +1260,7 @@ where pub async fn execute_transaction( &self, transaction: &Transaction, - ) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> { + ) -> Result<(CertifiedTransaction, CertifiedTransactionEffects), anyhow::Error> { let new_certificate = self .process_transaction(transaction.clone()) .instrument(tracing::debug_span!("process_tx")) @@ -1275,7 +1294,7 @@ where // If we have less stake telling us about the latest state of an object // we re-run the certificate on all authorities to ensure it is correct. if let Ok(effects) = self.process_certificate(cert_map[&tx_digest].clone()).await { - if effects.is_object_mutated_here(obj_ref) { + if effects.effects.is_object_mutated_here(obj_ref) { is_ok = true; } else { // TODO: Report a byzantine fault here diff --git a/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs b/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs index e116cd0c4d348..492a6c058fd37 100644 --- a/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs +++ b/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs @@ -1485,7 +1485,10 @@ async fn checkpoint_messaging_flow() { .expect("All ok."); // Check whether this is a success? - assert!(matches!(effects.status, ExecutionStatus::Success { .. })); + assert!(matches!( + effects.effects.status, + ExecutionStatus::Success { .. } + )); // Wait for a batch to go through // (We do not really wait, we jump there since real-time is not running). diff --git a/crates/sui-core/src/gateway_state.rs b/crates/sui-core/src/gateway_state.rs index fbb1f8cfbc0f4..29732475c3cd3 100644 --- a/crates/sui-core/src/gateway_state.rs +++ b/crates/sui-core/src/gateway_state.rs @@ -448,7 +448,7 @@ where &self, input_objects: InputObjects, transaction: Transaction, - ) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> { + ) -> Result<(CertifiedTransaction, CertifiedTransactionEffects), anyhow::Error> { // If execute_transaction ever fails due to panic, we should fix the panic and make sure it doesn't. // If execute_transaction fails, we should retry the same transaction, and it will // properly unlock the objects used in this transaction. In the short term, we will ask the wallet to retry failed transactions. @@ -485,6 +485,7 @@ where // Download the latest content of every mutated object from the authorities. let mutated_object_refs: BTreeSet<_> = effects + .effects .mutated_and_created() .map(|(obj_ref, _)| *obj_ref) .collect(); @@ -494,7 +495,7 @@ where let update_type = UpdateType::Transaction( self.next_tx_seq_number .fetch_add(1, std::sync::atomic::Ordering::SeqCst), - effects.digest(), + effects.effects.digest(), ); self.store .update_gateway_state( @@ -515,7 +516,7 @@ where &self, transaction: Transaction, is_last_retry: bool, - ) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> { + ) -> Result<(CertifiedTransaction, CertifiedTransactionEffects), anyhow::Error> { transaction.verify_signature()?; self.sync_input_objects_with_authorities(&transaction) @@ -898,6 +899,7 @@ where // Okay to unwrap() since we checked that this is Ok let (certificate, effects) = res.unwrap(); + let effects = effects.effects; debug!(?tx, ?certificate, ?effects, "Transaction succeeded"); // Create custom response base on the request type diff --git a/crates/sui-quorum-driver/src/lib.rs b/crates/sui-quorum-driver/src/lib.rs index d1e7dfa22a2cb..86aa5ccbc3785 100644 --- a/crates/sui-quorum-driver/src/lib.rs +++ b/crates/sui-quorum-driver/src/lib.rs @@ -13,8 +13,8 @@ use sui_core::authority_aggregator::AuthorityAggregator; use sui_core::authority_client::AuthorityAPI; use sui_types::error::{SuiError, SuiResult}; use sui_types::messages::{ - CertifiedTransaction, ExecuteTransactionRequest, ExecuteTransactionRequestType, - ExecuteTransactionResponse, Transaction, TransactionEffects, + CertifiedTransaction, CertifiedTransactionEffects, ExecuteTransactionRequest, + ExecuteTransactionRequestType, ExecuteTransactionResponse, Transaction, }; pub enum QuorumTask { @@ -29,7 +29,7 @@ pub struct QuorumDriverHandler { quorum_driver: Arc>, _processor_handle: JoinHandle<()>, // TODO: Change to CertifiedTransactionEffects eventually. - effects_subscriber: Receiver<(CertifiedTransaction, TransactionEffects)>, + effects_subscriber: Receiver<(CertifiedTransaction, CertifiedTransactionEffects)>, } /// The core data structure of the QuorumDriver. @@ -40,14 +40,14 @@ pub struct QuorumDriverHandler { pub struct QuorumDriver { validators: ArcSwap>, task_sender: Sender>, - effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>, + effects_subscribe_sender: Sender<(CertifiedTransaction, CertifiedTransactionEffects)>, } impl QuorumDriver { pub fn new( validators: AuthorityAggregator, task_sender: Sender>, - effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>, + effects_subscribe_sender: Sender<(CertifiedTransaction, CertifiedTransactionEffects)>, ) -> Self { Self { validators: ArcSwap::from(Arc::new(validators)), @@ -120,7 +120,7 @@ where pub async fn process_certificate( &self, certificate: CertifiedTransaction, - ) -> SuiResult<(CertifiedTransaction, TransactionEffects)> { + ) -> SuiResult<(CertifiedTransaction, CertifiedTransactionEffects)> { let effects = self .validators .load() @@ -162,7 +162,9 @@ where self.quorum_driver.clone() } - pub fn subscribe(&mut self) -> &mut Receiver<(CertifiedTransaction, TransactionEffects)> { + pub fn subscribe( + &mut self, + ) -> &mut Receiver<(CertifiedTransaction, CertifiedTransactionEffects)> { &mut self.effects_subscriber } diff --git a/crates/sui-types/src/messages.rs b/crates/sui-types/src/messages.rs index bc4cf144f59c4..f43223dea8730 100644 --- a/crates/sui-types/src/messages.rs +++ b/crates/sui-types/src/messages.rs @@ -1023,13 +1023,6 @@ impl TransactionEffects { } } - pub fn to_unsigned_effects(self) -> UnsignedTransactionEffects { - UnsignedTransactionEffects { - effects: self, - auth_signature: EmptySignInfo {}, - } - } - pub fn digest(&self) -> TransactionEffectsDigest { TransactionEffectsDigest(sha3_hash(self)) } @@ -1096,6 +1089,28 @@ impl PartialEq for SignedTransactionEffects { } } +pub type CertifiedTransactionEffects = TransactionEffectsEnvelope; + +impl CertifiedTransactionEffects { + pub fn new( + epoch: EpochId, + effects: TransactionEffects, + signatures: Vec<(AuthorityName, AuthoritySignature)>, + ) -> Self { + Self { + effects, + auth_signature: AuthorityQuorumSignInfo { epoch, signatures }, + } + } + + pub fn to_unsigned_effects(self) -> UnsignedTransactionEffects { + UnsignedTransactionEffects { + effects: self.effects, + auth_signature: EmptySignInfo {}, + } + } +} + #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] pub enum InputObjectKind { // A Move package, must be immutable. @@ -1364,5 +1379,5 @@ pub enum ExecuteTransactionResponse { ImmediateReturn, TxCert(Box), // TODO: Change to CertifiedTransactionEffects eventually. - EffectsCert(Box<(CertifiedTransaction, TransactionEffects)>), + EffectsCert(Box<(CertifiedTransaction, CertifiedTransactionEffects)>), } diff --git a/crates/sui/tests/checkpoints_tests.rs b/crates/sui/tests/checkpoints_tests.rs index eef1832cb3f1b..d217b890ac317 100644 --- a/crates/sui/tests/checkpoints_tests.rs +++ b/crates/sui/tests/checkpoints_tests.rs @@ -163,7 +163,10 @@ async fn end_to_end() { .unwrap(); // If this check fails the transactions will not be included in the checkpoint. - assert!(matches!(effects.status, ExecutionStatus::Success { .. })); + assert!(matches!( + effects.effects.status, + ExecutionStatus::Success { .. } + )); // Add some delay between transactions tokio::time::sleep(Duration::from_millis(5)).await; @@ -253,8 +256,11 @@ async fn checkpoint_with_shared_objects() { .execute_transaction(&create_counter_transaction) .await .unwrap(); - assert!(matches!(effects.status, ExecutionStatus::Success { .. })); - let ((counter_id, _, _), _) = effects.created[0]; + assert!(matches!( + effects.effects.status, + ExecutionStatus::Success { .. } + )); + let ((counter_id, _, _), _) = effects.effects.created[0]; // We can finally make a valid shared-object transaction (incrementing the counter). tokio::task::yield_now().await; @@ -289,7 +295,10 @@ async fn checkpoint_with_shared_objects() { .unwrap(); // If this check fails the transactions will not be included in the checkpoint. - assert!(matches!(effects.status, ExecutionStatus::Success { .. })); + assert!(matches!( + effects.effects.status, + ExecutionStatus::Success { .. } + )); // Add some delay between transactions tokio::time::sleep(Duration::from_millis(5)).await; diff --git a/crates/sui/tests/quorum_driver_tests.rs b/crates/sui/tests/quorum_driver_tests.rs index 9d99bef70d192..4fb0f3769b2ba 100644 --- a/crates/sui/tests/quorum_driver_tests.rs +++ b/crates/sui/tests/quorum_driver_tests.rs @@ -40,7 +40,7 @@ async fn test_execute_transaction_immediate() { let handle = tokio::task::spawn(async move { let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap(); assert_eq!(*cert.digest(), digest); - assert_eq!(effects.transaction_digest, digest); + assert_eq!(effects.effects.transaction_digest, digest); }); assert!(matches!( quorum_driver @@ -66,7 +66,7 @@ async fn test_execute_transaction_wait_for_cert() { let handle = tokio::task::spawn(async move { let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap(); assert_eq!(*cert.digest(), digest); - assert_eq!(effects.transaction_digest, digest); + assert_eq!(effects.effects.transaction_digest, digest); }); if let ExecuteTransactionResponse::TxCert(cert) = quorum_driver .execute_transaction(ExecuteTransactionRequest { @@ -94,7 +94,7 @@ async fn test_execute_transaction_wait_for_effects() { let handle = tokio::task::spawn(async move { let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap(); assert_eq!(*cert.digest(), digest); - assert_eq!(effects.transaction_digest, digest); + assert_eq!(effects.effects.transaction_digest, digest); }); if let ExecuteTransactionResponse::EffectsCert(result) = quorum_driver .execute_transaction(ExecuteTransactionRequest { @@ -106,7 +106,7 @@ async fn test_execute_transaction_wait_for_effects() { { let (cert, effects) = *result; assert_eq!(*cert.digest(), digest); - assert_eq!(effects.transaction_digest, digest); + assert_eq!(effects.effects.transaction_digest, digest); } else { unreachable!(); }