Skip to content

Commit

Permalink
Merge handle_confirmation_transaction and handle_consensus_transaction (
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jul 7, 2022
1 parent 87dbe33 commit 14fd340
Show file tree
Hide file tree
Showing 22 changed files with 189 additions and 390 deletions.
4 changes: 2 additions & 2 deletions crates/sui-benchmark/src/benchmark/load_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub async fn send_tx_chunks(
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
let resp2 = client
.handle_confirmation_transaction(ConfirmationTransaction { certificate })
.handle_certificate(certificate)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
resps.push(resp1);
Expand Down Expand Up @@ -138,7 +138,7 @@ pub async fn send_confs(
let mut resps = Vec::new();
for certificate in txns {
let resp = client
.handle_confirmation_transaction(ConfirmationTransaction { certificate })
.handle_certificate(certificate)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
resps.push(resp);
Expand Down
102 changes: 35 additions & 67 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,9 @@ impl AuthorityState {
}
}

pub async fn handle_node_sync_transaction(
/// We cannot use handle_certificate in fullnode to execute a certificate because there is no
/// consensus engine to assign locks for shared objects. Hence we need special handling here.
pub async fn handle_node_sync_certificate(
&self,
certificate: CertifiedTransaction,
// Signed effects is signed by only one validator, it is not a
Expand All @@ -392,7 +394,7 @@ impl AuthorityState {
}
);

let tx_guard = self.acquire_tx_guard(&digest, &certificate).await?;
let tx_guard = self.acquire_tx_guard(&certificate).await?;

if certificate.contains_shared_object() {
self.database
Expand All @@ -403,13 +405,11 @@ impl AuthorityState {
Ok(())
}

/// Confirm a transfer.
pub async fn handle_confirmation_transaction(
pub async fn handle_certificate(
&self,
confirmation_transaction: ConfirmationTransaction,
certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
let certificate = confirmation_transaction.certificate;
let digest = *certificate.digest();
let digest = certificate.digest();
debug!(?digest, "handle_confirmation_transaction");

// This acquires a lock on the tx digest to prevent multiple concurrent executions of the
Expand All @@ -422,16 +422,16 @@ impl AuthorityState {
// to do this, since the false contention can be made arbitrarily low (no cost for 1.0 -
// epsilon of txes) while solutions without false contention have slightly higher cost
// for every tx.
let tx_guard = self.acquire_tx_guard(&digest, &certificate).await?;
let tx_guard = self.acquire_tx_guard(&certificate).await?;

self.process_certificate(tx_guard, certificate).await
}

async fn acquire_tx_guard<'a, 'b>(
&'a self,
digest: &'b TransactionDigest,
cert: &'b CertifiedTransaction,
) -> SuiResult<CertTxGuard<'a>> {
let digest = cert.digest();
match self.database.wal.begin_tx(digest, cert).await? {
Some(g) => Ok(g),
None => {
Expand Down Expand Up @@ -516,14 +516,20 @@ impl AuthorityState {
) -> SuiResult<TransactionInfoResponse> {
self.metrics.total_certs.inc();
let transaction_digest = *certificate.digest();
// The cert could have been processed by a concurrent attempt of the same cert, so check if
// the effects have already been written.
if let Some(info) = self.check_tx_already_executed(&transaction_digest).await? {
tx_guard.release();
return Ok(info);
}

if self.halted.load(Ordering::SeqCst) && !certificate.data.kind.is_system_tx() {
tx_guard.release();
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

// Check the certificate and retrieve the transfer data.
// Check the certificate signatures.
let committee = &self.committee.load();
tracing::trace_span!("cert_check_signature")
.in_scope(|| certificate.verify(committee))
Expand All @@ -532,15 +538,6 @@ impl AuthorityState {
e
})?;

// The cert could have been processed by a concurrent attempt of the same cert, so check if
// the effects have already been written.
if self.database.effects_exists(&transaction_digest)? {
let info = self.make_transaction_info(&transaction_digest).await?;
debug!("Transaction {transaction_digest:?} already executed");
tx_guard.release();
return Ok(info);
}

// Errors originating from prepare_certificate may be transient (failure to read locks) or
// non-transient (transaction input is invalid, move vm errors). However, all errors from
// this function occur before we have written anything to the db, so we commit the tx
Expand Down Expand Up @@ -651,6 +648,18 @@ impl AuthorityState {
Ok((temporary_store, signed_effects))
}

pub async fn check_tx_already_executed(
&self,
digest: &TransactionDigest,
) -> SuiResult<Option<TransactionInfoResponse>> {
if self.database.effects_exists(digest)? {
debug!("Transaction {digest:?} already executed");
Ok(Some(self.make_transaction_info(digest).await?))
} else {
Ok(None)
}
}

fn index_tx(
&self,
indexes: &IndexStore,
Expand Down Expand Up @@ -760,49 +769,6 @@ impl AuthorityState {
u64::try_from(ts_ms).expect("Travelling in time machine")
}

/// Check if we need to submit this transaction to consensus. We usually do, unless (i) we already
/// processed the transaction and we can immediately return the effects, or (ii) we already locked
/// all shared-objects of the transaction and can (re-)attempt execution.
#[instrument(level = "debug", name = "prepare_certificate", skip_all)]
pub async fn try_skip_consensus(
&self,
certificate: CertifiedTransaction,
) -> Result<Option<TransactionInfoResponse>, SuiError> {
// Ensure the input is a shared object certificate
fp_ensure!(
certificate.contains_shared_object(),
SuiError::NotASharedObjectTransaction
);

// If we already executed this transaction, return the signed effects.
let digest = certificate.digest();
if self.database.effects_exists(digest)? {
debug!("Shared-object transaction {digest:?} already executed");
return self.make_transaction_info(digest).await.map(Some);
}

// If we already assigned locks to this transaction, we can try to execute it immediately.
// This can happen to transaction previously submitted to consensus that failed execution
// due to missing dependencies.
if self.transaction_shared_locks_exist(&certificate).await? {
// Attempt to execute the transaction. This will only succeed if the authority
// already executed all its dependencies and if the locks are correctly attributed to
// the transaction (ie. this transaction is the next to be executed).
debug!("Shared-locks already assigned to {digest:?} - executing now");

let tx_guard = self.acquire_tx_guard(digest, &certificate).await?;

return self
.process_certificate(tx_guard, certificate)
.await
.map(Some);
}

// If we didn't already attributed shared locks to this transaction, it needs to go
// through consensus.
Ok(None)
}

pub async fn handle_transaction_info_request(
&self,
request: TransactionInfoRequest,
Expand Down Expand Up @@ -1437,7 +1403,7 @@ impl AuthorityState {
}

/// Check whether a shared-object certificate has already been given shared-locks.
async fn transaction_shared_locks_exist(
pub async fn transaction_shared_locks_exist(
&self,
certificate: &CertifiedTransaction,
) -> SuiResult<bool> {
Expand Down Expand Up @@ -1503,6 +1469,7 @@ impl ExecutionState for AuthorityState {
type Transaction = ConsensusTransaction;
type Error = SuiError;

/// This function will be called by Narwhal, after Narwhal sequenced this certificate.
#[instrument(name = "handle_consensus_transaction", level = "debug", skip_all)]
async fn handle_consensus_transaction(
&self,
Expand All @@ -1522,11 +1489,12 @@ impl ExecutionState for AuthorityState {
let shared_locks = self.transaction_shared_locks_exist(&certificate).await?;

// If we already executed this transaction, return the signed effects.
// This is not an optimization, and is critical for safety. It is to ensure that
// we don't end up re-assigning shared object locks after they are unlocked when
// the transaction was committed.
let digest = certificate.digest();
if self.database.effects_exists(digest)? {
debug!(?digest, "Shared-object transaction already executed");
let info = self.make_transaction_info(digest).await?;
return Ok(bincode::serialize(&info).expect("Failed to serialize tx info"));
if let Some(response) = self.check_tx_already_executed(digest).await? {
return Ok(bincode::serialize(&response).expect("Failed to serialize tx info"));
}

// If we didn't already assigned shared-locks to this transaction, we do it now.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};

use std::{collections::BTreeSet, sync::Arc, time::Duration};
use sui_types::messages::{ConfirmationTransaction, ExecutionStatus};
use sui_types::messages::ExecutionStatus;

use crate::checkpoints::checkpoint_tests::checkpoint_tests_setup;

Expand Down Expand Up @@ -143,7 +143,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
let client: SafeClient<LocalAuthorityClient> =
sender_aggregator.authority_clients[sample_authority].clone();
let _response = client
.handle_confirmation_transaction(ConfirmationTransaction::new(new_certificate))
.handle_certificate(new_certificate)
.await
.expect("Problem processing certificate");

Expand Down Expand Up @@ -237,7 +237,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
let client: SafeClient<LocalAuthorityClient> =
sender_aggregator.authority_clients[sample_authority].clone();
let _response = client
.handle_confirmation_transaction(ConfirmationTransaction::new(new_certificate))
.handle_certificate(new_certificate)
.await
.expect("Problem processing certificate");

Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use typed_store::Map;
use crate::authority::AuthorityStore;
use crate::authority_client::AuthorityAPI;

use super::{gossip::LocalConfirmationTransactionHandler, ActiveAuthority};
use super::{gossip::LocalCertificateHandler, ActiveAuthority};

#[cfg(test)]
pub(crate) mod tests;
Expand Down Expand Up @@ -94,7 +94,7 @@ where
.map(|((i, d), c)| (i, d, c.as_ref().expect("certificate must exist")))
.collect();

let local_handler = LocalConfirmationTransactionHandler {
let local_handler = LocalCertificateHandler {
state: active_authority.state.clone(),
};

Expand All @@ -111,7 +111,7 @@ where

// Sync and Execute with local authority state
net.sync_certificate_to_authority_with_timeout_inner(
sui_types::messages::ConfirmationTransaction::new(c.clone()),
c.clone(),
active_authority.state.name,
&local_handler,
tokio::time::Duration::from_secs(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use sui_types::crypto::{get_key_pair, KeyPair, PublicKeyBytes};
use sui_types::error::SuiError;
use sui_types::messages::{
AccountInfoRequest, AccountInfoResponse, BatchInfoRequest, BatchInfoResponseItem,
ConfirmationTransaction, ConsensusTransaction, ObjectInfoRequest, ObjectInfoResponse,
Transaction, TransactionInfoRequest, TransactionInfoResponse,
CertifiedTransaction, ObjectInfoRequest, ObjectInfoResponse, Transaction,
TransactionInfoRequest, TransactionInfoResponse,
};
use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse};
use sui_types::object::Object;
Expand Down Expand Up @@ -103,23 +103,12 @@ impl AuthorityAPI for ConfigurableBatchActionClient {
state.handle_transaction(transaction).await
}

async fn handle_confirmation_transaction(
async fn handle_certificate(
&self,
transaction: ConfirmationTransaction,
certificate: CertifiedTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
let state = self.state.clone();
state.handle_confirmation_transaction(transaction).await
}

async fn handle_consensus_transaction(
&self,
_transaction: ConsensusTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
Ok(TransactionInfoResponse {
signed_transaction: None,
certified_transaction: None,
signed_effects: None,
})
state.handle_certificate(certificate).await
}

async fn handle_account_info_request(
Expand Down
21 changes: 12 additions & 9 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::{
authority::AuthorityState,
authority_aggregator::{AuthorityAggregator, ConfirmationTransactionHandler},
authority_aggregator::{AuthorityAggregator, CertificateHandler},
authority_client::AuthorityAPI,
safe_client::SafeClient,
};
Expand All @@ -26,8 +26,7 @@ use sui_types::{
batch::{TxSequenceNumber, UpdateItem},
error::{SuiError, SuiResult},
messages::{
BatchInfoRequest, BatchInfoResponseItem, ConfirmationTransaction, TransactionInfoRequest,
TransactionInfoResponse,
BatchInfoRequest, BatchInfoResponseItem, TransactionInfoRequest, TransactionInfoResponse,
},
};
use tracing::{debug, error, info, trace};
Expand All @@ -40,6 +39,7 @@ pub(crate) mod tests;

mod node_sync;
use node_sync::NodeSyncDigestHandler;
use sui_types::messages::CertifiedTransaction;

struct Follower<A> {
peer_name: AuthorityName,
Expand Down Expand Up @@ -205,14 +205,17 @@ async fn wait_for_one_gossip_task_to_finish<A>(
peer_names.remove(&finished_name);
}

pub struct LocalConfirmationTransactionHandler {
pub struct LocalCertificateHandler {
pub state: Arc<AuthorityState>,
}

#[async_trait]
impl ConfirmationTransactionHandler for LocalConfirmationTransactionHandler {
async fn handle(&self, cert: ConfirmationTransaction) -> SuiResult<TransactionInfoResponse> {
self.state.handle_confirmation_transaction(cert).await
impl CertificateHandler for LocalCertificateHandler {
async fn handle(
&self,
certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
self.state.handle_certificate(certificate).await
}

fn destination_name(&self) -> String {
Expand Down Expand Up @@ -274,9 +277,9 @@ impl GossipDigestHandler {
follower
.aggregator
.sync_authority_source_to_destination(
ConfirmationTransaction { certificate },
certificate,
follower.peer_name,
&LocalConfirmationTransactionHandler {
&LocalCertificateHandler {
state: follower.state.clone(),
},
)
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_active/gossip/node_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ where
}

self.state
.handle_node_sync_transaction(cert, effects)
.handle_node_sync_certificate(cert, effects)
.await?;

// Garbage collect data for this tx.
Expand Down
Loading

0 comments on commit 14fd340

Please sign in to comment.