Skip to content

Commit

Permalink
[core] Core facilities to register certs pending execution and execut…
Browse files Browse the repository at this point in the history
…e them (MystenLabs#2512)

* Added structures to remember pending certificates to be executed
* Added notification mechanism for pending transactions
* Added tests for notify store and active module
* Connected fragment certs to pending execution logic

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Jun 19, 2022
1 parent 8037786 commit 6ec1acd
Show file tree
Hide file tree
Showing 9 changed files with 496 additions and 26 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1511,7 +1511,7 @@ impl ExecutionState for AuthorityState {
if let Some(checkpoint) = &self.checkpoints {
checkpoint
.lock()
.handle_internal_fragment(seq, *fragment, &self.committee.load())
.handle_internal_fragment(seq, *fragment, &self.committee.load(), self)
.map_err(|e| SuiError::from(&e.to_string()[..]))?;

// NOTE: The method `handle_internal_fragment` is idempotent, so we don't need
Expand Down
100 changes: 97 additions & 3 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ use sui_storage::{
write_ahead_log::DBWriteAheadLog,
LockService,
};
use tokio::sync::Notify;

use std::sync::atomic::AtomicU64;
use sui_types::base_types::SequenceNumber;
use sui_types::batch::{SignedBatch, TxSequenceNumber};
use sui_types::committee::EpochId;
Expand All @@ -27,6 +30,8 @@ use typed_store::{reopen, traits::Map};
pub type AuthorityStore = SuiDataStore<false, AuthoritySignInfo>;
pub type GatewayStore = SuiDataStore<false, EmptySignInfo>;

pub type InternalSequenceNumber = u64;

const NUM_SHARDS: usize = 4096;

/// The key where the latest consensus index is stored in the database.
Expand Down Expand Up @@ -78,7 +83,20 @@ pub struct SuiDataStore<const ALL_OBJ_VER: bool, S> {
/// certificates that have been successfully processed by this authority. This set of certificates
/// along with the genesis allows the reconstruction of all other state, and a full sync to this
/// authority.
certificates: DBMap<TransactionDigest, CertifiedTransaction>,
pub(crate) certificates: DBMap<TransactionDigest, CertifiedTransaction>,

/// The pending execution table holds a sequence of transactions that are present
/// in the certificates table, but may not have yet been executed, and should be executed.
/// The source of these certificates might be (1) the checkpoint proposal process (2) the
/// gossip processes (3) the shared object post-consensus task. An active authority process
/// reads this table and executes the certificates. The order is a hint as to their
/// causal dependencies. Note that there is no guarantee digests are unique. Once executed, and
/// effects are written the entry should be deleted.
pending_execution: DBMap<InternalSequenceNumber, TransactionDigest>,
// The next sequence number.
next_pending_seq: AtomicU64,
// A notifier for new pending certificates
pending_notifier: Arc<Notify>,

/// The map between the object ref of objects processed at all versions and the transaction
/// digest of the certificate that lead to the creation of this version of the object.
Expand Down Expand Up @@ -133,6 +151,7 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
("transactions", &point_lookup),
("owner_index", &options),
("certificates", &point_lookup),
("pending_execution", &options),
("parent_sync", &options),
("effects", &point_lookup),
("sequenced", &options),
Expand All @@ -155,6 +174,7 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
owner_index,
transactions,
certificates,
pending_execution,
parent_sync,
effects,
sequenced,
Expand All @@ -169,6 +189,7 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
"owner_index";<(Owner, ObjectID), ObjectInfo>,
"transactions";<TransactionDigest, TransactionEnvelope<S>>,
"certificates";<TransactionDigest, CertifiedTransaction>,
"pending_execution";<InternalSequenceNumber, TransactionDigest>,
"parent_sync";<ObjectRef, TransactionDigest>,
"effects";<TransactionDigest, TransactionEffectsEnvelope<S>>,
"sequenced";<(TransactionDigest, ObjectID), SequenceNumber>,
Expand All @@ -187,6 +208,15 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
let wal_path = path.as_ref().join("recovery_log");
let wal = Arc::new(DBWriteAheadLog::new(wal_path));

// Get the last sequence item
let pending_seq = pending_execution
.iter()
.skip_to_last()
.next()
.map(|(seq, _)| seq + 1)
.unwrap_or(0);
let next_pending_seq = AtomicU64::new(pending_seq);

Self {
wal,
objects,
Expand All @@ -196,6 +226,9 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
owner_index,
transactions,
certificates,
pending_execution,
next_pending_seq,
pending_notifier: Arc::new(Notify::new()),
parent_sync,
effects,
sequenced,
Expand All @@ -207,6 +240,11 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
}
}

/// Await a new pending certificate to be added
pub async fn wait_for_new_pending(&self) {
self.pending_notifier.notified().await
}

/// Returns the TransactionEffects if we have an effects structure for this transaction digest
pub fn get_effects(
&self,
Expand Down Expand Up @@ -259,6 +297,57 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
self.executed_sequence.insert(&seq, digest).unwrap();
}

/// Add a number of certificates to the pending transactions as well as the
/// certificates structure if they are not already executed.
///
/// This function may be run concurrently: it increases atomically an internal index
/// by the number of certificates passed, and then records the certificates and their
/// index. If two instanced run concurrently, the indexes are guaranteed to not overlap
/// although some certificates may be included twice in the `pending_execution`, and
/// the same certificate may be written twice (but that is OK since it is valid.)
pub fn add_pending_certificates(
&self,
certs: Vec<(TransactionDigest, CertifiedTransaction)>,
) -> SuiResult<()> {
let first_index = self
.next_pending_seq
.fetch_add(certs.len() as u64, std::sync::atomic::Ordering::Relaxed);

let batch = self.pending_execution.batch();
let batch = batch.insert_batch(
&self.pending_execution,
certs
.iter()
.enumerate()
.map(|(num, (digest, _))| ((num as u64) + first_index, digest)),
)?;
let batch = batch.insert_batch(
&self.certificates,
certs.iter().map(|(digest, cert)| (digest, cert)),
)?;
batch.write()?;

// now notify there is a pending certificate
self.pending_notifier.notify_one();

Ok(())
}

/// Get all stored certificate digests
pub fn get_pending_certificates(
&self,
) -> SuiResult<Vec<(InternalSequenceNumber, TransactionDigest)>> {
Ok(self.pending_execution.iter().collect())
}

/// Remove entries from pending certificates
pub fn remove_pending_certificates(&self, seqs: Vec<InternalSequenceNumber>) -> SuiResult<()> {
let batch = self.pending_execution.batch();
let batch = batch.delete_batch(&self.pending_execution, seqs.iter())?;
batch.write()?;
Ok(())
}

/// A function that acquires all locks associated with the objects (in order to avoid deadlocks).
async fn acquire_locks<'a, 'b>(&'a self, input_objects: &'b [ObjectRef]) -> Vec<LockGuard<'a>> {
self.mutex_table
Expand Down Expand Up @@ -939,7 +1028,7 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
) -> Result<(), SuiError> {
// Make an iterator to save the certificate.
let transaction_digest = *certificate.digest();
let certificate_to_write = std::iter::once((transaction_digest, &certificate));
// let certificate_to_write = std::iter::once((transaction_digest, &certificate));

// Make an iterator to update the locks of the transaction's shared objects.
let ids = certificate.shared_input_objects();
Expand Down Expand Up @@ -969,9 +1058,14 @@ impl<const ALL_OBJ_VER: bool, S: Eq + Serialize + for<'de> Deserialize<'de>>
// Make an iterator to update the last consensus index.
let index_to_write = std::iter::once((LAST_CONSENSUS_INDEX_ADDR, consensus_index));

// Schedule the certificate for execution
self.add_pending_certificates(vec![(transaction_digest, certificate.clone())])?;
// Note: if we crash here we are not in an inconsistent state since
// it is ok to just update the pending list without updating the sequence.

// Atomically store all elements.
let mut write_batch = self.sequenced.batch();
write_batch = write_batch.insert_batch(&self.certificates, certificate_to_write)?;
// Note: we have already written the certificates as part of the add_pending_certificates above.
write_batch = write_batch.insert_batch(&self.sequenced, sequenced_to_write)?;
write_batch = write_batch.insert_batch(&self.schedule, schedule_to_write)?;
write_batch = write_batch.insert_batch(&self.last_consensus_index, index_to_write)?;
Expand Down
14 changes: 13 additions & 1 deletion crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ use gossip::{gossip_process, node_sync_process};
pub mod checkpoint_driver;
use checkpoint_driver::checkpoint_process;

use self::checkpoint_driver::CheckpointProcessControl;
pub mod execution_driver;

use self::{checkpoint_driver::CheckpointProcessControl, execution_driver::execution_process};

// TODO: Make these into a proper config
const MAX_RETRIES_RECORDED: u32 = 10;
Expand Down Expand Up @@ -259,4 +261,14 @@ where
node_sync_process(&active, target_num_tasks, node_sync_store).await;
})
}

