Skip to content

Commit

Permalink
[Block-STM] Scheduler uniform wave treatment & test for number of tasks (aptos-labs#7034)
Browse files Browse the repository at this point in the history

* comments

* test for task numbers, scheduler fixes

---------

Co-authored-by: Zekun Li <[email protected]>
  • Loading branch information
gelash and zekun000 authored Mar 14, 2023
1 parent 72583ae commit a6a4c74
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 127 deletions.
6 changes: 4 additions & 2 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,10 @@ where
// Only one thread try_commit to avoid contention.
if committing {
// Keep committing txns until there are no more that can be committed now.
loop {
if scheduler.try_commit().is_none() {
while let Some(txn_idx) = scheduler.try_commit() {
if txn_idx + 1 == block.len() {
// Committed the last transaction / everything.
scheduler_task = SchedulerTask::Done;
break;
}
}
Expand Down
133 changes: 80 additions & 53 deletions aptos-move/block-executor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,21 @@ impl PartialEq for ExecutionStatus {
///
/////////////////////////////// Algorithm Description for Updating Waves ///////////////////////////////
/// In the following, 'update' means taking the maximum.
/// (1) Upon decreasing validation_idx, increment validation_idx.wave and update txn's max_triggered_wave <- validation_idx.wave;
/// (2) Upon finishing execution of txn that is below validation_idx, if this execution does not write to new paths,
/// then only this txn needs validation but not all later txns, update txn's required_wave <- validation_idx.wave;
/// (1) Upon decreasing validation_idx, increment validation_idx.wave and update txn's
/// max_triggered_wave <- validation_idx.wave;
/// (2) Upon finishing execution of txn that is below validation_idx, update txn's
/// required_wave <- validation_idx.wave; (otherwise, the last triggered wave is below and will validate).
/// (3) Upon validating a txn successfully, update txn's maybe_max_validated_wave <- validation_idx.wave;
/// (4) Upon trying to commit an executed txn, update commit_state.wave <- txn's max_triggered_wave.
/// (5) If txn's maybe_max_validated_wave >= max(commit_state.wave, txn's required_wave), can commit the txn.
///
/// Remark: commit_state.wave is updated only with max_triggered_wave but not required_wave. The reason is that we rely on
/// the first txn's max_triggered_wave being incremented during a new wave (due to decreasing validation_idx).
/// Then, since commit_state.wave is updated with the first txn's max_triggered_wave, all later txns also
/// need to have a maybe_max_validated_wave in order to be committed, which indicates they have the up-to-date validation
/// wave. Similarly, for required_wave which is incremented only when the single txn needs validation,
/// the commit_state.wave is not updated since later txns do not need new wave of validation.
/// Remark: commit_state.wave is updated only with max_triggered_wave but not required_wave. This is
/// because max_triggered_wave implies that this wave of validations was required for all higher transactions
/// (and is set as a part of decrease_validation_idx), while required_wave is set for the transaction only
/// (when a validation task is returned to the caller). Moreover, the code is structured in a way that
/// decrease_validation_idx is always called for txn_idx + 1 (e.g. when aborting, there is no need to validate
/// the transaction before re-execution, and in finish_execution, even if there is a need to validate txn_idx,
/// it is returned to the caller directly, which is done so as an optimization and also for uniformity).
#[derive(Debug)]
struct ValidationStatus {
max_triggered_wave: Wave,
Expand Down Expand Up @@ -203,6 +204,9 @@ pub struct Scheduler {
/// Public Interfaces for the Scheduler
impl Scheduler {
pub fn new(num_txns: usize) -> Self {
// Empty block should early return and not create a scheduler.
assert!(num_txns > 0, "No scheduler needed for 0 transactions");

Self {
num_txns,
execution_idx: AtomicUsize::new(0),
Expand All @@ -225,17 +229,12 @@ impl Scheduler {

/// If successful, returns Some(TxnIndex), the index of committed transaction.
/// The current implementation has one dedicated thread to try_commit.
/// Should not be called after the last transaction is committed.
pub fn try_commit(&self) -> Option<TxnIndex> {
let mut commit_state_mutex = self.commit_state.lock();
let commit_state = commit_state_mutex.deref_mut();
let (commit_idx, commit_wave) = (&mut commit_state.0, &mut commit_state.1);

if *commit_idx == self.num_txns {
// All txns have been committed, the parallel execution can finish.
self.done_marker.store(true, Ordering::SeqCst);
return None;
}

if let Some(validation_status) = self.txn_status[*commit_idx].1.try_read() {
// Acquired the validation status read lock.
if let Some(status) = self.txn_status[*commit_idx].0.try_upgradable_read() {
Expand All @@ -254,7 +253,12 @@ impl Scheduler {
// Upgrade the execution status read lock to write lock.
// Can commit.
*status_write = ExecutionStatus::Committed(incarnation);

*commit_idx += 1;
if *commit_idx == self.num_txns {
// All txns have been committed, the parallel execution can finish.
self.done_marker.store(true, Ordering::SeqCst);
}
return Some(*commit_idx - 1);
}
}
Expand Down Expand Up @@ -357,25 +361,23 @@ impl Scheduler {

let mut stored_deps = self.txn_dependency[dep_txn_idx].lock();

{
if self.is_executed(dep_txn_idx, true).is_some() {
// Current status of dep_txn_idx is 'executed', so the dependency got resolved.
// To avoid zombie dependency (and losing liveness), must return here and
// not add a (stale) dependency.
if self.is_executed(dep_txn_idx, true).is_some() {
// Current status of dep_txn_idx is 'executed', so the dependency got resolved.
// To avoid zombie dependency (and losing liveness), must return here and
// not add a (stale) dependency.

// Note: acquires (a different, status) mutex, while holding (dependency) mutex.
// Only place in scheduler where a thread may hold >1 mutexes, hence, such
// acquisitions always happen in the same order (this function), so they cannot deadlock.
// Note: acquires (a different, status) mutex, while holding (dependency) mutex.
// Only place in scheduler where a thread may hold >1 mutexes, hence, such
// acquisitions always happen in the same order (this function), so they cannot deadlock.

return None;
}
return None;
}

self.suspend(txn_idx, dep_condvar.clone());
self.suspend(txn_idx, dep_condvar.clone());

// Safe to add dependency here (still holding the lock) - finish_execution of txn
// dep_txn_idx is guaranteed to acquire the same lock later and clear the dependency.
stored_deps.push(txn_idx);
}
// Safe to add dependency here (still holding the lock) - finish_execution of txn
// dep_txn_idx is guaranteed to acquire the same lock later and clear the dependency.
stored_deps.push(txn_idx);

Some(dep_condvar)
}
Expand Down Expand Up @@ -433,7 +435,7 @@ impl Scheduler {
.fetch_min(execution_target_idx, Ordering::SeqCst);
}

let (cur_val_idx, cur_wave) =
let (cur_val_idx, mut cur_wave) =
Self::unpack_validation_idx(self.validation_idx.load(Ordering::Acquire));

// If validation_idx is already lower than txn_idx, all required transactions will be
Expand All @@ -443,17 +445,13 @@ impl Scheduler {
// The transaction execution required revalidating all higher txns (not
// only itself), currently happens when incarnation writes to a new path
// (w.r.t. the write-set of its previous completed incarnation).
if let Some(wave) = self.decrease_validation_idx(txn_idx) {
// Under lock, current wave monotonically increasing, can simply write.
validation_status.max_triggered_wave = wave;
}
} else {
// Only transaction txn_idx requires validation. Return validation task
// back to the caller.
// Under lock, current wave is monotonically increasing, can simply write.
validation_status.required_wave = cur_wave;
return SchedulerTask::ValidationTask((txn_idx, incarnation), cur_wave);
if let Some(wave) = self.decrease_validation_idx(txn_idx + 1) {
cur_wave = wave;
};
}
// Update the minimum wave this txn needs to pass.
validation_status.required_wave = cur_wave;
return SchedulerTask::ValidationTask((txn_idx, incarnation), cur_wave);
}

SchedulerTask::NoTask
Expand All @@ -462,17 +460,25 @@ impl Scheduler {
/// Finalize a validation task of version (txn_idx, incarnation). In some cases,
/// may return a re-execution task back to the caller (otherwise, NoTask).
pub fn finish_abort(&self, txn_idx: TxnIndex, incarnation: Incarnation) -> SchedulerTask {
// Similar reason as in finish_execution to hold the validation lock throughout the
// function. Also note that we always lock validation status before execution status
// which is good to have a fixed order to avoid potential deadlocks.
let mut validation_status = self.txn_status[txn_idx].1.write();
self.set_aborted_status(txn_idx, incarnation);

// Schedule higher txns for validation, skipping txn_idx itself (needs to be
// re-executed first).
if let Some(wave) = self.decrease_validation_idx(txn_idx + 1) {
// Under lock, current wave monotonically increasing, can simply write.
validation_status.max_triggered_wave = wave;
{
// acquire exclusive lock on the validation status of txn_idx, and hold the lock
// while calling decrease_validation_idx below. Otherwise, this thread might get
// suspended after setting aborted ( = ready) status, and other threads might finish
// re-executing, then commit txn_idx, and potentially commit txn_idx + 1 before
// decrease_validation_idx would be able to set max_triggered_wave.
//
// Also, as a convention, we always acquire validation status lock before execution
// status lock, as we have to have a consistent order and this order is easier to
// provide correctness between finish_execution & try_commit.
let _validation_status = self.txn_status[txn_idx].1.write();

self.set_aborted_status(txn_idx, incarnation);

// Schedule higher txns for validation, skipping txn_idx itself (needs to be
// re-executed first).
self.decrease_validation_idx(txn_idx + 1);

// can release the lock early.
}

// txn_idx must be re-executed, and if execution_idx is lower, it will be.
Expand Down Expand Up @@ -503,11 +509,26 @@ impl Scheduler {

/// Decreases the validation index, adjusting the wave and validation status as needed.
fn decrease_validation_idx(&self, target_idx: TxnIndex) -> Option<Wave> {
// We only call with txn_idx + 1, so it can equal num_txns, but not be strictly larger.
debug_assert!(target_idx <= self.num_txns);
if target_idx >= self.num_txns {
return None;
}

if let Ok(prev_val_idx) =
self.validation_idx
.fetch_update(Ordering::Acquire, Ordering::SeqCst, |val_idx| {
let (txn_idx, wave) = Self::unpack_validation_idx(val_idx);
if txn_idx > target_idx {
let mut validation_status = self.txn_status[target_idx].1.write();
// Update the minimum wave that all subsequent txns need to pass.
// We set it to max for safety (to avoid overwriting with lower values
// by a slower thread), but currently this isn't strictly required
// as all callers of decrease_validation_idx hold a write lock on the
// previous transaction's validation status.
validation_status.max_triggered_wave =
max(validation_status.max_triggered_wave, wave + 1);

// Pack into validation index.
Some((target_idx as u64) | ((wave as u64 + 1) << 32))
} else {
Expand Down Expand Up @@ -595,6 +616,12 @@ impl Scheduler {
idx_to_validate: TxnIndex,
wave: Wave,
) -> Option<(Version, Wave)> {
// We do compare-and-swap here instead of fetch-and-increment as for execution index
// because we would like to not validate transactions when lower indices are in the
// 'never_executed' state (to avoid unnecessarily reducing validation index and creating
// redundant validation tasks). This is checked in the caller (in 'next_task' function),
// but if we used fetch-and-increment, two threads can arrive in a cloned state and
// both increment, effectively skipping over the 'never_executed' transaction index.
let validation_idx = (idx_to_validate as u64) | ((wave as u64) << 32);
let new_validation_idx = ((idx_to_validate + 1) as u64) | ((wave as u64) << 32);
if self
Expand Down
Loading

0 comments on commit a6a4c74

Please sign in to comment.