Skip to content

Commit

Permalink
Execute sequenced certs serially in execution driver (MystenLabs#5788)
Browse files Browse the repository at this point in the history
* Execute sequenced certs serially in execution driver

* Just use a bool to indicate whether sequenced

* PR comments

* Prioritize sequenced over unsequenced when there are duplicates
  • Loading branch information
mystenmark authored Nov 3, 2022
1 parent 842fbd9 commit 264bc42
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 32 deletions.
28 changes: 21 additions & 7 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use tracing::{debug, error, instrument, warn};
use typed_store::Map;

pub use authority_store::{
AuthorityStore, GatewayStore, ResolverWrapper, SuiDataStore, UpdateType,
AuthorityStore, GatewayStore, PendingDigest, ResolverWrapper, SuiDataStore, UpdateType,
};
use narwhal_config::{
Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache,
Expand Down Expand Up @@ -1614,6 +1614,10 @@ impl AuthorityState {
.await
}

/// Queues a single certificate for execution and marks it as sequenced.
///
/// Builds a one-element batch keyed by the certificate's own digest and hands
/// it to `add_pending_impl` with `is_sequenced` set to `true`.
pub fn add_pending_sequenced_certificate(&self, cert: VerifiedCertificate) -> SuiResult {
    // Copy the digest out first so the certificate itself can be moved into the batch.
    let digest = *cert.digest();
    let batch = vec![(digest, Some(cert))];
    self.add_pending_impl(batch, true)
}

/// Add a number of certificates to the pending transactions as well as the
/// certificates structure if they are not already executed.
/// Certificates are optional, and if not provided, they will be eventually
Expand All @@ -1622,11 +1626,24 @@ impl AuthorityState {
&self,
certs: Vec<(TransactionDigest, Option<VerifiedCertificate>)>,
) -> SuiResult<()> {
self.add_pending_impl(certs, false)
}

/// Shared implementation for adding certificates to the pending set.
///
/// * `certs` - pairs of a transaction digest and, optionally, its certificate.
/// * `is_sequenced` - flag forwarded unchanged to
///   `database.add_pending_digests`; `add_pending_sequenced_certificate`
///   passes `true`.
///
/// Every certificate that is present is first stored via
/// `node_sync_store.batch_store_certs`; if that fails the error is returned
/// and no digest is recorded. Afterwards all digests (with or without a
/// certificate) are added to the pending digests.
fn add_pending_impl(
    &self,
    certs: Vec<(TransactionDigest, Option<VerifiedCertificate>)>,
    is_sequenced: bool,
) -> SuiResult {
    // Entries whose certificate is `None` contribute only their digest below.
    self.node_sync_store
        .batch_store_certs(certs.iter().filter_map(|(_, cert_opt)| cert_opt.clone()))?;

    self.database.add_pending_digests(
        certs.iter().map(|(digest, _)| *digest).collect(),
        is_sequenced,
    )
}

// Continually pop in-progress txes from the WAL and try to drive them to completion.
Expand Down Expand Up @@ -2314,10 +2331,7 @@ impl AuthorityState {
);

// Schedule the certificate for execution
self.add_pending_certificates(vec![(
*certificate.digest(),
Some(certificate.clone()),
)])?;
self.add_pending_sequenced_certificate(certificate.clone())?;

if certificate.contains_shared_object() {
self.database
Expand Down
13 changes: 8 additions & 5 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub type AuthorityStore = SuiDataStore<AuthoritySignInfo>;
/// `SuiDataStore` specialised with `EmptySignInfo` as its signature-info parameter.
pub type GatewayStore = SuiDataStore<EmptySignInfo>;

/// Integer index type used as the key of the `pending_execution` table.
pub type InternalSequenceNumber = u64;
/// Value stored in the `pending_execution` table: a flag telling whether the
/// entry was added as "sequenced", paired with the transaction digest.
pub type PendingDigest = (bool /* is sequenced */, TransactionDigest);

/// Newtype wrapper around a `LockGuard`.
// NOTE(review): presumably the guard held while a certificate is being processed — confirm against its users.
pub struct CertLockGuard(LockGuard);

Expand Down Expand Up @@ -223,7 +224,11 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
/// index. If two instances run concurrently, the indexes are guaranteed to not overlap
/// although some certificates may be included twice in the `pending_execution`, and
/// the same certificate may be written twice (but that is OK since it is valid.)
pub fn add_pending_digests(&self, digests: Vec<TransactionDigest>) -> SuiResult<()> {
pub fn add_pending_digests(
&self,
digests: Vec<TransactionDigest>,
is_sequenced: bool,
) -> SuiResult<()> {
let first_index = self
.next_pending_seq
.fetch_add(digests.len() as u64, Ordering::Relaxed);
Expand All @@ -234,7 +239,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
digests
.iter()
.enumerate()
.map(|(num, digest)| ((num as u64) + first_index, digest)),
.map(|(num, digest)| ((num as u64) + first_index, (is_sequenced, *digest))),
)?;
batch.write()?;

Expand All @@ -245,9 +250,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}

/// Get all stored certificate digests
pub fn get_pending_digests(
&self,
) -> SuiResult<Vec<(InternalSequenceNumber, TransactionDigest)>> {
pub fn get_pending_digests(&self) -> SuiResult<Vec<(InternalSequenceNumber, PendingDigest)>> {
Ok(self.epoch_tables().pending_execution.iter().collect())
}

Expand Down
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
authority_store::{InternalSequenceNumber, ObjectKey},
authority_store::{InternalSequenceNumber, ObjectKey, PendingDigest},
*,
};
use narwhal_executor::ExecutionIndices;
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct AuthorityEpochTables<S> {
/// reads this table and executes the certificates. The order is a hint as to their
/// causal dependencies. Note that there is no guarantee digests are unique. Once executed, and
/// effects are written the entry should be deleted.
pub(crate) pending_execution: DBMap<InternalSequenceNumber, TransactionDigest>,
pub(crate) pending_execution: DBMap<InternalSequenceNumber, PendingDigest>,

/// Hold the lock for shared objects. These locks are written by a single task: upon receiving a valid
/// certified transaction from consensus, the authority assigns a lock to each shared objects of the
Expand Down
133 changes: 115 additions & 18 deletions crates/sui-core/src/authority_active/execution_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

use std::{collections::HashSet, sync::Arc};
use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::VerifiedCertificate};
use tracing::{debug, info};
use tracing::{debug, error, info};

use crate::authority::AuthorityState;
use crate::authority::{AuthorityState, PendingDigest};
use crate::authority_client::AuthorityAPI;

use futures::{stream, StreamExt};
Expand Down Expand Up @@ -89,21 +89,33 @@ where
}
}

/// Reads all pending transactions as a block and executes them.
/// Returns whether all pending transactions succeeded.
async fn execute_pending<A>(active_authority: Arc<ActiveAuthority<A>>) -> SuiResult<bool>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_digests()?;
type PendingVec = Vec<(u64, PendingDigest)>;

fn sort_and_partition_pending_certs(
mut pending_transactions: PendingVec,
) -> (
PendingVec, // sequenced
PendingVec, // unsequenced
Vec<u64>, // duplicated indices, to be deleted
) {
// sort sequenced digests before unsequenced so that the deduplication below favors
// sequenced digests.
pending_transactions.sort_by(|(idx_a, (is_seq_a, _)), (idx_b, (is_seq_b, _))| {
match is_seq_b.cmp(is_seq_a) {
// when both are sequenced or unsequenced, sort by idx.
std::cmp::Ordering::Equal => idx_a.cmp(idx_b),
// otherwise sort sequenced before unsequenced
res => res,
}
});

// Before executing, de-duplicate the list of pending transactions
let mut seen = HashSet::new();
let mut indexes_to_delete = Vec::new();
let pending_transactions: Vec<_> = pending_transactions

let (pending_sequenced, pending_transactions): (Vec<_>, Vec<_>) = pending_transactions
.into_iter()
.filter(|(idx, digest)| {
.filter(|(idx, (_, digest))| {
if seen.contains(digest) {
indexes_to_delete.push(*idx);
false
Expand All @@ -112,7 +124,63 @@ where
true
}
})
.collect();
.partition(|(_, (is_sequenced, _))| *is_sequenced);

debug!(
num_sequenced = ?pending_sequenced.len(),
num_unsequenced = ?pending_transactions.len()
);

(pending_sequenced, pending_transactions, indexes_to_delete)
}

/// Exercises `sort_and_partition_pending_certs` on three scenarios:
/// plain partitioning, de-duplication that favors the sequenced entry, and
/// index ordering inside each partition.
#[test]
fn test_sort_and_partition_pending_certs() {
    let digest_a = TransactionDigest::random();
    let digest_b = TransactionDigest::random();
    let digest_c = TransactionDigest::random();
    let digest_d = TransactionDigest::random();

    // Scenario 1: one unsequenced and one sequenced entry end up in separate
    // partitions, and nothing is reported as a duplicate.
    let mixed_input = vec![(0, (false, digest_a)), (1, (true, digest_b))];
    let no_duplicates: Vec<u64> = Vec::new();
    let mixed_expected = (
        vec![(1, (true, digest_b))],
        vec![(0, (false, digest_a))],
        no_duplicates,
    );
    assert_eq!(sort_and_partition_pending_certs(mixed_input), mixed_expected);

    // Scenario 2: the same digest appears both unsequenced (index 0) and
    // sequenced (index 1); the sequenced entry is kept and index 0 is
    // returned for deletion.
    let duplicated_input = vec![(0, (false, digest_a)), (1, (true, digest_a))];
    let duplicated_expected = (vec![(1, (true, digest_a))], vec![], vec![0]);
    assert_eq!(
        sort_and_partition_pending_certs(duplicated_input),
        duplicated_expected
    );

    // Scenario 3: entries arrive out of order; each partition comes back
    // sorted by index.
    let shuffled_input = vec![
        (2, (false, digest_c)),
        (0, (false, digest_b)),
        (4, (true, digest_d)),
        (1, (true, digest_a)),
    ];
    let sorted_sequenced = vec![(1, (true, digest_a)), (4, (true, digest_d))];
    let sorted_unsequenced = vec![(0, (false, digest_b)), (2, (false, digest_c))];
    let nothing_to_delete: Vec<u64> = Vec::new();
    assert_eq!(
        sort_and_partition_pending_certs(shuffled_input),
        (sorted_sequenced, sorted_unsequenced, nothing_to_delete)
    );
}

/// Reads all pending transactions as a block and executes them.
/// Returns whether all pending transactions succeeded.
async fn execute_pending<A>(active_authority: Arc<ActiveAuthority<A>>) -> SuiResult<bool>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Get the pending transactions
let pending_transactions = active_authority.state.database.get_pending_digests()?;

let (pending_sequenced, pending_transactions, indexes_to_delete) =
sort_and_partition_pending_certs(pending_transactions);

active_authority
.state
.database
Expand All @@ -121,22 +189,51 @@ where
// Send them for execution
let epoch = active_authority.state.committee.load().epoch;
let sync_handle = active_authority.clone().node_sync_handle();

// Execute certs that have a sequencing index associated with them serially.
for (seq, (_, digest)) in pending_sequenced.iter() {
let mut result_stream = sync_handle
.handle_execution_request(epoch, std::iter::once(*digest))
.await?;

match result_stream.next().await.unwrap() {
Ok(_) => {
debug!(?seq, ?digest, "serial certificate execution complete");
active_authority
.state
.database
.remove_pending_digests(vec![*seq])
.tap_err(|err| {
error!(?seq, ?digest, "pending digest deletion failed: {}", err)
})?;
}
Err(err) => {
info!(
?seq,
?digest,
"serial certificate execution failed: {}",
err
);
}
}
}

let executed: Vec<_> = sync_handle
// map to extract digest
.handle_execution_request(
epoch,
pending_transactions.iter().map(|(_, digest)| *digest),
pending_transactions.iter().map(|(_, (_, digest))| *digest),
)
.await?
// zip results back together with seq
.zip(stream::iter(pending_transactions.iter()))
// filter out errors
.filter_map(|(result, (seq, digest))| async move {
.filter_map(|(result, (idx, digest))| async move {
result
.tap_err(|e| info!(?seq, ?digest, "certificate execution failed: {}", e))
.tap_ok(|_| debug!(?seq, ?digest, "certificate execution complete"))
.tap_err(|e| info!(?idx, ?digest, "certificate execution failed: {}", e))
.tap_ok(|_| debug!(?idx, ?digest, "certificate execution complete"))
.ok()
.map(|_| seq)
.map(|_| idx)
})
.collect()
.await;
Expand Down

0 comments on commit 264bc42

Please sign in to comment.