diff --git a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs
index a95d3f4fe0398..25246e02b1a4b 100644
--- a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs
+++ b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs
@@ -7,7 +7,9 @@ use aptos_types::{
     on_chain_config::{OnChainConfig, ValidatorSet},
     transaction::Transaction,
 };
-use aptos_vm::{data_cache::AsMoveResolver, AptosVM, VMExecutor};
+use aptos_vm::{
+    data_cache::AsMoveResolver, parallel_executor::ParallelAptosVM, AptosVM, VMExecutor,
+};
 use criterion::{measurement::Measurement, BatchSize, Bencher};
 use language_e2e_tests::{
     account_universe::{log_balance_strategy, AUTransactionGen, AccountUniverseGen},
@@ -80,13 +82,13 @@ where
     pub fn bench_parallel(&self, b: &mut Bencher) {
         b.iter_batched(
             || {
-                TransactionBenchState::with_size_parallel(
+                TransactionBenchState::with_size(
                     &self.strategy,
                     self.num_accounts,
                     self.num_transactions,
                 )
             },
-            |state| state.execute(),
+            |state| state.execute_parallel(),
             // The input here is the entire list of signed transactions, so it's pretty large.
             BatchSize::LargeInput,
         )
@@ -115,11 +117,30 @@ impl TransactionBenchState {
         S: Strategy,
         S::Value: AUTransactionGen,
     {
-        Self::with_universe(
+        let mut state = Self::with_universe(
             strategy,
             universe_strategy(num_accounts, num_transactions),
             num_transactions,
-        )
+        );
+
+        // Insert a BlockMetadata transaction at the beginning to better simulate real-life traffic.
+        let validator_set =
+            ValidatorSet::fetch_config(&state.executor.get_state_view().as_move_resolver())
+                .expect("Unable to retrieve the validator set from storage");
+
+        let new_block = BlockMetadata::new(
+            HashValue::zero(),
+            0,
+            1,
+            vec![],
+            *validator_set.payload()[0].account_address(),
+        );
+
+        state
+            .transactions
+            .insert(0, Transaction::BlockMetadata(new_block));
+
+        state
     }
 
     /// Creates a new benchmark state with the given account universe strategy and number of
@@ -168,36 +189,12 @@ impl TransactionBenchState {
             .expect("VM should not fail to start");
     }
 
-    fn with_size_parallel<S>(strategy: S, num_accounts: usize, num_transactions: usize) -> Self
-    where
-        S: Strategy,
-        S::Value: AUTransactionGen,
-    {
-        let mut state = Self::with_universe(
-            strategy,
-            universe_strategy(num_accounts, num_transactions),
-            num_transactions,
-        );
-        state.executor.enable_parallel_execution();
-
-        // Insert a blockmetadata transaction at the beginning to better simulate the real life traffic.
-        let validator_set =
-            ValidatorSet::fetch_config(&state.executor.get_state_view().as_move_resolver())
-                .expect("Unable to retrieve the validator set from storage");
-
-        let new_block = BlockMetadata::new(
-            HashValue::zero(),
-            0,
-            1,
-            vec![],
-            *validator_set.payload()[0].account_address(),
-        );
-
-        state
-            .transactions
-            .insert(0, Transaction::BlockMetadata(new_block));
-
-        state
+    /// Executes this state in a single block via parallel execution.
+    fn execute_parallel(self) {
+        // The output is ignored here since we're just testing transaction performance, not trying
+        // to assert correctness.
+        ParallelAptosVM::execute_block(self.transactions, self.executor.get_state_view())
+            .expect("VM should not fail to start");
     }
 }
diff --git a/aptos-move/aptos-vm/src/parallel_executor/mod.rs b/aptos-move/aptos-vm/src/parallel_executor/mod.rs
index 786b621917cb9..7ddca393d84f5 100644
--- a/aptos-move/aptos-vm/src/parallel_executor/mod.rs
+++ b/aptos-move/aptos-vm/src/parallel_executor/mod.rs
@@ -8,7 +8,7 @@ mod vm_wrapper;
 use crate::{
     adapter_common::{preprocess_transaction, PreprocessedTransaction},
     aptos_vm::AptosVM,
-    parallel_executor::vm_wrapper::DiemVMWrapper,
+    parallel_executor::vm_wrapper::AptosVMWrapper,
 };
 use aptos_parallel_executor::{
     errors::Error,
@@ -30,9 +30,9 @@ impl PTransaction for PreprocessedTransaction {
 }
 
 // Wrapper to avoid orphan rule
-pub(crate) struct DiemTransactionOutput(TransactionOutput);
+pub(crate) struct AptosTransactionOutput(TransactionOutput);
 
-impl DiemTransactionOutput {
+impl AptosTransactionOutput {
     pub fn new(output: TransactionOutput) -> Self {
         Self(output)
     }
@@ -41,7 +41,7 @@
 }
 
-impl PTransactionOutput for DiemTransactionOutput {
+impl PTransactionOutput for AptosTransactionOutput {
     type T = PreprocessedTransaction;
 
     fn get_writes(&self) -> Vec<(StateKey, WriteOp)> {
@@ -74,13 +74,13 @@ impl ParallelAptosVM {
             .map(|txn| preprocess_transaction::<AptosVM>(txn.clone()))
             .collect();
 
-        match ParallelTransactionExecutor::<PreprocessedTransaction, DiemVMWrapper<S>>::new()
+        match ParallelTransactionExecutor::<PreprocessedTransaction, AptosVMWrapper<S>>::new()
             .execute_transactions_parallel(state_view, signature_verified_block)
         {
             Ok(results) => Ok((
                 results
                     .into_iter()
-                    .map(DiemTransactionOutput::into)
+                    .map(AptosTransactionOutput::into)
                     .collect(),
                 None,
             )),
diff --git a/aptos-move/aptos-vm/src/parallel_executor/storage_wrapper.rs b/aptos-move/aptos-vm/src/parallel_executor/storage_wrapper.rs
index 5879d16974cd0..45f57fc402abf 100644
--- a/aptos-move/aptos-vm/src/parallel_executor/storage_wrapper.rs
+++ b/aptos-move/aptos-vm/src/parallel_executor/storage_wrapper.rs
@@ -32,12 +32,11 @@ impl<'a, S: StateView> StateView for VersionedView<'a, S> {
     // Get some data either through the cache or the `StateView` on a cache miss.
    fn get_state_value(&self, state_key: &StateKey) -> anyhow::Result<Option<Vec<u8>>> {
         match self.hashmap_view.read(state_key) {
-            Ok(Some(v)) => Ok(match v.as_ref() {
+            Some(v) => Ok(match v.as_ref() {
                 WriteOp::Value(w) => Some(w.clone()),
                 WriteOp::Deletion => None,
             }),
-            Ok(None) => self.base_view.get_state_value(state_key),
-            Err(err) => Err(err),
+            None => self.base_view.get_state_value(state_key),
         }
     }
diff --git a/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs b/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs
index d17120bded485..73fbb686e6252 100644
--- a/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs
+++ b/aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs
@@ -6,7 +6,7 @@ use crate::{
     aptos_vm::AptosVM,
     data_cache::RemoteStorage,
     logging::AdapterLogSchema,
-    parallel_executor::{storage_wrapper::VersionedView, DiemTransactionOutput},
+    parallel_executor::{storage_wrapper::VersionedView, AptosTransactionOutput},
 };
 use aptos_logger::prelude::*;
 use aptos_parallel_executor::{
@@ -14,34 +14,39 @@ use aptos_parallel_executor::{
     task::{ExecutionStatus, ExecutorTask},
 };
 use aptos_state_view::StateView;
-use aptos_types::{
-    account_config::DIEM_ACCOUNT_MODULE, state_store::state_key::StateKey, write_set::WriteOp,
+use aptos_types::{state_store::state_key::StateKey, write_set::WriteOp};
+use move_core_types::{
+    ident_str,
+    language_storage::{ModuleId, CORE_CODE_ADDRESS},
+    vm_status::VMStatus,
 };
-use move_core_types::vm_status::VMStatus;
 
-pub(crate) struct DiemVMWrapper<'a, S> {
+pub(crate) struct AptosVMWrapper<'a, S> {
     vm: AptosVM,
     base_view: &'a S,
 }
 
-impl<'a, S: 'a + StateView> ExecutorTask for DiemVMWrapper<'a, S> {
+impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> {
     type T = PreprocessedTransaction;
-    type Output = DiemTransactionOutput;
+    type Output = AptosTransactionOutput;
     type Error = VMStatus;
     type Argument = &'a S;
 
     fn init(argument: &'a S) -> Self {
         let vm = AptosVM::new(argument);
 
-        // Loading `0x1::DiemAccount` and its transitive dependency into the code cache.
+        // Loading `0x1::AptosAccount` and its transitive dependencies into the code cache.
        //
        // This should give us a warm VM to avoid the overhead of VM cold start.
        // Result of this load could be omitted as this is a best effort approach and won't hurt if that fails.
        //
-        // Loading up `0x1::DiemAccount` should be sufficient as this is the most common module
+        // Loading up `0x1::AptosAccount` should be sufficient as this is the most common module
        // used for prologue, epilogue and transfer functionality.
-        let _ = vm.load_module(&DIEM_ACCOUNT_MODULE, &RemoteStorage::new(argument));
+        let _ = vm.load_module(
+            &ModuleId::new(CORE_CODE_ADDRESS, ident_str!("AptosAccount").to_owned()),
+            &RemoteStorage::new(argument),
+        );
 
         Self {
             vm,
@@ -53,7 +58,7 @@ impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> {
         &self,
         view: &MVHashMapView<StateKey, WriteOp>,
         txn: &PreprocessedTransaction,
-    ) -> ExecutionStatus<DiemTransactionOutput, VMStatus> {
+    ) -> ExecutionStatus<AptosTransactionOutput, VMStatus> {
         let log_context = AdapterLogSchema::new(self.base_view.id(), view.txn_idx());
         let versioned_view = VersionedView::new_view(self.base_view, view);
@@ -76,9 +81,9 @@ impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> {
                 };
             }
             if AptosVM::should_restart_execution(&output) {
-                ExecutionStatus::SkipRest(DiemTransactionOutput::new(output))
+                ExecutionStatus::SkipRest(AptosTransactionOutput::new(output))
             } else {
-                ExecutionStatus::Success(DiemTransactionOutput::new(output))
+                ExecutionStatus::Success(AptosTransactionOutput::new(output))
             }
         }
         Err(err) => ExecutionStatus::Abort(err),
diff --git a/aptos-move/e2e-tests/src/executor.rs b/aptos-move/e2e-tests/src/executor.rs
index c46254bef6e5e..fe5fe977d79a1 100644
--- a/aptos-move/e2e-tests/src/executor.rs
+++ b/aptos-move/e2e-tests/src/executor.rs
@@ -23,9 +23,7 @@ use aptos_types::{
     access_path::AccessPath,
     account_config::{AccountResource, BalanceResource, TransferEventsResource, CORE_CODE_ADDRESS},
     block_metadata::{new_block_event_key, BlockMetadata, NewBlockEvent},
-    on_chain_config::{
-        OnChainConfig, ParallelExecutionConfig, VMPublishingOption, ValidatorSet, Version,
-    },
+    on_chain_config::{OnChainConfig, VMPublishingOption, ValidatorSet, Version},
     state_store::state_key::StateKey,
     transaction::{
         ChangeSet, SignedTransaction, Transaction, TransactionOutput, TransactionStatus,
@@ -40,9 +38,6 @@ use aptos_vm::{
     parallel_executor::ParallelAptosVM,
     AptosVM, VMExecutor, VMValidator,
 };
-use aptos_writeset_generator::{
-    encode_disable_parallel_execution, encode_enable_parallel_execution_with_config,
-};
 use move_core_types::{
     account_address::AccountAddress,
     identifier::Identifier,
@@ -488,40 +483,6 @@ impl FakeExecutor {
         self.apply_write_set(output.write_set());
     }
 
-    pub fn enable_parallel_execution(&mut self) {
-        let aptos_root = Account::new_aptos_root();
-        let seq_num = self
-            .read_account_resource_at_address(aptos_root.address())
-            .unwrap()
-            .sequence_number();
-
-        let txn = aptos_root
-            .transaction()
-            .write_set(encode_enable_parallel_execution_with_config())
-            .sequence_number(seq_num)
-            .sign();
-        self.execute_and_apply(txn);
-        assert!(
-            ParallelExecutionConfig::fetch_config(&self.data_store.as_move_resolver()).is_some()
-        );
-    }
-
-    pub fn disable_parallel_execution(&mut self) {
-        if ParallelExecutionConfig::fetch_config(&self.data_store.as_move_resolver()).is_some() {
-            let aptos_root = Account::new_aptos_root();
-            let txn = aptos_root
-                .transaction()
-                .write_set(encode_disable_parallel_execution())
-                .sequence_number(
-                    self.read_account_resource_at_address(aptos_root.address())
-                        .unwrap()
-                        .sequence_number(),
-                )
-                .sign();
-            self.execute_and_apply(txn);
-        }
-    }
-
     fn module(name: &str) -> ModuleId {
         ModuleId::new(CORE_CODE_ADDRESS, Identifier::new(name).unwrap())
     }
diff --git a/aptos-move/parallel-executor/src/executor.rs b/aptos-move/parallel-executor/src/executor.rs
index 6ae245ff770ac..ec2d1927b49a0 100644
--- a/aptos-move/parallel-executor/src/executor.rs
+++ b/aptos-move/parallel-executor/src/executor.rs
@@ -8,21 +8,19 @@ use crate::{
     task::{ExecutionStatus, ExecutorTask, Transaction, TransactionOutput},
     txn_last_input_output::{ReadDescriptor, TxnLastInputOutput},
 };
-use anyhow::{bail, Result as AResult};
 use aptos_infallible::Mutex;
 use mvhashmap::MVHashMap;
 use num_cpus;
-use rayon::{prelude::*, scope};
-use std::{
-    collections::HashSet,
-    hash::Hash,
-    marker::PhantomData,
-    sync::{
-        atomic::{AtomicBool, AtomicUsize, Ordering},
-        Arc,
-    },
-    thread::spawn,
-};
+use once_cell::sync::Lazy;
+use rayon::prelude::*;
+use std::{collections::HashSet, hash::Hash, marker::PhantomData, sync::Arc, thread::spawn};
+
+static RAYON_EXEC_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
+    rayon::ThreadPoolBuilder::new()
+        .num_threads(num_cpus::get())
+        .build()
+        .unwrap()
+});
 
 /// A struct that is always used by a single thread performing an execution task. The struct is
 /// passed to the VM and acts as a proxy to resolve reads first in the shared multi-version
@@ -35,7 +33,6 @@ pub struct MVHashMapView<'a, K, V> {
     versioned_map: &'a MVHashMap<K, V>,
     txn_idx: TxnIndex,
     scheduler: &'a Scheduler,
-    read_dependency: AtomicBool,
     captured_reads: Mutex<Vec<ReadDescriptor<K>>>,
 }
 
@@ -47,7 +44,7 @@ impl<'a, K: PartialOrd + Send + Clone + Hash + Eq, V: Send + Sync> MVHashMapView<'a, K, V>
     }
 
     /// Captures a read from the VM execution.
-    pub fn read(&self, key: &K) -> AResult<Option<Arc<V>>> {
+    pub fn read(&self, key: &K) -> Option<Arc<V>> {
         loop {
             match self.versioned_map.read(key, self.txn_idx) {
                 Ok((version, v)) => {
@@ -57,23 +54,39 @@ impl<'a, K: PartialOrd + Send + Clone + Hash + Eq, V: Send + Sync> MVHashMapView<'a, K, V>
                         txn_idx,
                         incarnation,
                     ));
-                    return Ok(Some(v));
+                    return Some(v);
                 }
                 Err(None) => {
                     self.captured_reads
                         .lock()
                         .push(ReadDescriptor::from_storage(key.clone()));
-                    return Ok(None);
+                    return None;
                 }
                 Err(Some(dep_idx)) => {
-                    // Don't start execution transaction `self.txn_idx` until `dep_idx` is computed.
-                    if self.scheduler.try_add_dependency(self.txn_idx, dep_idx) {
-                        // dep_idx is already executed, push `self.txn_idx` to ready queue.
-                        self.read_dependency.store(true, Ordering::Relaxed);
-                        bail!("Read dependency is not computed, retry later")
-                    } else {
-                        // Re-read, as the dependency got resolved.
-                        continue;
+                    // `self.txn_idx` estimated to depend on a write from `dep_idx`.
+                    match self.scheduler.wait_for_dependency(self.txn_idx, dep_idx) {
+                        Some(dep_condition) => {
+                            // Wait on a condition variable corresponding to the encountered
+                            // read dependency. Once dep_idx finishes re-execution, the scheduler
+                            // will mark the dependency as resolved, and then txn_idx will be
+                            // scheduled for re-execution, which will re-awaken the cvar here.
+                            // A deadlock is not possible due to these condition variables:
+                            // suppose all threads are waiting on a read dependency, and consider
+                            // the one with the lowest txn_idx. It observed a dependency, so some
+                            // thread aborted dep_idx. If that abort returned an execution task,
+                            // then by minimality (lower transactions aren't waiting) that thread
+                            // would finish the execution and unblock txn_idx, a contradiction.
+                            // Otherwise, execution_idx in the scheduler was lower at a time when
+                            // at least the thread that aborted dep_idx was alive; since
+                            // transactions lower than txn_idx are not blocked, the execution of
+                            // dep_idx will eventually finish and unblock txn_idx, again a
+                            // contradiction.
+                            let (lock, cvar) = &*dep_condition;
+                            let mut dep_resolved = lock.lock();
+                            while !*dep_resolved {
+                                dep_resolved = cvar.wait(dep_resolved).unwrap();
+                            }
+                        }
+                        None => continue,
                     }
                 }
             };
@@ -84,15 +97,12 @@ impl<'a, K: PartialOrd + Send + Clone + Hash + Eq, V: Send + Sync> MVHashMapView<'a, K, V>
     pub fn txn_idx(&self) -> TxnIndex {
         self.txn_idx
     }
-
-    /// Return whether a read dependency was encountered during VM execution.
-    pub fn read_dependency(&self) -> bool {
-        self.read_dependency.load(Ordering::Relaxed)
-    }
 }
 
 pub struct ParallelTransactionExecutor<T: Transaction, E: ExecutorTask> {
-    num_cpus: usize,
+    // Number of active concurrent tasks, corresponding to the maximum number of rayon
+    // threads that may be concurrently participating in parallel execution.
+    concurrency_level: usize,
     phantom: PhantomData<(T, E)>,
 }
 
@@ -103,14 +113,15 @@ where
 {
     pub fn new() -> Self {
         Self {
+            // TODO: must be a configurable parameter.
-            num_cpus: num_cpus::get(),
+            concurrency_level: num_cpus::get(),
             phantom: PhantomData,
         }
     }
 
-    pub fn execute<'a>(
+    fn execute<'a>(
         &self,
-        version_to_execute: Version,
+        idx_to_execute: TxnIndex,
         guard: TaskGuard<'a>,
         signature_verified_block: &[T],
         last_input_output: &TxnLastInputOutput<
@@ -122,41 +133,18 @@ where
         scheduler: &'a Scheduler,
         executor: &E,
     ) -> SchedulerTask<'a> {
-        let (idx_to_execute, incarnation) = version_to_execute;
         let txn = &signature_verified_block[idx_to_execute];
 
-        // An optimization to pre-check that there are no read dependencies once prior read-set
-        // is available, to avoid an execution that will likely be discarded due to the dependency.
-        // TODO (issue 10180): remove once we have a way to suspend VM execution (so partial
-        // execution would not be discarded).
-        if let Some(read_set) = last_input_output.read_set(idx_to_execute) {
-            if read_set.iter().any(
-                |r| match versioned_data_cache.read(r.path(), idx_to_execute) {
-                    Err(Some(dep_idx)) => scheduler.try_add_dependency(idx_to_execute, dep_idx),
-                    Ok(_) | Err(None) => false,
-                },
-            ) {
-                // Transaction has a read dependency. Was not executed and thus nothing to validate.
-                return SchedulerTask::NoTask;
-            }
-        }
-
         let state_view = MVHashMapView {
             versioned_map: versioned_data_cache,
             txn_idx: idx_to_execute,
             scheduler,
-            read_dependency: AtomicBool::new(false),
             captured_reads: Mutex::new(Vec::new()),
         };
 
         // VM execution.
         let execute_result = executor.execute_transaction(&state_view, txn);
-
-        if state_view.read_dependency() {
-            // Encountered and already handled (added to Scheduler) a read dependency.
-            return SchedulerTask::NoTask;
-        }
-
+        let incarnation = scheduler.get_executing_incarnation(idx_to_execute);
         let mut prev_write_set: HashSet<<T as Transaction>::Key> = last_input_output.write_set(idx_to_execute);
 
         // For tracking whether the recent execution wrote outside of the previous write set.
@@ -199,7 +187,7 @@ where
         scheduler.finish_execution(idx_to_execute, incarnation, writes_outside, guard)
     }
 
-    pub fn validate<'a>(
+    fn validate<'a>(
        &self,
        version_to_validate: Version,
        guard: TaskGuard<'a>,
@@ -238,6 +226,57 @@ where
        }
    }
 
+    fn work_task_with_scope(
+        &self,
+        executor_arguments: &E::Argument,
+        block: &[T],
+        last_input_output: &TxnLastInputOutput<
+            <T as Transaction>::Key,
+            <E as ExecutorTask>::Output,
+            <E as ExecutorTask>::Error,
+        >,
+        versioned_data_cache: &MVHashMap<<T as Transaction>::Key, <T as Transaction>::Value>,
+        scheduler: &Scheduler,
+    ) {
+        // Make executor for each task. TODO: fast concurrent executor.
+        let executor = E::init(*executor_arguments);
+
+        let mut scheduler_task = SchedulerTask::NoTask;
+        loop {
+            scheduler_task = match scheduler_task {
+                SchedulerTask::ValidationTask(version_to_validate, guard) => self.validate(
+                    version_to_validate,
+                    guard,
+                    last_input_output,
+                    versioned_data_cache,
+                    scheduler,
+                ),
+                SchedulerTask::ExecutionTask(idx_to_execute, None, guard) => self.execute(
+                    idx_to_execute,
+                    guard,
+                    block,
+                    last_input_output,
+                    versioned_data_cache,
+                    scheduler,
+                    &executor,
+                ),
+                SchedulerTask::ExecutionTask(_, Some(condvar), _guard) => {
+                    let (lock, cvar) = &*condvar;
+                    // Mark dependency resolved.
+                    *lock.lock() = true;
+                    // Wake up the process waiting for dependency.
+                    cvar.notify_one();
+
+                    SchedulerTask::NoTask
+                }
+                SchedulerTask::NoTask => scheduler.next_task(),
+                SchedulerTask::Done => {
+                    break;
+                }
+            }
+        }
+    }
+
     pub fn execute_transactions_parallel(
         &self,
         executor_initial_arguments: E::Argument,
@@ -250,63 +289,38 @@ where
         let num_txns = signature_verified_block.len();
         let versioned_data_cache = MVHashMap::new();
         let outcomes = OutcomeArray::new(num_txns);
-        let compute_cpus = self.num_cpus;
         let last_input_output = TxnLastInputOutput::new(num_txns);
         let scheduler = Scheduler::new(num_txns);
 
-        scope(|s| {
-            println!(
-                "Launching {} threads to execute... total txns: {:?}",
-                compute_cpus,
-                scheduler.num_txn_to_execute(),
-            );
-
-            for _ in 0..(compute_cpus) {
+        RAYON_EXEC_POOL.scope(|s| {
+            for _ in 0..self.concurrency_level {
                 s.spawn(|_| {
-                    // Make executor for each thread.
-                    let executor = E::init(executor_initial_arguments);
-
-                    let mut scheduler_task = SchedulerTask::NoTask;
-                    loop {
-                        scheduler_task = match scheduler_task {
-                            SchedulerTask::ValidationTask(version_to_validate, guard) => self
-                                .validate(
-                                    version_to_validate,
-                                    guard,
-                                    &last_input_output,
-                                    &versioned_data_cache,
-                                    &scheduler,
-                                ),
-                            SchedulerTask::ExecutionTask(version_to_execute, guard) => self
-                                .execute(
-                                    version_to_execute,
-                                    guard,
-                                    &signature_verified_block,
-                                    &last_input_output,
-                                    &versioned_data_cache,
-                                    &scheduler,
-                                    &executor,
-                                ),
-                            SchedulerTask::NoTask => scheduler.next_task(),
-                            SchedulerTask::Done => break,
-                        }
-                    }
+                    self.work_task_with_scope(
+                        &executor_initial_arguments,
+                        &signature_verified_block,
+                        &last_input_output,
+                        &versioned_data_cache,
+                        &scheduler,
+                    );
                });
            }
        });
 
         // Extract outputs in parallel
         let valid_results_size = scheduler.num_txn_to_execute();
-        let chunk_size = (valid_results_size + 4 * compute_cpus - 1) / (4 * compute_cpus);
-        (0..valid_results_size)
-            .collect::<Vec<TxnIndex>>()
-            .par_chunks(chunk_size)
-            .map(|chunk| {
-                for idx in chunk.iter() {
-                    outcomes.set_result(*idx, last_input_output.take_output(*idx));
-                }
-            })
-            .collect::<()>();
+        let chunk_size =
+            (valid_results_size + 4 * self.concurrency_level - 1) / (4 * self.concurrency_level);
+        RAYON_EXEC_POOL.install(|| {
+            (0..valid_results_size)
+                .collect::<Vec<TxnIndex>>()
+                .par_chunks(chunk_size)
+                .map(|chunk| {
+                    for idx in chunk.iter() {
+                        outcomes.set_result(*idx, last_input_output.take_output(*idx));
+                    }
+                })
+                .collect::<()>();
+        });
 
         spawn(move || {
             // Explicit async drops.
diff --git a/aptos-move/parallel-executor/src/proptest_types/tests.rs b/aptos-move/parallel-executor/src/proptest_types/tests.rs
index 7bf35d073d85f..4e4abff391433 100644
--- a/aptos-move/parallel-executor/src/proptest_types/tests.rs
+++ b/aptos-move/parallel-executor/src/proptest_types/tests.rs
@@ -73,7 +73,6 @@ proptest! {
         prop_assert!(run_transactions(universe, transaction_gen, abort_transactions, skip_rest_transactions));
     }
 
-    #[test]
     fn mixed_transactions(
         universe in vec(any::<[u8; 32]>(), 100),
diff --git a/aptos-move/parallel-executor/src/proptest_types/types.rs b/aptos-move/parallel-executor/src/proptest_types/types.rs
index 9259aa2069fe9..a215f9a744f3d 100644
--- a/aptos-move/parallel-executor/src/proptest_types/types.rs
+++ b/aptos-move/parallel-executor/src/proptest_types/types.rs
@@ -242,11 +242,7 @@ where
         // Reads
         let mut reads_result = vec![];
         for k in reads.iter() {
-            reads_result.push(match view.read(k) {
-                Ok(Some(v)) => Some((*v).clone()),
-                Ok(None) => None,
-                Err(_) => return ExecutionStatus::Abort(0),
-            })
+            reads_result.push(view.read(k).map(|v| (*v).clone()));
         }
         ExecutionStatus::Success(Output(actual_writes.clone(), reads_result))
     }
diff --git a/aptos-move/parallel-executor/src/scheduler.rs b/aptos-move/parallel-executor/src/scheduler.rs
index 3147c4ea35cf7..0236bbdf6d9ea 100644
--- a/aptos-move/parallel-executor/src/scheduler.rs
+++ b/aptos-move/parallel-executor/src/scheduler.rs
@@ -4,15 +4,19 @@
 use aptos_infallible::Mutex;
 use crossbeam::utils::CachePadded;
 use std::{
-    cmp::min,
+    cmp::{max, min},
     hint,
-    sync::atomic::{AtomicBool, AtomicUsize, Ordering},
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc, Condvar,
+    },
 };
 
 // Type aliases.
 pub type TxnIndex = usize;
 pub type Incarnation = usize;
 pub type Version = (TxnIndex, Incarnation);
+type DependencyCondvar = Arc<(Mutex<bool>, Condvar)>;
 
 // A struct to track the number of active tasks in the scheduler using RAII.
 pub struct TaskGuard<'a> {
@@ -37,18 +41,21 @@ impl Drop for TaskGuard<'_> {
 /// NoTask holds no task (similar None if we wrapped tasks in Option), and Done implies that
 /// there are no more tasks and the scheduler is done.
 pub enum SchedulerTask<'a> {
-    ExecutionTask(Version, TaskGuard<'a>),
+    ExecutionTask(TxnIndex, Option<DependencyCondvar>, TaskGuard<'a>),
     ValidationTask(Version, TaskGuard<'a>),
     NoTask,
     Done,
 }
 
-#[derive(PartialEq)]
 /// All possible statuses for each transaction. Each status contains the latest incarnation number.
 ///
 /// 'ReadyToExecute' means that the corresponding incarnation should be executed and the scheduler
 /// must eventually create a corresponding execution task. The scheduler ensures that exactly one
-/// execution task gets created, changing the status to 'Executing' in the process.
+/// execution task gets created, changing the status to 'Executing' in the process. If a dependency
+/// condition variable is set, then an execution of a prior incarnation is waiting on it with
+/// a read dependency resolved (when the dependency was encountered, the status changed to Suspended,
+/// and Suspended changed to ReadyToExecute when the dependency finished its execution). In this
+/// case the caller need not create a new execution task, but just notify the suspended execution.
 ///
 /// 'Executing' status of an incarnation turns into 'Executed' if the execution task finishes, or
 /// if a dependency is encountered, it becomes 'ReadyToExecute(incarnation + 1)' once the
@@ -59,31 +66,43 @@
 /// to 'ReadyToExecute(incarnation + 1)', allowing the scheduler to create an execution
 /// task for the next incarnation of the transaction.
 ///
-/// Status transition diagram when with no dependencies:
-/// ReadyToExecute(i) => --exactly once-- => Executing(i) => --transaction executed i-th time-- =>
-/// => Executed(i) => --validations happen-- =>
-/// 1. validation failures: --exactly once-- => Aborting(i) => => ReadyToExecute(i+1)
-/// 2. no validation failures: status remains Executed(i).
-///
 /// Status transition diagram:
 /// Ready(i)
 /// |  try_incarnate (incarnate successfully)
-/// ↓                                                                 resume
-/// Executing(i) (pending for exactly one execution) ------------> Ready(i+1)
+/// |
+/// ↓       suspend (waiting on dependency)                  resume
+/// Executing(i) -----------------------------> Suspended(i) ------------> Ready(i+1)
+/// |
 /// |  finish_execution
 /// ↓
 /// Executed(i) (pending for (re)validations)
+/// |
 /// |  try_abort (abort successfully)
 /// ↓                finish_abort
-/// Aborting(i) -------------------------------------------------> Ready(i+1)
+/// Aborting(i) ---------------------------------------------------------> Ready(i+1)
 ///
 enum TransactionStatus {
-    ReadyToExecute(Incarnation),
+    ReadyToExecute(Incarnation, Option<DependencyCondvar>),
     Executing(Incarnation),
+    Suspended(Incarnation, DependencyCondvar),
     Executed(Incarnation),
     Aborting(Incarnation),
 }
 
+impl PartialEq for TransactionStatus {
+    fn eq(&self, other: &Self) -> bool {
+        use TransactionStatus::*;
+        match (self, other) {
+            (&ReadyToExecute(ref a, _), &ReadyToExecute(ref b, _))
+            | (&Executing(ref a), &Executing(ref b))
+            | (&Suspended(ref a, _), &Suspended(ref b, _))
+            | (&Executed(ref a), &Executed(ref b))
+            | (&Aborting(ref a), &Aborting(ref b)) => a == b,
+            _ => false,
+        }
+    }
+}
+
 pub struct Scheduler {
     /// A shared index that tracks the minimum of all transaction indices that require execution.
     /// The threads increment the index and attempt to create an execution task for the corresponding
@@ -110,6 +129,10 @@ pub struct Scheduler {
     /// Shared number of txns to execute: updated before executing a block or when an error or
     /// reconfiguration leads to early stopping (at that transaction idx).
     stop_idx: AtomicUsize,
+    /// When stop_idx is reduced, we should stop creating tasks for higher indices, and let
+    /// existing tasks with higher indices drain. To avoid draining the whole block, drain_idx
+    /// counts the number of transactions that were ever scheduled for execution.
+    drain_idx: AtomicUsize,
 
     /// An index i maps to indices of other transactions that depend on transaction i, i.e. they
     /// should be re-executed once transaction i's next incarnation finishes.
@@ -128,11 +151,12 @@ impl Scheduler {
             num_active_tasks: AtomicUsize::new(0),
             done_marker: AtomicBool::new(false),
             stop_idx: AtomicUsize::new(num_txns),
+            drain_idx: AtomicUsize::new(1),
             txn_dependency: (0..num_txns)
                 .map(|_| CachePadded::new(Mutex::new(Vec::new())))
                 .collect(),
             txn_status: (0..num_txns)
-                .map(|_| CachePadded::new(Mutex::new(TransactionStatus::ReadyToExecute(0))))
+                .map(|_| CachePadded::new(Mutex::new(TransactionStatus::ReadyToExecute(0, None))))
                .collect(),
        }
    }
@@ -180,8 +204,10 @@ impl Scheduler {
             if let Some((version_to_validate, guard)) = self.try_validate_next_version() {
                 return SchedulerTask::ValidationTask(version_to_validate, guard);
             }
-        } else if let Some((version_to_execute, guard)) = self.try_execute_next_version() {
-            return SchedulerTask::ExecutionTask(version_to_execute, guard);
+        } else if let Some((idx_to_execute, maybe_condvar, guard)) =
+            self.try_execute_next_index()
+        {
+            return SchedulerTask::ExecutionTask(idx_to_execute, maybe_condvar, guard);
         }
     }
 }
@@ -192,32 +218,40 @@ impl Scheduler {
     /// transaction txn_idx will be resumed, and corresponding execution task created.
     /// If false is returned, it is caller's responsibility to repeat the read that caused the
     /// dependency and continue the ongoing execution of txn_idx.
-    pub fn try_add_dependency(&self, txn_idx: TxnIndex, dep_txn_idx: TxnIndex) -> bool {
+    pub fn wait_for_dependency(
+        &self,
+        txn_idx: TxnIndex,
+        dep_txn_idx: TxnIndex,
+    ) -> Option<DependencyCondvar> {
         // Note: Could pre-check that txn dep_txn_idx isn't in an executed state, but the caller
         // usually has just observed the read dependency.
 
+        // Create a condition variable associated with the dependency.
+        let dep_condvar = Arc::new((Mutex::new(false), Condvar::new()));
+
         let mut stored_deps = self.txn_dependency[dep_txn_idx].lock();
 
-        if self.is_executed(dep_txn_idx).is_some() {
-            // Current status of dep_txn_idx is 'executed', so the dependency got resolved.
-            // To avoid zombie dependency (and losing liveness), must return here and
-            // not add a (stale) dependency.
+        {
+            if self.is_executed(dep_txn_idx).is_some() {
+                // Current status of dep_txn_idx is 'executed', so the dependency got resolved.
+                // To avoid a zombie dependency (and losing liveness), we must return here and
+                // not add a (stale) dependency.
 
-            // Note: acquires (a different, status) mutex, while holding (dependency) mutex.
-            // Only place in scheduler where a thread may hold >1 mutexes, hence, such
-            // acquisitions always happens in the same order (this function), may not deadlock.
+                // Note: this acquires a (different, status) mutex while holding the (dependency)
+                // mutex. This is the only place in the scheduler where a thread may hold more
+                // than one mutex; since such acquisitions always happen in the same order (this
+                // function), no deadlock is possible.
 
-            return false;
-        }
+                return None;
+            }
 
-        // Safe to add dependency here (still holding the lock) - finish_execution of txn
-        // dep_txn_idx is guaranteed to acquire the same lock later and clear the dependency.
-        stored_deps.push(txn_idx);
+            self.suspend(txn_idx, dep_condvar.clone());
 
-        // Note: we could set status of txn_idx to 'Aborting', but don't, as an optimization,
-        // since the resume function below can work with 'Executing' status directly.
+            // Safe to add the dependency here (still holding the lock) - finish_execution of txn
+            // dep_txn_idx is guaranteed to acquire the same lock later and clear the dependency.
+            stored_deps.push(txn_idx);
+        }
 
-        true
+        Some(dep_condvar)
     }
 
     /// After txn is executed, schedule its dependencies for re-execution.
@@ -296,13 +330,24 @@ impl Scheduler {
             // re-execution task back to the caller. If incarnation fails, there is
             // nothing to do, as another thread must have succeeded to incarnate and
             // obtain the task for re-execution.
-            if let Some(new_incarnation) = self.try_incarnate(txn_idx) {
-                return SchedulerTask::ExecutionTask((txn_idx, new_incarnation), guard);
+            if let Some((_new_incarnation, maybe_condvar)) = self.try_incarnate(txn_idx) {
+                return SchedulerTask::ExecutionTask(txn_idx, maybe_condvar, guard);
             }
         }
 
         SchedulerTask::NoTask
     }
+
+    /// If the status is Executing, return the executing incarnation number.
+    pub fn get_executing_incarnation(&self, txn_idx: TxnIndex) -> Incarnation {
+        let status = self.txn_status[txn_idx].lock();
+
+        if let TransactionStatus::Executing(incarnation) = &*status {
+            *incarnation
+        } else {
+            unreachable!();
+        }
+    }
 }
 
 /// Public functions of the Scheduler
@@ -326,15 +371,16 @@ impl Scheduler {
     /// status is (atomically, due to the mutex) updated to Executing(incarnation).
     /// An unsuccessful incarnation returns None. Since incarnation numbers never decrease
     /// for each transaction, incarnate function may not succeed more than once per version.
-    fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<Incarnation> {
+    fn try_incarnate(&self, txn_idx: TxnIndex) -> Option<(Incarnation, Option<DependencyCondvar>)> {
         if txn_idx >= self.txn_status.len() {
             return None;
         }
 
         let mut status = self.txn_status[txn_idx].lock();
-        if let TransactionStatus::ReadyToExecute(incarnation) = *status {
-            *status = TransactionStatus::Executing(incarnation);
-            Some(incarnation)
+        if let TransactionStatus::ReadyToExecute(incarnation, maybe_condvar) = &*status {
+            let ret = (*incarnation, maybe_condvar.clone());
+            *status = TransactionStatus::Executing(*incarnation);
+            Some(ret)
         } else {
             None
         }
@@ -366,7 +412,10 @@ impl Scheduler {
         let idx_to_validate = self.validation_idx.load(Ordering::SeqCst);
 
         // Optimization for check-done, to avoid num_tasks going up and down.
-        let num_txns = self.num_txn_to_execute();
+        let num_txns = max(
+            self.num_txn_to_execute(),
+            self.drain_idx.load(Ordering::Acquire),
+        );
         if idx_to_validate >= num_txns {
             if !self.check_done(num_txns) {
                 // Avoid pointlessly spinning, and give priority to other threads that may
@@ -389,20 +438,23 @@ impl Scheduler {
 
     /// Grab an index to try and execute next (by fetch-and-incrementing execution_idx).
     /// - If the index is out of bounds, return None (and invoke a check of whether
     ///   all txns can be committed).
-    /// - If the transaction is ready for execution (READY_TO_EXECUTE state), attempt
+    /// - If the transaction is ready for execution (ReadyToExecute state), attempt
     ///   to create the next incarnation (should happen exactly once), and if successful,
     ///   return the version to the caller together with a guard to be used for the
     ///   corresponding ExecutionTask.
     /// - Otherwise, return None.
-    fn try_execute_next_version(&self) -> Option<(Version, TaskGuard)> {
+    fn try_execute_next_index(&self) -> Option<(TxnIndex, Option<DependencyCondvar>, TaskGuard)> {
         let idx_to_execute = self.execution_idx.load(Ordering::SeqCst);
 
         // Optimization for check-done, to avoid num_tasks going up and down.
-        let num_txns = self.num_txn_to_execute();
+        let num_txns = max(
+            self.num_txn_to_execute(),
+            self.drain_idx.load(Ordering::Acquire),
+        );
         if idx_to_execute >= num_txns {
-            // Avoid pointlessly spinning, and give priority to other threads that may
-            // be working to finish the remaining tasks.
             if !self.check_done(num_txns) {
+                // Avoid pointlessly spinning, and give priority to other threads that may
+                // be working to finish the remaining tasks.
                 hint::spin_loop();
             }
             return None;
@@ -416,16 +468,36 @@ impl Scheduler {
         // If successfully incarnated (changed status from ready to executing),
         // return version and guard for execution task, otherwise None.
         self.try_incarnate(idx_to_execute)
-            .map(|incarnation| ((idx_to_execute, incarnation), guard))
+            .map(|(incarnation, maybe_condvar)| {
+                if incarnation == 0 {
+                    // Count transactions that ever got scheduled for execution.
+                    self.drain_idx.fetch_add(1, Ordering::SeqCst);
+                }
+
+                (idx_to_execute, maybe_condvar, guard)
+            })
+    }
+
+    /// Put a transaction in a suspended state, with a condition variable that can be
+    /// used to wake it up after the dependency is resolved.
+    fn suspend(&self, txn_idx: TxnIndex, dep_condvar: DependencyCondvar) {
+        let mut status = self.txn_status[txn_idx].lock();
+
+        if let TransactionStatus::Executing(incarnation) = *status {
+            *status = TransactionStatus::Suspended(incarnation, dep_condvar);
+        } else {
+            unreachable!();
+        }
+    }
 
     /// When a dependency is resolved, mark the transaction as ReadyToExecute with an
     /// incremented incarnation number.
-    /// The caller must ensure that the transaction is in the Executing state.
+    /// The caller must ensure that the transaction is in the Suspended state.
     fn resume(&self, txn_idx: TxnIndex) {
         let mut status = self.txn_status[txn_idx].lock();
-        if let TransactionStatus::Executing(incarnation) = *status {
-            *status = TransactionStatus::ReadyToExecute(incarnation + 1);
+        if let TransactionStatus::Suspended(incarnation, dep_condvar) = &*status {
+            *status =
+                TransactionStatus::ReadyToExecute(*incarnation + 1, Some(dep_condvar.clone()));
         } else {
             unreachable!();
         }
@@ -449,7 +521,7 @@ impl Scheduler {
         // Only makes sense when the current status is 'Aborting'.
         debug_assert!(*status == TransactionStatus::Aborting(incarnation));
 
-        *status = TransactionStatus::ReadyToExecute(incarnation + 1);
+        *status = TransactionStatus::ReadyToExecute(incarnation + 1, None);
     }
 
     /// A lazy check of whether the scheduler execution is completed.
diff --git a/aptos-move/parallel-executor/src/unit_tests/mod.rs b/aptos-move/parallel-executor/src/unit_tests/mod.rs
index 7aaa3dd8c048a..6ab2415b3d374 100644
--- a/aptos-move/parallel-executor/src/unit_tests/mod.rs
+++ b/aptos-move/parallel-executor/src/unit_tests/mod.rs
@@ -140,8 +140,9 @@ fn scheduler_tasks() {
         // not calling finish execution, so validation tasks not dispatched.
         assert!(matches!(
             s.next_task(),
-            SchedulerTask::ExecutionTask((j, 0), _) if i == j
+            SchedulerTask::ExecutionTask(j, None, _) if i == j
         ));
+        assert!(s.get_executing_incarnation(i) == 0);
     }
 
     // Finish execution for txns 0, 2, 4. txn 0 without validate_suffix and because
@@ -201,21 +202,24 @@ fn scheduler_tasks() {
     assert!(!s.try_abort(3, 0));
     assert!(matches!(
         s.finish_abort(3, 0, TaskGuard::new(&fake_counter)),
-        SchedulerTask::ExecutionTask((3, 1), _)
+        SchedulerTask::ExecutionTask(3, None, _)
     ));
+    assert!(s.get_executing_incarnation(3) == 1);
 
     // can abort even after successful validation
     assert!(s.try_abort(4, 0));
     assert!(matches!(
         s.finish_abort(4, 0, TaskGuard::new(&fake_counter)),
-        SchedulerTask::ExecutionTask((4, 1), _)
+        SchedulerTask::ExecutionTask(4, None, _)
     ));
+    assert!(s.get_executing_incarnation(4) == 1);
 
     // txn 4 is aborted, so there won't be a validation task.
     assert!(matches!(
         s.next_task(),
-        SchedulerTask::ExecutionTask((5, 0), _)
+        SchedulerTask::ExecutionTask(5, None, _)
     ));
+    assert!(s.get_executing_incarnation(5) == 0);
 
     // Wrap up all outstanding tasks.
     assert!(matches!(
         s.finish_execution(4, 1, false, TaskGuard::new(&fake_counter)),
@@ -248,8 +252,9 @@ fn scheduler_dependency() {
         // not calling finish execution, so validation tasks not dispatched.
         assert!(matches!(
             s.next_task(),
-            SchedulerTask::ExecutionTask((j, 0), _) if j == i
+            SchedulerTask::ExecutionTask(j, None, _) if j == i
         ));
+        assert!(s.get_executing_incarnation(i) == 0);
     }
 
     assert!(matches!(
@@ -258,11 +263,12 @@ fn scheduler_dependency() {
     ));
     assert!(matches!(
         s.next_task(),
-        SchedulerTask::ExecutionTask((5, 0), _)
+        SchedulerTask::ExecutionTask(5, None, _)
     ));
+    assert!(s.get_executing_incarnation(5) == 0);
 
-    assert!(!s.try_add_dependency(3, 0));
-    assert!(s.try_add_dependency(4, 2));
+    assert!(s.wait_for_dependency(3, 0).is_none());
+    assert!(s.wait_for_dependency(4, 2).is_some());
 
     assert!(matches!(
         s.finish_execution(2, 0, false, TaskGuard::new(&fake_counter)),
@@ -270,8 +276,9 @@ fn scheduler_dependency() {
     ));
     assert!(matches!(
         s.next_task(),
-        SchedulerTask::ExecutionTask((4, 1), _)
+        SchedulerTask::ExecutionTask(4, Some(_), _)
     ));
+    assert!(s.get_executing_incarnation(4) == 1);
 }
 
 #[test]
@@ -283,12 +290,13 @@ fn scheduler_incarnation() {
         // not calling finish execution, so validation tasks not dispatched.
         assert!(matches!(
             s.next_task(),
-            SchedulerTask::ExecutionTask((j, 0), _) if j == i
+            SchedulerTask::ExecutionTask(j, None, _) if j == i
         ));
+        assert!(s.get_executing_incarnation(i) == 0);
     }
 
     // execution index = 5
-    assert!(s.try_add_dependency(1, 0));
-    assert!(s.try_add_dependency(3, 0));
+    assert!(s.wait_for_dependency(1, 0).is_some());
+    assert!(s.wait_for_dependency(3, 0).is_some());
 
     assert!(matches!(
         s.finish_execution(2, 0, true, TaskGuard::new(&fake_counter)),
@@ -314,8 +322,9 @@ fn scheduler_incarnation() {
 
     assert!(matches!(
         s.finish_abort(2, 0, TaskGuard::new(&fake_counter)),
-        SchedulerTask::ExecutionTask((2, 1), _)
+        SchedulerTask::ExecutionTask(2, None, _)
     ));
+    assert!(s.get_executing_incarnation(2) == 1);
 
     assert!(matches!(
         s.finish_execution(0, 0, false, TaskGuard::new(&fake_counter)),
@@ -330,16 +339,19 @@ fn scheduler_incarnation() {
 
     assert!(matches!(
         s.next_task(),
-        SchedulerTask::ExecutionTask((1, 1), _)
+        SchedulerTask::ExecutionTask(1, Some(_), _)
     ));
+    assert!(s.get_executing_incarnation(1) == 1);
 
     assert!(matches!(
         s.next_task(),
-        SchedulerTask::ExecutionTask((3, 1), _)
+        SchedulerTask::ExecutionTask(3, Some(_), _)
    ));
+    assert!(s.get_executing_incarnation(3) == 1);
 
     assert!(matches!(
         s.next_task(),
-        SchedulerTask::ExecutionTask((4, 1), _)
+        SchedulerTask::ExecutionTask(4, None, _)
     ));
+    assert!(s.get_executing_incarnation(4) == 1);
 
     // execution index = 5
     assert!(matches!(
diff --git a/aptos-move/writeset-transaction-generator/templates/update_parallel_execution_config.move b/aptos-move/writeset-transaction-generator/templates/update_parallel_execution_config.move
deleted file mode 100644
index 8ee38a0c011a1..0000000000000
--- a/aptos-move/writeset-transaction-generator/templates/update_parallel_execution_config.move
+++ /dev/null
@@ -1,6 +0,0 @@
-script {
-    use DiemFramework::ParallelExecutionConfig;
-    fun main(diem_root: signer, _execute_as: signer, payload: vector<u8>) {
-        ParallelExecutionConfig::enable_parallel_execution_with_config(&diem_root, payload);
-    }
-}
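
Reviewer note (illustration, not part of the patch): the condition-variable handshake that `wait_for_dependency` hands back to `MVHashMapView::read`, and that the new `SchedulerTask::ExecutionTask(_, Some(condvar), _)` arm completes, boils down to the minimal, self-contained sketch below. It uses `std::sync::Mutex` (hence the `.unwrap()` on `lock()`) where the patch uses `aptos_infallible::Mutex`; the names mirror the diff but the program itself is a toy.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Mirrors the patch's `type DependencyCondvar = Arc<(Mutex<bool>, Condvar)>`.
type DependencyCondvar = Arc<(Mutex<bool>, Condvar)>;

fn main() {
    let dep: DependencyCondvar = Arc::new((Mutex::new(false), Condvar::new()));

    // The blocked side: what `MVHashMapView::read` does after
    // `wait_for_dependency` returns Some(condvar).
    let waiter = {
        let dep = Arc::clone(&dep);
        thread::spawn(move || {
            let (lock, cvar) = &*dep;
            let mut dep_resolved = lock.lock().unwrap();
            // The while-loop guards against spurious wakeups.
            while !*dep_resolved {
                dep_resolved = cvar.wait(dep_resolved).unwrap();
            }
            println!("dependency resolved, re-read and continue");
        })
    };

    // The resolving side: what the `ExecutionTask(_, Some(condvar), _)` arm
    // in `work_task_with_scope` does once the dependency has re-executed.
    {
        let (lock, cvar) = &*dep;
        *lock.lock().unwrap() = true; // mark dependency resolved
        cvar.notify_one(); // wake the suspended execution
    }

    waiter.join().unwrap();
}
```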
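Likewise, the revised `TransactionStatus` lifecycle (Ready → Executing → Suspended → Ready(i+1), alongside the Executed/Aborting path) can be sanity-checked with the standalone state-machine sketch below. It mirrors the shapes of the patch's `try_incarnate`/`suspend`/`resume`, but it is illustrative only: the real scheduler keeps each status behind a `CachePadded<Mutex<_>>` and is not structured as methods on the enum.

```rust
use std::sync::{Arc, Condvar, Mutex};

type Incarnation = usize;
type DependencyCondvar = Arc<(Mutex<bool>, Condvar)>;

#[allow(dead_code)]
enum TransactionStatus {
    ReadyToExecute(Incarnation, Option<DependencyCondvar>),
    Executing(Incarnation),
    Suspended(Incarnation, DependencyCondvar),
    Executed(Incarnation),
    Aborting(Incarnation),
}

impl TransactionStatus {
    // ReadyToExecute(i, cvar?) -> Executing(i); returns the condvar so the
    // caller can notify a suspended prior execution instead of starting anew.
    fn try_incarnate(&mut self) -> Option<(Incarnation, Option<DependencyCondvar>)> {
        if let TransactionStatus::ReadyToExecute(i, cvar) = &*self {
            let ret = (*i, cvar.clone());
            *self = TransactionStatus::Executing(ret.0);
            Some(ret)
        } else {
            None
        }
    }

    // Executing(i) -> Suspended(i, cvar): a read dependency was encountered.
    fn suspend(&mut self, cvar: DependencyCondvar) {
        if let TransactionStatus::Executing(i) = *self {
            *self = TransactionStatus::Suspended(i, cvar);
        } else {
            unreachable!("only an Executing transaction can be suspended");
        }
    }

    // Suspended(i, cvar) -> ReadyToExecute(i + 1, cvar): dependency resolved;
    // the condvar is carried along so the waiting execution gets woken.
    fn resume(&mut self) {
        if let TransactionStatus::Suspended(i, cvar) = &*self {
            let next = TransactionStatus::ReadyToExecute(*i + 1, Some(cvar.clone()));
            *self = next;
        } else {
            unreachable!("only a Suspended transaction can be resumed");
        }
    }
}

fn main() {
    let mut st = TransactionStatus::ReadyToExecute(0, None);
    let (inc, cvar) = st.try_incarnate().expect("was ReadyToExecute");
    assert_eq!(inc, 0);
    assert!(cvar.is_none());

    // Execution hits a read dependency: park behind a fresh condvar.
    let dep: DependencyCondvar = Arc::new((Mutex::new(false), Condvar::new()));
    st.suspend(Arc::clone(&dep));

    // Dependency finished re-executing: re-ready at incarnation 1, with the
    // condvar attached so the next "execution task" only notifies the waiter.
    st.resume();
    let (inc, cvar) = st.try_incarnate().expect("resumed to ReadyToExecute");
    assert_eq!(inc, 1);
    assert!(cvar.is_some());
}
```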