/// Spawn gossip process
pub async fn spawn_execute_process(self) -> JoinHandle<()> {
let active = Arc::new(self);

let locals = active;
tokio::task::spawn(async move {
execution_process(&locals).await;
})
}
}
131 changes: 131 additions & 0 deletions crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::CertifiedTransaction};
use tracing::debug;
use typed_store::Map;

use crate::{authority::AuthorityState, authority_client::AuthorityAPI};

use super::{gossip::LocalConfirmationTransactionHandler, ActiveAuthority};

#[cfg(test)]
pub(crate) mod tests;

pub trait PendCertificateForExecution {
fn pending_execution(
&self,
certs: Vec<(TransactionDigest, CertifiedTransaction)>,
) -> SuiResult<()>;
}

impl PendCertificateForExecution for AuthorityState {
fn pending_execution(
&self,
certs: Vec<(TransactionDigest, CertifiedTransaction)>,
) -> SuiResult<()> {
self.database.add_pending_certificates(certs)
}
}

/// A no-op PendCertificateForExecution that we use for testing, when
/// we do not care about certificates actually being executed.
pub struct PendCertificateForExecutionNoop;
impl PendCertificateForExecution for PendCertificateForExecutionNoop {
fn pending_execution(
&self,
_certs: Vec<(TransactionDigest, CertifiedTransaction)>,
) -> SuiResult<()> {
Ok(())
}
}

