Skip to content

Commit

Permalink
[parallel execution] only bump incarnation when the task is aborted
Browse files Browse the repository at this point in the history
This makes the code more consistent, no need to read incarnation number when execute.

Closes: aptos-labs#507
  • Loading branch information
zekun000 authored and aptos-bot committed Apr 14, 2022
1 parent 22fe0cd commit e421c51
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 54 deletions.
8 changes: 4 additions & 4 deletions aptos-move/parallel-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ where

fn execute<'a>(
&self,
idx_to_execute: TxnIndex,
version: Version,
guard: TaskGuard<'a>,
signature_verified_block: &[T],
last_input_output: &TxnLastInputOutput<
Expand All @@ -133,6 +133,7 @@ where
scheduler: &'a Scheduler,
executor: &E,
) -> SchedulerTask<'a> {
let (idx_to_execute, incarnation) = version;
let txn = &signature_verified_block[idx_to_execute];

let state_view = MVHashMapView {
Expand All @@ -144,7 +145,6 @@ where

// VM execution.
let execute_result = executor.execute_transaction(&state_view, txn);
let incarnation = scheduler.get_executing_incarnation(idx_to_execute);
let mut prev_write_set: HashSet<T::Key> = last_input_output.write_set(idx_to_execute);

// For tracking whether the recent execution wrote outside of the previous write set.
Expand Down Expand Up @@ -251,8 +251,8 @@ where
versioned_data_cache,
scheduler,
),
SchedulerTask::ExecutionTask(idx_to_execute, None, guard) => self.execute(
idx_to_execute,
SchedulerTask::ExecutionTask(version_to_execute, None, guard) => self.execute(
version_to_execute,
guard,
block,
last_input_output,
Expand Down
37 changes: 15 additions & 22 deletions aptos-move/parallel-executor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Drop for TaskGuard<'_> {
/// NoTask holds no task (similar None if we wrapped tasks in Option), and Done implies that
/// there are no more tasks and the scheduler is done.
pub enum SchedulerTask<'a> {
ExecutionTask(TxnIndex, Option<DependencyCondvar>, TaskGuard<'a>),
ExecutionTask(Version, Option<DependencyCondvar>, TaskGuard<'a>),
ValidationTask(Version, TaskGuard<'a>),
NoTask,
Done,
Expand Down Expand Up @@ -71,7 +71,7 @@ pub enum SchedulerTask<'a> {
/// | try_incarnate (incarnate successfully)
/// |
/// ↓ suspend (waiting on dependency) resume
/// Executing(i) -----------------------------> Suspended(i) ------------> Ready(i+1)
/// Executing(i) -----------------------------> Suspended(i) ------------> Ready(i)
/// |
/// | finish_execution
/// ↓
Expand All @@ -81,6 +81,7 @@ pub enum SchedulerTask<'a> {
/// ↓ finish_abort
/// Aborting(i) ---------------------------------------------------------> Ready(i+1)
///
#[derive(Debug)]
enum TransactionStatus {
ReadyToExecute(Incarnation, Option<DependencyCondvar>),
Executing(Incarnation),
Expand Down Expand Up @@ -204,10 +205,10 @@ impl Scheduler {
if let Some((version_to_validate, guard)) = self.try_validate_next_version() {
return SchedulerTask::ValidationTask(version_to_validate, guard);
}
} else if let Some((idx_to_execute, maybe_condvar, guard)) =
self.try_execute_next_index()
} else if let Some((version_to_execute, maybe_condvar, guard)) =
self.try_execute_next_version()
{
return SchedulerTask::ExecutionTask(idx_to_execute, maybe_condvar, guard);
return SchedulerTask::ExecutionTask(version_to_execute, maybe_condvar, guard);
}
}
}
Expand Down Expand Up @@ -330,24 +331,17 @@ impl Scheduler {
// re-execution task back to the caller. If incarnation fails, there is
// nothing to do, as another thread must have succeeded to incarnate and
// obtain the task for re-execution.
if let Some((_new_incarnation, maybe_condvar)) = self.try_incarnate(txn_idx) {
return SchedulerTask::ExecutionTask(txn_idx, maybe_condvar, guard);
if let Some((new_incarnation, maybe_condvar)) = self.try_incarnate(txn_idx) {
return SchedulerTask::ExecutionTask(
(txn_idx, new_incarnation),
maybe_condvar,
guard,
);
}
}

SchedulerTask::NoTask
}

/// If the status is EXECUTING, return the executing incarnation number.
pub fn get_executing_incarnation(&self, txn_idx: TxnIndex) -> Incarnation {
let status = self.txn_status[txn_idx].lock();

if let TransactionStatus::Executing(incarnation) = &*status {
*incarnation
} else {
unreachable!();
}
}
}

/// Public functions of the Scheduler
Expand Down Expand Up @@ -443,7 +437,7 @@ impl Scheduler {
/// return the version to the caller together with a guard to be used for the
/// corresponding ExecutionTask.
/// - Otherwise, return None.
fn try_execute_next_index(&self) -> Option<(TxnIndex, Option<DependencyCondvar>, TaskGuard)> {
fn try_execute_next_version(&self) -> Option<(Version, Option<DependencyCondvar>, TaskGuard)> {
let idx_to_execute = self.execution_idx.load(Ordering::SeqCst);

// Optimization for check-done, to avoid num_tasks going up and down.
Expand Down Expand Up @@ -474,7 +468,7 @@ impl Scheduler {
self.drain_idx.fetch_add(1, Ordering::SeqCst);
}

(idx_to_execute, maybe_condvar, guard)
((idx_to_execute, incarnation), maybe_condvar, guard)
})
}

Expand All @@ -496,8 +490,7 @@ impl Scheduler {
fn resume(&self, txn_idx: TxnIndex) {
let mut status = self.txn_status[txn_idx].lock();
if let TransactionStatus::Suspended(incarnation, dep_condvar) = &*status {
*status =
TransactionStatus::ReadyToExecute(*incarnation + 1, Some(dep_condvar.clone()));
*status = TransactionStatus::ReadyToExecute(*incarnation, Some(dep_condvar.clone()));
} else {
unreachable!();
}
Expand Down
45 changes: 17 additions & 28 deletions aptos-move/parallel-executor/src/unit_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ fn scheduler_tasks() {
// not calling finish execution, so validation tasks not dispatched.
assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(j, None, _) if i == j
SchedulerTask::ExecutionTask((j, 0), None, _) if i == j
));
assert!(s.get_executing_incarnation(i) == 0);
}

// Finish execution for txns 0, 2, 4. txn 0 without validate_suffix and because
Expand Down Expand Up @@ -202,24 +201,21 @@ fn scheduler_tasks() {
assert!(!s.try_abort(3, 0));
assert!(matches!(
s.finish_abort(3, 0, TaskGuard::new(&fake_counter)),
SchedulerTask::ExecutionTask(3, None, _)
SchedulerTask::ExecutionTask((3, 1), None, _)
));
assert!(s.get_executing_incarnation(3) == 1);

// can abort even after succesful validation
assert!(s.try_abort(4, 0));
assert!(matches!(
s.finish_abort(4, 0, TaskGuard::new(&fake_counter)),
SchedulerTask::ExecutionTask(4, None, _)
SchedulerTask::ExecutionTask((4, 1), None, _)
));
assert!(s.get_executing_incarnation(4) == 1);

// txn 4 is aborted, so there won't be a validation task.
assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(5, None, _)
SchedulerTask::ExecutionTask((5, 0), None, _)
));
assert!(s.get_executing_incarnation(5) == 0);
// Wrap up all outstanding tasks.
assert!(matches!(
s.finish_execution(4, 1, false, TaskGuard::new(&fake_counter)),
Expand Down Expand Up @@ -252,9 +248,8 @@ fn scheduler_dependency() {
// not calling finish execution, so validation tasks not dispatched.
assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(j, None, _) if j == i
SchedulerTask::ExecutionTask((j, 0), None, _) if j == i
));
assert!(s.get_executing_incarnation(i) == 0);
}

assert!(matches!(
Expand All @@ -263,9 +258,8 @@ fn scheduler_dependency() {
));
assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(5, None, _)
SchedulerTask::ExecutionTask((5, 0), None, _)
));
assert!(s.get_executing_incarnation(5) == 0);

