Skip to content

Commit

Permalink
Add CertifiedTransactionEffects (MystenLabs#2539)
Browse files Browse the repository at this point in the history
* Add CertifiedTransactionEffects

* feedback
  • Loading branch information
lxfind authored Jun 14, 2022
1 parent c4b078a commit bb0e158
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ async fn checkpoint_active_flow_happy_path() {
.expect("All ok.");

// Check whether this is a success?
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
assert!(matches!(
effects.effects.status,
ExecutionStatus::Success { .. }
));
println!("Execute at {:?}", tokio::time::Instant::now());

// Add some delay between transactions
Expand Down
37 changes: 28 additions & 9 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1069,12 +1069,17 @@ where
pub async fn process_certificate(
&self,
certificate: CertifiedTransaction,
) -> Result<TransactionEffects, SuiError> {
) -> Result<CertifiedTransactionEffects, SuiError> {
struct EffectsStakeInfo {
stake: StakeUnit,
effects: TransactionEffects,
signatures: Vec<(AuthorityName, AuthoritySignature)>,
}
struct ProcessCertificateState {
// Different authorities could return different effects. We want at least one effect to come
// from 2f+1 authorities, which meets quorum and can be considered the approved effect.
// The map here allows us to count the stake for each unique effect.
effects_map: HashMap<[u8; 32], (StakeUnit, TransactionEffects)>,
effects_map: HashMap<[u8; 32], EffectsStakeInfo>,
bad_stake: StakeUnit,
errors: Vec<SuiError>,
}
Expand Down Expand Up @@ -1169,10 +1174,15 @@ where
let entry = state
.effects_map
.entry(inner_effects.digest())
.or_insert((0, inner_effects.effects));
entry.0 += weight;
.or_insert(EffectsStakeInfo {
stake: 0,
effects: inner_effects.effects,
signatures: vec![],
});
entry.stake += weight;
entry.signatures.push((name, inner_effects.auth_signature.signature));

if entry.0 >= threshold {
if entry.stake >= threshold {
// It will set the timeout quite high.
return Ok(ReduceOutput::ContinueWithTimeout(
state,
Expand Down Expand Up @@ -1213,13 +1223,22 @@ where

// Check that one effects structure has more than 2f votes,
// and return it.
for (stake, effects) in state.effects_map.into_values() {
for stake_info in state.effects_map.into_values() {
let EffectsStakeInfo {
stake,
effects,
signatures,
} = stake_info;
if stake >= threshold {
debug!(
good_stake = stake,
"Found an effect with good stake over threshold"
);
return Ok(effects);
return Ok(CertifiedTransactionEffects::new(
certificate.auth_sign_info.epoch,
effects,
signatures,
));
}
}

Expand All @@ -1241,7 +1260,7 @@ where
pub async fn execute_transaction(
&self,
transaction: &Transaction,
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
) -> Result<(CertifiedTransaction, CertifiedTransactionEffects), anyhow::Error> {
let new_certificate = self
.process_transaction(transaction.clone())
.instrument(tracing::debug_span!("process_tx"))
Expand Down Expand Up @@ -1275,7 +1294,7 @@ where
// If we have less stake telling us about the latest state of an object
// we re-run the certificate on all authorities to ensure it is correct.
if let Ok(effects) = self.process_certificate(cert_map[&tx_digest].clone()).await {
if effects.is_object_mutated_here(obj_ref) {
if effects.effects.is_object_mutated_here(obj_ref) {
is_ok = true;
} else {
// TODO: Report a byzantine fault here
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1485,7 +1485,10 @@ async fn checkpoint_messaging_flow() {
.expect("All ok.");

// Check whether this is a success?
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
assert!(matches!(
effects.effects.status,
ExecutionStatus::Success { .. }
));

// Wait for a batch to go through
// (We do not really wait, we jump there since real-time is not running).
Expand Down
8 changes: 5 additions & 3 deletions crates/sui-core/src/gateway_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ where
&self,
input_objects: InputObjects,
transaction: Transaction,
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
) -> Result<(CertifiedTransaction, CertifiedTransactionEffects), anyhow::Error> {
// If execute_transaction ever fails due to panic, we should fix the panic and make sure it doesn't.
// If execute_transaction fails, we should retry the same transaction, and it will
// properly unlock the objects used in this transaction. In the short term, we will ask the wallet to retry failed transactions.
Expand Down Expand Up @@ -485,6 +485,7 @@ where

// Download the latest content of every mutated object from the authorities.
let mutated_object_refs: BTreeSet<_> = effects
.effects
.mutated_and_created()
.map(|(obj_ref, _)| *obj_ref)
.collect();
Expand All @@ -494,7 +495,7 @@ where
let update_type = UpdateType::Transaction(
self.next_tx_seq_number
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
effects.digest(),
effects.effects.digest(),
);
self.store
.update_gateway_state(
Expand All @@ -515,7 +516,7 @@ where
&self,
transaction: Transaction,
is_last_retry: bool,
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
) -> Result<(CertifiedTransaction, CertifiedTransactionEffects), anyhow::Error> {
transaction.verify_signature()?;

self.sync_input_objects_with_authorities(&transaction)
Expand Down Expand Up @@ -898,6 +899,7 @@ where

// Okay to unwrap() since we checked that this is Ok
let (certificate, effects) = res.unwrap();
let effects = effects.effects;

debug!(?tx, ?certificate, ?effects, "Transaction succeeded");
// Create custom response base on the request type
Expand Down
16 changes: 9 additions & 7 deletions crates/sui-quorum-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_client::AuthorityAPI;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::{
CertifiedTransaction, ExecuteTransactionRequest, ExecuteTransactionRequestType,
ExecuteTransactionResponse, Transaction, TransactionEffects,
CertifiedTransaction, CertifiedTransactionEffects, ExecuteTransactionRequest,
ExecuteTransactionRequestType, ExecuteTransactionResponse, Transaction,
};

pub enum QuorumTask<A> {
Expand All @@ -29,7 +29,7 @@ pub struct QuorumDriverHandler<A> {
quorum_driver: Arc<QuorumDriver<A>>,
_processor_handle: JoinHandle<()>,
// TODO: Change to CertifiedTransactionEffects eventually.
effects_subscriber: Receiver<(CertifiedTransaction, TransactionEffects)>,
effects_subscriber: Receiver<(CertifiedTransaction, CertifiedTransactionEffects)>,
}

/// The core data structure of the QuorumDriver.
Expand All @@ -40,14 +40,14 @@ pub struct QuorumDriverHandler<A> {
pub struct QuorumDriver<A> {
validators: ArcSwap<AuthorityAggregator<A>>,
task_sender: Sender<QuorumTask<A>>,
effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>,
effects_subscribe_sender: Sender<(CertifiedTransaction, CertifiedTransactionEffects)>,
}

impl<A> QuorumDriver<A> {
pub fn new(
validators: AuthorityAggregator<A>,
task_sender: Sender<QuorumTask<A>>,
effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>,
effects_subscribe_sender: Sender<(CertifiedTransaction, CertifiedTransactionEffects)>,
) -> Self {
Self {
validators: ArcSwap::from(Arc::new(validators)),
Expand Down Expand Up @@ -120,7 +120,7 @@ where
pub async fn process_certificate(
&self,
certificate: CertifiedTransaction,
) -> SuiResult<(CertifiedTransaction, TransactionEffects)> {
) -> SuiResult<(CertifiedTransaction, CertifiedTransactionEffects)> {
let effects = self
.validators
.load()
Expand Down Expand Up @@ -162,7 +162,9 @@ where
self.quorum_driver.clone()
}

pub fn subscribe(&mut self) -> &mut Receiver<(CertifiedTransaction, TransactionEffects)> {
pub fn subscribe(
&mut self,
) -> &mut Receiver<(CertifiedTransaction, CertifiedTransactionEffects)> {
&mut self.effects_subscriber
}

Expand Down
31 changes: 23 additions & 8 deletions crates/sui-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,13 +1023,6 @@ impl TransactionEffects {
}
}

pub fn to_unsigned_effects(self) -> UnsignedTransactionEffects {
UnsignedTransactionEffects {
effects: self,
auth_signature: EmptySignInfo {},
}
}

pub fn digest(&self) -> TransactionEffectsDigest {
TransactionEffectsDigest(sha3_hash(self))
}
Expand Down Expand Up @@ -1096,6 +1089,28 @@ impl PartialEq for SignedTransactionEffects {
}
}

pub type CertifiedTransactionEffects = TransactionEffectsEnvelope<AuthorityQuorumSignInfo>;

impl CertifiedTransactionEffects {
pub fn new(
epoch: EpochId,
effects: TransactionEffects,
signatures: Vec<(AuthorityName, AuthoritySignature)>,
) -> Self {
Self {
effects,
auth_signature: AuthorityQuorumSignInfo { epoch, signatures },
}
}

pub fn to_unsigned_effects(self) -> UnsignedTransactionEffects {
UnsignedTransactionEffects {
effects: self.effects,
auth_signature: EmptySignInfo {},
}
}
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum InputObjectKind {
// A Move package, must be immutable.
Expand Down Expand Up @@ -1364,5 +1379,5 @@ pub enum ExecuteTransactionResponse {
ImmediateReturn,
TxCert(Box<CertifiedTransaction>),
// TODO: Change to CertifiedTransactionEffects eventually.
EffectsCert(Box<(CertifiedTransaction, TransactionEffects)>),
EffectsCert(Box<(CertifiedTransaction, CertifiedTransactionEffects)>),
}
17 changes: 13 additions & 4 deletions crates/sui/tests/checkpoints_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ async fn end_to_end() {
.unwrap();

// If this check fails the transactions will not be included in the checkpoint.
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
assert!(matches!(
effects.effects.status,
ExecutionStatus::Success { .. }
));

// Add some delay between transactions
tokio::time::sleep(Duration::from_millis(5)).await;
Expand Down Expand Up @@ -253,8 +256,11 @@ async fn checkpoint_with_shared_objects() {
.execute_transaction(&create_counter_transaction)
.await
.unwrap();
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
let ((counter_id, _, _), _) = effects.created[0];
assert!(matches!(
effects.effects.status,
ExecutionStatus::Success { .. }
));
let ((counter_id, _, _), _) = effects.effects.created[0];

// We can finally make a valid shared-object transaction (incrementing the counter).
tokio::task::yield_now().await;
Expand Down Expand Up @@ -289,7 +295,10 @@ async fn checkpoint_with_shared_objects() {
.unwrap();

// If this check fails the transactions will not be included in the checkpoint.
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
assert!(matches!(
effects.effects.status,
ExecutionStatus::Success { .. }
));

// Add some delay between transactions
tokio::time::sleep(Duration::from_millis(5)).await;
Expand Down
8 changes: 4 additions & 4 deletions crates/sui/tests/quorum_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn test_execute_transaction_immediate() {
let handle = tokio::task::spawn(async move {
let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap();
assert_eq!(*cert.digest(), digest);
assert_eq!(effects.transaction_digest, digest);
assert_eq!(effects.effects.transaction_digest, digest);
});
assert!(matches!(
quorum_driver
Expand All @@ -66,7 +66,7 @@ async fn test_execute_transaction_wait_for_cert() {
let handle = tokio::task::spawn(async move {
let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap();
assert_eq!(*cert.digest(), digest);
assert_eq!(effects.transaction_digest, digest);
assert_eq!(effects.effects.transaction_digest, digest);
});
if let ExecuteTransactionResponse::TxCert(cert) = quorum_driver
.execute_transaction(ExecuteTransactionRequest {
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn test_execute_transaction_wait_for_effects() {
let handle = tokio::task::spawn(async move {
let (cert, effects) = quorum_driver_handler.subscribe().recv().await.unwrap();
assert_eq!(*cert.digest(), digest);
assert_eq!(effects.transaction_digest, digest);
assert_eq!(effects.effects.transaction_digest, digest);
});
if let ExecuteTransactionResponse::EffectsCert(result) = quorum_driver
.execute_transaction(ExecuteTransactionRequest {
Expand All @@ -106,7 +106,7 @@ async fn test_execute_transaction_wait_for_effects() {
{
let (cert, effects) = *result;
assert_eq!(*cert.digest(), digest);
assert_eq!(effects.transaction_digest, digest);
assert_eq!(effects.effects.transaction_digest, digest);
} else {
unreachable!();
}
Expand Down

0 comments on commit bb0e158

Please sign in to comment.