/// When a notification that a new pending transaction is received we activate
/// processing the transaction in a loop.
pub async fn execution_process<A>(active_authority: &ActiveAuthority<A>)
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
debug!("Start pending certificates execution.");

// Loop whenever there is a signal that a new transactions is ready to process.
loop {
// NOTE: nothing terrible happens if we fire more often than there are
// transactions awaiting execution, or less often than once per transactions.
// However, we need to be sure that if there is an awaiting trasnactions we
// will eventually fire the notification and wake up here.
active_authority.state.database.wait_for_new_pending().await;

debug!("Pending certificate execution activated.");

if let Err(err) = execute_pending(active_authority).await {
tracing::error!("Error in pending execution subsystem: {err}");
// The above should not return an error if the DB works, and we are connected to
// the network. However if it does, we should backoff a little.
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
}

/// Reads all pending transactions as a block and executes them.
async fn execute_pending<A>(active_authority: &ActiveAuthority<A>) -> SuiResult<()>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
let _committee = active_authority.state.committee.load().clone();
let net = active_authority.net.load().clone();

// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_certificates()?;

// Get all the actual certificates mapping to these pending transactions
let certs = active_authority
.state
.database
.certificates
.multi_get(pending_transactions.iter().map(|(_, d)| *d))?;

// Zip seq, digest with certs. Note the cert must exist in the DB
let cert_seq: Vec<_> = pending_transactions
.iter()
.zip(certs.iter())
.map(|((i, d), c)| (i, d, c.as_ref().expect("certificate must exist")))
.collect();

let local_handler = LocalConfirmationTransactionHandler {
state: active_authority.state.clone(),
};

// TODO: implement properly efficient execution for the block of transactions.
let mut executed = vec![];
for (i, d, c) in cert_seq {
// Only execute if not already executed.
if active_authority.state.database.effects_exists(d)? {
executed.push(*i);
continue;
}

debug!(digest=?d, "Pending execution for certificate.");

// Sync and Execute with local authority state
net.sync_certificate_to_authority_with_timeout_inner(
sui_types::messages::ConfirmationTransaction::new(c.clone()),
active_authority.state.name,
&local_handler,
tokio::time::Duration::from_secs(10),
10,
)
.await?;

// Remove from the execution list
executed.push(*i);
}

// Now update the pending store.
active_authority
.state
.database
.remove_pending_certificates(executed)?;

Ok(())
}
Loading

0 comments on commit 6ec1acd

Please sign in to comment.