assert!(s.wait_for_dependency(3, 0).is_none());
assert!(s.wait_for_dependency(4, 2).is_some());
Expand All @@ -274,11 +268,11 @@ fn scheduler_dependency() {
s.finish_execution(2, 0, false, TaskGuard::new(&fake_counter)),
SchedulerTask::ValidationTask((2, 0), _)
));
// resumed task doesn't bump incarnation
assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(4, Some(_), _)
SchedulerTask::ExecutionTask((4, 0), Some(_), _)
));
assert!(s.get_executing_incarnation(4) == 1);
}

#[test]
Expand All @@ -290,9 +284,8 @@ fn scheduler_incarnation() {
// not calling finish execution, so validation tasks not dispatched.
assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(j, None, _) if j == i
SchedulerTask::ExecutionTask((j, 0), None, _) if j == i
));
assert!(s.get_executing_incarnation(i) == 0);
}
// execution index = 5
assert!(s.wait_for_dependency(1, 0).is_some());
Expand Down Expand Up @@ -322,9 +315,8 @@ fn scheduler_incarnation() {

assert!(matches!(
s.finish_abort(2, 0, TaskGuard::new(&fake_counter)),
SchedulerTask::ExecutionTask(2, None, _)
SchedulerTask::ExecutionTask((2, 1), None, _)
));
assert!(s.get_executing_incarnation(2) == 1);

assert!(matches!(
s.finish_execution(0, 0, false, TaskGuard::new(&fake_counter)),
Expand All @@ -339,32 +331,29 @@ fn scheduler_incarnation() {

assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(1, Some(_), _)
SchedulerTask::ExecutionTask((1, 0), Some(_), _)
));
assert!(s.get_executing_incarnation(1) == 1);
assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(3, Some(_), _)
SchedulerTask::ExecutionTask((3, 0), Some(_), _)
));
assert!(s.get_executing_incarnation(3) == 1);
assert!(matches!(
s.next_task(),
SchedulerTask::ExecutionTask(4, None, _)
SchedulerTask::ExecutionTask((4, 1), None, _)
));
assert!(s.get_executing_incarnation(4) == 1);
// execution index = 5

assert!(matches!(
s.finish_execution(1, 1, false, TaskGuard::new(&fake_counter)),
SchedulerTask::ValidationTask((1, 1), _)
s.finish_execution(1, 0, false, TaskGuard::new(&fake_counter)),
SchedulerTask::ValidationTask((1, 0), _)
));
assert!(matches!(
s.finish_execution(2, 1, false, TaskGuard::new(&fake_counter)),
SchedulerTask::ValidationTask((2, 1), _)
));
assert!(matches!(
s.finish_execution(3, 1, false, TaskGuard::new(&fake_counter)),
SchedulerTask::ValidationTask((3, 1), _)
s.finish_execution(3, 0, false, TaskGuard::new(&fake_counter)),
SchedulerTask::ValidationTask((3, 0), _)
));

// validation index is 4, so finish execution doesn't return validation task, next task does.
Expand Down

0 comments on commit e421c51

Please sign in to comment.