Skip to content

Commit

Permalink
Reject transaction if any input object is overloaded (MystenLabs#7438)
Browse files Browse the repository at this point in the history
It is possible for users to create many transactions with the same
shared input object, at a rate faster than Sui can execute them
serially. This PR adds a basic rate-limiting mechanism: reject signing
or submitting a new transaction if any of its input object IDs has
>= 1000 pending transactions. This scheme is not bulletproof, but
should help in the case where users are unintentionally generating
too much load.
  • Loading branch information
mwtian authored Jan 18, 2023
1 parent 230eaa6 commit 87e1f6a
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 37 deletions.
46 changes: 45 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use move_core_types::parser::parse_struct_tag;
use move_core_types::{language_storage::ModuleId, resolver::ModuleResolver};
use move_vm_runtime::{move_vm::MoveVM, native_functions::NativeFunctionTable};
use mysten_metrics::spawn_monitored_task;
use parking_lot::Mutex;
use prometheus::{
register_histogram_with_registry, register_int_counter_vec_with_registry,
register_int_counter_with_registry, register_int_gauge_with_registry, Histogram, IntCounter,
Expand All @@ -28,6 +29,7 @@ use sui_config::node::AuthorityStorePruningConfig;
use sui_protocol_constants::MAX_TX_GAS;
use tap::TapFallible;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::oneshot;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, error, info, instrument, trace, warn, Instrument};
use typed_store::Map;
Expand Down Expand Up @@ -123,6 +125,11 @@ pub(crate) mod authority_notify_read;
pub(crate) mod authority_store;

pub(crate) const MAX_TX_RECOVERY_RETRY: u32 = 3;

// Reject a transaction if the number of pending transactions depending on the object
// is above the threshold.
pub(crate) const MAX_PER_OBJECT_EXECUTION_QUEUE_LENGTH: usize = 1000;

type CertTxGuard<'a> =
DBTxGuard<'a, TrustedCertificate, (InnerTemporaryStore, TrustedSignedTransactionEffects)>;

Expand Down Expand Up @@ -421,6 +428,10 @@ pub struct AuthorityState {
/// Manages pending certificates and their missing input objects.
transaction_manager: Arc<TransactionManager>,

/// Shuts down the execution task. Used only in testing.
#[allow(unused)]
tx_execution_shutdown: Mutex<Option<oneshot::Sender<()>>>,

pub metrics: Arc<AuthorityMetrics>,
}

Expand Down Expand Up @@ -467,6 +478,23 @@ impl AuthorityState {
)
.await?;

for (object_id, queue_len) in self.transaction_manager.objects_queue_len(
input_objects
.mutable_inputs()
.into_iter()
.map(|r| r.0)
.collect(),
) {
// When this occurs, most likely transactions piled up on a shared object.
if queue_len >= MAX_PER_OBJECT_EXECUTION_QUEUE_LENGTH {
return Err(SuiError::TooManyTransactionsPendingOnObject {
object_id,
queue_len,
threshold: MAX_PER_OBJECT_EXECUTION_QUEUE_LENGTH,
});
}
}

let owned_objects = input_objects.filter_owned_objects();

let signed_transaction = VerifiedSignedTransaction::new(
Expand Down Expand Up @@ -1584,6 +1612,7 @@ impl AuthorityState {
tx_ready_certificates,
metrics.clone(),
));
let (tx_execution_shutdown, rx_execution_shutdown) = oneshot::channel();

let state = Arc::new(AuthorityState {
name,
Expand All @@ -1601,6 +1630,7 @@ impl AuthorityState {
checkpoint_store,
committee_store,
transaction_manager,
tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
metrics,
});

Expand All @@ -1617,7 +1647,11 @@ impl AuthorityState {

// Start a task to execute ready certificates.
let authority_state = Arc::downgrade(&state);
spawn_monitored_task!(execution_process(authority_state, rx_ready_certificates));
spawn_monitored_task!(execution_process(
authority_state,
rx_ready_certificates,
rx_execution_shutdown
));

state
.create_owner_index_if_empty()
Expand Down Expand Up @@ -2594,4 +2628,14 @@ impl AuthorityState {
let previous_store = self.epoch_store.swap(epoch_tables);
previous_store.epoch_terminated().await;
}

#[cfg(test)]
pub(crate) fn shutdown_execution_for_test(&self) {
self.tx_execution_shutdown
.lock()
.take()
.unwrap()
.send(())
.unwrap();
}
}
23 changes: 22 additions & 1 deletion crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::task::JoinHandle;
use tracing::{info, Instrument};

use crate::{
authority::AuthorityState,
authority::{AuthorityState, MAX_PER_OBJECT_EXECUTION_QUEUE_LENGTH},
consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics},
};

Expand Down Expand Up @@ -328,6 +328,27 @@ impl ValidatorService {
"Cannot execute system certificate via RPC interface! {certificate:?}"
)));
}
for (object_id, queue_len) in state.transaction_manager().objects_queue_len(
certificate
.data()
.intent_message
.value
.kind
.input_objects()?
.into_iter()
.map(|r| r.object_id())
.collect(),
) {
// When this occurs, most likely transactions piled up on a shared object.
if queue_len >= MAX_PER_OBJECT_EXECUTION_QUEUE_LENGTH {
return Err(SuiError::TooManyTransactionsPendingOnObject {
object_id,
queue_len,
threshold: MAX_PER_OBJECT_EXECUTION_QUEUE_LENGTH,
}
.into());
}
}
// code block within reconfiguration lock
let certificate = {
let reconfiguration_lock = epoch_store.get_reconfig_state_read_lock_guard();
Expand Down
27 changes: 19 additions & 8 deletions crates/sui-core/src/execution_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use mysten_metrics::spawn_monitored_task;
use sui_types::messages::VerifiedCertificate;
use tokio::{
sync::{mpsc::UnboundedReceiver, Semaphore},
sync::{mpsc::UnboundedReceiver, oneshot, Semaphore},
time::sleep,
};
use tracing::{debug, error, info};
Expand All @@ -30,6 +30,7 @@ const EXECUTION_FAILURE_RETRY_INTERVAL: Duration = Duration::from_secs(1);
pub async fn execution_process(
authority_state: Weak<AuthorityState>,
mut rx_ready_certificates: UnboundedReceiver<VerifiedCertificate>,
mut rx_execution_shutdown: oneshot::Receiver<()>,
) {
info!("Starting pending certificates execution process.");

Expand All @@ -38,14 +39,24 @@ pub async fn execution_process(

// Loop whenever there is a signal that a new transaction is ready to process.
loop {
let certificate = if let Some(cert) = rx_ready_certificates.recv().await {
cert
} else {
// Should only happen after the AuthorityState has shut down and tx_ready_certificate
// has been dropped by TransactionManager.
info!("No more certificate will be received. Exiting ...");
return;
let certificate;
tokio::select! {
result = rx_ready_certificates.recv() => {
if let Some(cert) = result {
certificate = cert;
} else {
// Should only happen after the AuthorityState has shut down and tx_ready_certificate
// has been dropped by TransactionManager.
info!("No more certificate will be received. Exiting executor ...");
return;
};
}
_ = &mut rx_execution_shutdown => {
info!("Shutdown signal received. Exiting executor ...");
return;
}
};

let authority = if let Some(authority) = authority_state.upgrade() {
authority
} else {
Expand Down
78 changes: 53 additions & 25 deletions crates/sui-core/src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::{BTreeMap, BTreeSet},
collections::{BTreeSet, HashMap, HashSet},
sync::Arc,
};

use parking_lot::Mutex;
use sui_types::storage::ObjectKey;
use parking_lot::RwLock;
use sui_types::{base_types::ObjectID, storage::ObjectKey};
use sui_types::{base_types::TransactionDigest, error::SuiResult, messages::VerifiedCertificate};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error};
Expand All @@ -25,7 +25,7 @@ pub struct TransactionManager {
authority_store: Arc<AuthorityStore>,
tx_ready_certificates: UnboundedSender<VerifiedCertificate>,
metrics: Arc<AuthorityMetrics>,
inner: Mutex<Inner>,
inner: RwLock<Inner>,
}

#[derive(Default)]
Expand All @@ -34,15 +34,19 @@ struct Inner {
// Note that except for immutable objects, a given key may only have one TransactionDigest in
// the set. Unfortunately we cannot easily verify that this invariant is upheld, because you
// cannot determine from TransactionData whether an input is mutable or immutable.
missing_inputs: BTreeMap<ObjectKey, BTreeSet<TransactionDigest>>,
missing_inputs: HashMap<ObjectKey, BTreeSet<TransactionDigest>>,

// Number of transactions that depend on each object ID.
// Used for throttling signing of hot objects.
input_objects: HashMap<ObjectID, usize>,

// A transaction enqueued to TransactionManager must be in either pending_certificates or
// executing_certificates.

// Maps transactions to their missing input objects.
pending_certificates: BTreeMap<TransactionDigest, BTreeSet<ObjectKey>>,
pending_certificates: HashMap<TransactionDigest, BTreeSet<ObjectKey>>,
// Transactions that have all input objects available, but have not finished execution.
executing_certificates: BTreeSet<TransactionDigest>,
executing_certificates: HashSet<TransactionDigest>,
}

impl TransactionManager {
Expand Down Expand Up @@ -80,7 +84,7 @@ impl TransactionManager {
certs: Vec<VerifiedCertificate>,
epoch_store: &AuthorityPerEpochStore,
) -> SuiResult<()> {
let inner = &mut self.inner.lock();
let inner = &mut self.inner.write();
for cert in certs {
let digest = *cert.digest();
// hold the tx lock until we have finished checking if objects are missing, so that we
Expand Down Expand Up @@ -156,6 +160,8 @@ impl TransactionManager {
digest,
objkey
);
let input_count = inner.input_objects.entry(objkey.0).or_default();
*input_count += 1;
}

assert!(
Expand Down Expand Up @@ -191,25 +197,34 @@ impl TransactionManager {
let mut ready_digests = Vec::new();

{
let inner = &mut self.inner.lock();
let inner = &mut self.inner.write();
for object_key in object_keys {
let Some(digests) = inner.missing_inputs.remove(&object_key) else {
if let Some(digests) = inner.missing_inputs.remove(&object_key) {
// Clean up object ID count table.
let input_count = inner.input_objects.get_mut(&object_key.0).unwrap();
*input_count -= digests.len();
if *input_count == 0 {
inner.input_objects.remove(&object_key.0);
}
// Clean up pending certificates table.
for digest in digests.iter() {
// Pending certificate must exist.
let set = inner.pending_certificates.get_mut(digest).unwrap();
assert!(set.remove(&object_key));
// When a certificate has no missing input, it is ready to execute.
if set.is_empty() {
debug!(tx_digest = ?digest, "certificate ready");
inner.pending_certificates.remove(digest).unwrap();
assert!(inner.executing_certificates.insert(*digest));
ready_digests.push(*digest);
} else {
debug!(tx_digest = ?digest, missing = ?set, "Certificate waiting on missing inputs");
}
}
} else {
// No pending transaction is using this object ref as input.
continue;
};

for digest in digests.iter() {
let set = inner.pending_certificates.entry(*digest).or_default();
set.remove(&object_key);
// This certificate has no missing input. It is ready to execute.
if set.is_empty() {
debug!(tx_digest = ?digest, "certificate ready");
inner.pending_certificates.remove(digest);
assert!(inner.executing_certificates.insert(*digest));
ready_digests.push(*digest);
} else {
debug!(tx_digest = ?digest, missing = ?set, "certificate waiting on missing");
}
}
}

self.metrics
Expand Down Expand Up @@ -253,7 +268,7 @@ impl TransactionManager {
epoch_store: &AuthorityPerEpochStore,
) {
{
let inner = &mut self.inner.lock();
let inner = &mut self.inner.write();
inner.executing_certificates.remove(digest);
self.metrics
.transaction_manager_num_executing_certificates
Expand All @@ -267,4 +282,17 @@ impl TransactionManager {
self.metrics.transaction_manager_num_ready.inc();
let _ = self.tx_ready_certificates.send(certificate);
}

// Returns the number of transactions waiting on each object ID.
pub(crate) fn objects_queue_len(&self, keys: Vec<ObjectID>) -> Vec<(ObjectID, usize)> {
let inner = self.inner.read();
keys.into_iter()
.map(|key| {
(
key,
inner.input_objects.get(&key).cloned().unwrap_or_default(),
)
})
.collect()
}
}
4 changes: 4 additions & 0 deletions crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3575,6 +3575,8 @@ pub(crate) async fn send_consensus(authority: &AuthorityState, cert: &VerifiedCe
)
.await
.unwrap();
} else {
warn!("Failed to verify certificate: {:?}", cert);
}
}

Expand Down Expand Up @@ -3602,6 +3604,8 @@ pub(crate) async fn send_consensus_no_execution(
)
.await
.unwrap();
} else {
warn!("Failed to verify certificate: {:?}", cert);
}
}

Expand Down
Loading

0 comments on commit 87e1f6a

Please sign in to comment.