Skip to content

Commit

Permalink
Enable write-ahead log, recover in-progress TXes at startup. (MystenL…
Browse files Browse the repository at this point in the history
…abs#2518)

* Use the WriteAheadLog to recover partially commited transactions.

* Ensure that we never get stuck in an infinite recovery loop

If a poison pill tx is ever found (that somehow crashes the validator)
we must stop retrying it every time we restart.

* Writing effects must be the last step

* fmt

* Improve logging

* Remove shared locks only after effects have been stored

* Address PR comments

* Ensure we never broadcast an effects digest for which there are no stored effects

* Fix gateway state updates

* fmt

* PR feedback
  • Loading branch information
mystenmark authored Jun 16, 2022
1 parent 87ea17d commit 6f33388
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 141 deletions.
220 changes: 177 additions & 43 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use std::{
};
use sui_adapter::adapter;
use sui_config::genesis::Genesis;
use sui_storage::IndexStore;
use sui_storage::{
write_ahead_log::{DBTxGuard, TxGuard, WriteAheadLog},
IndexStore,
};
use sui_types::{
base_types::*,
batch::{TxSequenceNumber, UpdateItem},
Expand Down Expand Up @@ -98,6 +101,9 @@ pub mod authority_notifier;
pub const MAX_ITEMS_LIMIT: u64 = 100_000;
const BROADCAST_CAPACITY: usize = 10_000;

const MAX_TX_RECOVERY_RETRY: u32 = 3;
type CertTxGuard<'a> = DBTxGuard<'a, CertifiedTransaction>;

/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
tx_orders: IntCounter,
Expand Down Expand Up @@ -360,39 +366,62 @@ impl AuthorityState {
confirmation_transaction: ConfirmationTransaction,
) -> SuiResult<TransactionInfoResponse> {
self.metrics.total_certs.inc();
let transaction_digest = *confirmation_transaction.certificate.digest();

// Ensure an idempotent answer.
if self.database.effects_exists(&transaction_digest)? {
let info = self.make_transaction_info(&transaction_digest).await?;
debug!("Transaction {transaction_digest:?} already executed");
return Ok(info);
}
let certificate = confirmation_transaction.certificate;
let transaction_digest = *certificate.digest();

if self.halted.load(Ordering::SeqCst)
&& !confirmation_transaction
.certificate
.data
.kind
.is_system_tx()
{
if self.halted.load(Ordering::SeqCst) && !certificate.data.kind.is_system_tx() {
// TODO: Do we want to include the new validator set?
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}

// Check the certificate and retrieve the transfer data.
let committee = &self.committee.load();
tracing::trace_span!("cert_check_signature")
.in_scope(|| {
confirmation_transaction
.certificate
.verify(&self.committee.load())
})
.in_scope(|| certificate.verify(committee))
.map_err(|e| {
self.metrics.signature_errors.inc();
e
})?;

self.process_certificate(confirmation_transaction).await
// This acquires a lock on the tx digest to prevent multiple concurrent executions of the
// same tx. While we don't need this for safety (tx sequencing is ultimately atomic), it is
// very common to receive the same tx multiple times simultaneously due to gossip, so we
// may as well hold the lock and save the cpu time for other requests.
//
// Note that this lock has some false contention (since it uses a MutexTable), so you can't
// assume that different txes can execute concurrently. This is probably the fastest way
// to do this, since the false contention can be made arbitrarily low (no cost for 1.0 -
// epsilon of txes) while solutions without false contention have slightly higher cost
// for every tx.
let tx_guard = self
.acquire_tx_guard(&transaction_digest, &certificate)
.await?;

self.process_certificate(tx_guard, certificate).await
}

async fn acquire_tx_guard<'a>(
&'a self,
digest: &TransactionDigest,
cert: &CertifiedTransaction,
) -> SuiResult<CertTxGuard<'a>> {
match self.database.wal.begin_tx(digest, cert).await? {
Some(g) => Ok(g),
None => {
// If the tx previously errored out without committing, we return an
// error now as well. We could retry the transaction on behalf of
// the client right now, but:
//
// a) This keeps the normal and recovery paths separated.
// b) If a client finds a way to create a tx that always fails here,
// allowing them to retry it on command could be a DoS channel.
let err = "previous attempt of transaction resulted in an error - \
transaction will be retried offline"
.to_owned();
debug!(?digest, "{}", err);
Err(SuiError::ErrorWhileProcessingConfirmationTransaction { err })
}
}
}

#[instrument(level = "trace", skip_all)]
Expand Down Expand Up @@ -455,14 +484,72 @@ impl AuthorityState {
#[instrument(level = "debug", name = "process_cert_inner", skip_all)]
async fn process_certificate(
&self,
confirmation_transaction: ConfirmationTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
let certificate = confirmation_transaction.certificate;
tx_guard: CertTxGuard<'_>,
certificate: CertifiedTransaction,
) -> SuiResult<TransactionInfoResponse> {
let transaction_digest = *certificate.digest();

// The cert could have been processed by a concurrent attempt of the same cert, so check if
// the effects have already been written.
if self.database.effects_exists(&transaction_digest)? {
let info = self.make_transaction_info(&transaction_digest).await?;
debug!("Transaction {transaction_digest:?} already executed");
tx_guard.release();
return Ok(info);
}

// Errors originating from prepare_certificate may be transient (failure to read locks) or
// non-transient (transaction input is invalid, move vm errors). However, all errors from
// this function occur before we have written anything to the db, so we commit the tx
// guard and rely on the client to retry the tx (if it was transient).
let (temporary_store, signed_effects) = match self
.prepare_certificate(&certificate, transaction_digest)
.await
{
Err(e) => {
debug!(name = ?self.name, digest = ?transaction_digest, "Error preparing transaction: {}", e);
tx_guard.release();
return Err(e);
}
Ok(res) => res,
};

// If commit_certificate returns an error, tx_guard will be dropped and the certificate
// will be persisted in the log for later recovery.
self.commit_certificate(temporary_store, &certificate, &signed_effects)
.await?;

// commit_certificate finished, the tx is fully committed to the store.
tx_guard.commit_tx();

Ok(TransactionInfoResponse {
signed_transaction: self.database.get_transaction(&transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
})
}

/// prepare_certificate validates the transaction input, and executes the certificate,
/// returning effects, output objects, events, etc.
///
/// It reads state from the db (both owned and shared locks), but it has no side effects.
///
/// It can be generally understood that a failure of prepare_certificate indicates a
/// non-transient error, e.g. the transaction input is somehow invalid, the correct
/// locks are not held, etc. However, this is not entirely true, as a transient db read error
/// may also cause this function to fail.
#[instrument(level = "debug", name = "prepare_certificate", skip_all)]
async fn prepare_certificate(
&self,
certificate: &CertifiedTransaction,
transaction_digest: TransactionDigest,
) -> SuiResult<(
AuthorityTemporaryStore<AuthorityStore>,
SignedTransactionEffects,
)> {
let (gas_status, input_objects) = transaction_input_checker::check_transaction_input(
&self.database,
&certificate,
certificate,
&self.metrics.shared_obj_tx,
)
.await?;
Expand Down Expand Up @@ -518,15 +605,7 @@ impl AuthorityState {
let signed_effects =
effects.to_sign_effects(self.committee.load().epoch, &self.name, &*self.secret);

// Update the database in an atomic manner
self.update_state(temporary_store, &certificate, &signed_effects)
.await?;

Ok(TransactionInfoResponse {
signed_transaction: self.database.get_transaction(&transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
})
Ok((temporary_store, signed_effects))
}

fn index_tx(
Expand Down Expand Up @@ -636,6 +715,7 @@ impl AuthorityState {
/// Check if we need to submit this transaction to consensus. We usually do, unless (i) we already
/// processed the transaction and we can immediately return the effects, or (ii) we already locked
/// all shared-objects of the transaction and can (re-)attempt execution.
#[instrument(level = "debug", name = "prepare_certificate", skip_all)]
pub async fn try_skip_consensus(
&self,
certificate: CertifiedTransaction,
Expand All @@ -661,8 +741,13 @@ impl AuthorityState {
// already executed all its dependencies and if the locks are correctly attributed to
// the transaction (ie. this transaction is the next to be executed).
debug!("Shared-locks already assigned to {digest:?} - executing now");
let confirmation = ConfirmationTransaction { certificate };
return self.process_certificate(confirmation).await.map(Some);

let tx_guard = self.acquire_tx_guard(digest, &certificate).await?;

return self
.process_certificate(tx_guard, certificate)
.await
.map(Some);
}

// If we didn't already attributed shared locks to this transaction, it needs to go
Expand Down Expand Up @@ -941,6 +1026,13 @@ impl AuthorityState {
metrics: &METRICS,
};

// Process tx recovery log first, so that the batch and checkpoint recovery (below)
// don't observe partially-committed txes.
state
.process_tx_recovery_log(None)
.await
.expect("Could not fully process recovery log at startup!");

state
.init_batches_from_database()
.expect("Init batches failed!");
Expand Down Expand Up @@ -980,6 +1072,43 @@ impl AuthorityState {
state
}

// Continually pop in-progress txes from the WAL and try to drive them to completion.
async fn process_tx_recovery_log(&self, limit: Option<usize>) -> SuiResult {
let mut limit = limit.unwrap_or(usize::max_value());
while limit > 0 {
limit -= 1;
if let Some(tx_guard) = self.database.wal.read_one_recoverable_tx().await {
let digest = tx_guard.tx_id();

let (cert, retry_count) = self.database.wal.get_tx_data(&tx_guard)?;

if retry_count >= MAX_TX_RECOVERY_RETRY {
// This tx will be only partially executed, however the store will be in a safe
// state. We will simply never reach eventual consistency for this TX.
// TODO: Should we revert the tx entirely? I'm not sure the effort is
// warranted, since the only way this can happen is if we are repeatedly
// failing to write to the db, in which case a revert probably won't succeed
// either.
error!(
?digest,
"Abandoning in-progress TX after {} retries.", MAX_TX_RECOVERY_RETRY
);
// prevent the tx from going back into the recovery list again.
tx_guard.release();
continue;
}

if let Err(e) = self.process_certificate(tx_guard, cert).await {
warn!(?digest, "Failed to process in-progress certificate: {}", e);
}
} else {
break;
}
}

Ok(())
}

pub fn checkpoints(&self) -> Option<Arc<Mutex<CheckpointStore>>> {
self.checkpoints.clone()
}
Expand Down Expand Up @@ -1200,8 +1329,8 @@ impl AuthorityState {

/// Update state and signals that a new transactions has been processed
/// to the batch maker service.
#[instrument(name = "db_update_state", level = "debug", skip_all)]
pub(crate) async fn update_state(
#[instrument(name = "commit_certificate", level = "debug", skip_all)]
pub(crate) async fn commit_certificate(
&self,
temporary_store: AuthorityTemporaryStore<AuthorityStore>,
certificate: &CertifiedTransaction,
Expand All @@ -1216,10 +1345,14 @@ impl AuthorityState {
let notifier_ticket = self.batch_notifier.ticket()?;
let seq = notifier_ticket.seq();

let update_type = UpdateType::Transaction(seq, signed_effects.effects.digest());

self.database
.update_state(temporary_store, certificate, signed_effects, update_type)
.update_state(
temporary_store,
certificate,
seq,
signed_effects,
&signed_effects.effects.digest(),
)
.await

// implicitly we drop the ticket here and that notifies the batch manager
Expand Down Expand Up @@ -1292,6 +1425,7 @@ impl ExecutionState for AuthorityState {
type Transaction = ConsensusTransaction;
type Error = SuiError;

#[instrument(name = "handle_consensus_transaction", level = "debug", skip_all)]
async fn handle_consensus_transaction(
&self,
consensus_index: ExecutionIndices,
Expand All @@ -1312,7 +1446,7 @@ impl ExecutionState for AuthorityState {
// If we already executed this transaction, return the signed effects.
let digest = certificate.digest();
if self.database.effects_exists(digest)? {
debug!(tx_digest =? digest, "Shared-object transaction already executed");
debug!(?digest, "Shared-object transaction already executed");
let info = self.make_transaction_info(digest).await?;
return Ok(bincode::serialize(&info).expect("Failed to serialize tx info"));
}
Expand Down
Loading

0 comments on commit 6f33388

Please sign in to comment.