Skip to content

Commit

Permalink
Async dependency waiting, task/workers over rayon threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
gelash authored and aptos-bot committed Apr 13, 2022
1 parent f165482 commit ceb0237
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 281 deletions.
67 changes: 32 additions & 35 deletions aptos-move/aptos-transaction-benchmarks/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use aptos_types::{
on_chain_config::{OnChainConfig, ValidatorSet},
transaction::Transaction,
};
use aptos_vm::{data_cache::AsMoveResolver, AptosVM, VMExecutor};
use aptos_vm::{
data_cache::AsMoveResolver, parallel_executor::ParallelAptosVM, AptosVM, VMExecutor,
};
use criterion::{measurement::Measurement, BatchSize, Bencher};
use language_e2e_tests::{
account_universe::{log_balance_strategy, AUTransactionGen, AccountUniverseGen},
Expand Down Expand Up @@ -80,13 +82,13 @@ where
pub fn bench_parallel<M: Measurement>(&self, b: &mut Bencher<M>) {
b.iter_batched(
|| {
TransactionBenchState::with_size_parallel(
TransactionBenchState::with_size(
&self.strategy,
self.num_accounts,
self.num_transactions,
)
},
|state| state.execute(),
|state| state.execute_parallel(),
// The input here is the entire list of signed transactions, so it's pretty large.
BatchSize::LargeInput,
)
Expand Down Expand Up @@ -115,11 +117,30 @@ impl TransactionBenchState {
S: Strategy,
S::Value: AUTransactionGen,
{
Self::with_universe(
let mut state = Self::with_universe(
strategy,
universe_strategy(num_accounts, num_transactions),
num_transactions,
)
);

// Insert a blockmetadata transaction at the beginning to better simulate the real life traffic.
let validator_set =
ValidatorSet::fetch_config(&state.executor.get_state_view().as_move_resolver())
.expect("Unable to retrieve the validator set from storage");

let new_block = BlockMetadata::new(
HashValue::zero(),
0,
1,
vec![],
*validator_set.payload()[0].account_address(),
);

state
.transactions
.insert(0, Transaction::BlockMetadata(new_block));

state
}

/// Creates a new benchmark state with the given account universe strategy and number of
Expand Down Expand Up @@ -168,36 +189,12 @@ impl TransactionBenchState {
.expect("VM should not fail to start");
}

fn with_size_parallel<S>(strategy: S, num_accounts: usize, num_transactions: usize) -> Self
where
S: Strategy,
S::Value: AUTransactionGen,
{
let mut state = Self::with_universe(
strategy,
universe_strategy(num_accounts, num_transactions),
num_transactions,
);
state.executor.enable_parallel_execution();

// Insert a blockmetadata transaction at the beginning to better simulate the real life traffic.
let validator_set =
ValidatorSet::fetch_config(&state.executor.get_state_view().as_move_resolver())
.expect("Unable to retrieve the validator set from storage");

let new_block = BlockMetadata::new(
HashValue::zero(),
0,
1,
vec![],
*validator_set.payload()[0].account_address(),
);

state
.transactions
.insert(0, Transaction::BlockMetadata(new_block));

state
/// Executes this state in a single block via parallel execution.
fn execute_parallel(self) {
// The output is ignored here since we're just testing transaction performance, not trying
// to assert correctness.
ParallelAptosVM::execute_block(self.transactions, self.executor.get_state_view())
.expect("VM should not fail to start");
}
}

Expand Down
12 changes: 6 additions & 6 deletions aptos-move/aptos-vm/src/parallel_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod vm_wrapper;
use crate::{
adapter_common::{preprocess_transaction, PreprocessedTransaction},
aptos_vm::AptosVM,
parallel_executor::vm_wrapper::DiemVMWrapper,
parallel_executor::vm_wrapper::AptosVMWrapper,
};
use aptos_parallel_executor::{
errors::Error,
Expand All @@ -30,9 +30,9 @@ impl PTransaction for PreprocessedTransaction {
}

// Wrapper to avoid orphan rule
pub(crate) struct DiemTransactionOutput(TransactionOutput);
pub(crate) struct AptosTransactionOutput(TransactionOutput);

impl DiemTransactionOutput {
impl AptosTransactionOutput {
pub fn new(output: TransactionOutput) -> Self {
Self(output)
}
Expand All @@ -41,7 +41,7 @@ impl DiemTransactionOutput {
}
}

impl PTransactionOutput for DiemTransactionOutput {
impl PTransactionOutput for AptosTransactionOutput {
type T = PreprocessedTransaction;

fn get_writes(&self) -> Vec<(StateKey, WriteOp)> {
Expand Down Expand Up @@ -74,13 +74,13 @@ impl ParallelAptosVM {
.map(|txn| preprocess_transaction::<AptosVM>(txn.clone()))
.collect();

match ParallelTransactionExecutor::<PreprocessedTransaction, DiemVMWrapper<S>>::new()
match ParallelTransactionExecutor::<PreprocessedTransaction, AptosVMWrapper<S>>::new()
.execute_transactions_parallel(state_view, signature_verified_block)
{
Ok(results) => Ok((
results
.into_iter()
.map(DiemTransactionOutput::into)
.map(AptosTransactionOutput::into)
.collect(),
None,
)),
Expand Down
5 changes: 2 additions & 3 deletions aptos-move/aptos-vm/src/parallel_executor/storage_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ impl<'a, S: StateView> StateView for VersionedView<'a, S> {
// Get some data either through the cache or the `StateView` on a cache miss.
fn get_state_value(&self, state_key: &StateKey) -> anyhow::Result<Option<Vec<u8>>> {
match self.hashmap_view.read(state_key) {
Ok(Some(v)) => Ok(match v.as_ref() {
Some(v) => Ok(match v.as_ref() {
WriteOp::Value(w) => Some(w.clone()),
WriteOp::Deletion => None,
}),
Ok(None) => self.base_view.get_state_value(state_key),
Err(err) => Err(err),
None => self.base_view.get_state_value(state_key),
}
}

Expand Down
31 changes: 18 additions & 13 deletions aptos-move/aptos-vm/src/parallel_executor/vm_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,47 @@ use crate::{
aptos_vm::AptosVM,
data_cache::RemoteStorage,
logging::AdapterLogSchema,
parallel_executor::{storage_wrapper::VersionedView, DiemTransactionOutput},
parallel_executor::{storage_wrapper::VersionedView, AptosTransactionOutput},
};
use aptos_logger::prelude::*;
use aptos_parallel_executor::{
executor::MVHashMapView,
task::{ExecutionStatus, ExecutorTask},
};
use aptos_state_view::StateView;
use aptos_types::{
account_config::DIEM_ACCOUNT_MODULE, state_store::state_key::StateKey, write_set::WriteOp,
use aptos_types::{state_store::state_key::StateKey, write_set::WriteOp};
use move_core_types::{
ident_str,
language_storage::{ModuleId, CORE_CODE_ADDRESS},
vm_status::VMStatus,
};
use move_core_types::vm_status::VMStatus;

pub(crate) struct DiemVMWrapper<'a, S> {
pub(crate) struct AptosVMWrapper<'a, S> {
vm: AptosVM,
base_view: &'a S,
}

impl<'a, S: 'a + StateView> ExecutorTask for DiemVMWrapper<'a, S> {
impl<'a, S: 'a + StateView> ExecutorTask for AptosVMWrapper<'a, S> {
type T = PreprocessedTransaction;
type Output = DiemTransactionOutput;
type Output = AptosTransactionOutput;
type Error = VMStatus;
type Argument = &'a S;

fn init(argument: &'a S) -> Self {
let vm = AptosVM::new(argument);

// Loading `0x1::DiemAccount` and its transitive dependency into the code cache.
// Loading `0x1::AptosAccount` and its transitive dependency into the code cache.
//
// This should give us a warm VM to avoid the overhead of VM cold start.
// Result of this load could be omitted as this is a best effort approach and won't hurt if that fails.
//
// Loading up `0x1::DiemAccount` should be sufficient as this is the most common module
// Loading up `0x1::AptosAccount` should be sufficient as this is the most common module
// used for prologue, epilogue and transfer functionality.

let _ = vm.load_module(&DIEM_ACCOUNT_MODULE, &RemoteStorage::new(argument));
let _ = vm.load_module(
&ModuleId::new(CORE_CODE_ADDRESS, ident_str!("AptosAccount").to_owned()),
&RemoteStorage::new(argument),
);

Self {
vm,
Expand All @@ -53,7 +58,7 @@ impl<'a, S: 'a + StateView> ExecutorTask for DiemVMWrapper<'a, S> {
&self,
view: &MVHashMapView<StateKey, WriteOp>,
txn: &PreprocessedTransaction,
) -> ExecutionStatus<DiemTransactionOutput, VMStatus> {
) -> ExecutionStatus<AptosTransactionOutput, VMStatus> {
let log_context = AdapterLogSchema::new(self.base_view.id(), view.txn_idx());
let versioned_view = VersionedView::new_view(self.base_view, view);

Expand All @@ -76,9 +81,9 @@ impl<'a, S: 'a + StateView> ExecutorTask for DiemVMWrapper<'a, S> {
};
}
if AptosVM::should_restart_execution(&output) {
ExecutionStatus::SkipRest(DiemTransactionOutput::new(output))
ExecutionStatus::SkipRest(AptosTransactionOutput::new(output))
} else {
ExecutionStatus::Success(DiemTransactionOutput::new(output))
ExecutionStatus::Success(AptosTransactionOutput::new(output))
}
}
Err(err) => ExecutionStatus::Abort(err),
Expand Down
41 changes: 1 addition & 40 deletions aptos-move/e2e-tests/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use aptos_types::{
access_path::AccessPath,
account_config::{AccountResource, BalanceResource, TransferEventsResource, CORE_CODE_ADDRESS},
block_metadata::{new_block_event_key, BlockMetadata, NewBlockEvent},
on_chain_config::{
OnChainConfig, ParallelExecutionConfig, VMPublishingOption, ValidatorSet, Version,
},
on_chain_config::{OnChainConfig, VMPublishingOption, ValidatorSet, Version},
state_store::state_key::StateKey,
transaction::{
ChangeSet, SignedTransaction, Transaction, TransactionOutput, TransactionStatus,
Expand All @@ -40,9 +38,6 @@ use aptos_vm::{
parallel_executor::ParallelAptosVM,
AptosVM, VMExecutor, VMValidator,
};
use aptos_writeset_generator::{
encode_disable_parallel_execution, encode_enable_parallel_execution_with_config,
};
use move_core_types::{
account_address::AccountAddress,
identifier::Identifier,
Expand Down Expand Up @@ -488,40 +483,6 @@ impl FakeExecutor {
self.apply_write_set(output.write_set());
}

pub fn enable_parallel_execution(&mut self) {
let aptos_root = Account::new_aptos_root();
let seq_num = self
.read_account_resource_at_address(aptos_root.address())
.unwrap()
.sequence_number();

let txn = aptos_root
.transaction()
.write_set(encode_enable_parallel_execution_with_config())
.sequence_number(seq_num)
.sign();
self.execute_and_apply(txn);
assert!(
ParallelExecutionConfig::fetch_config(&self.data_store.as_move_resolver()).is_some()
);
}

pub fn disable_parallel_execution(&mut self) {
if ParallelExecutionConfig::fetch_config(&self.data_store.as_move_resolver()).is_some() {
let aptos_root = Account::new_aptos_root();
let txn = aptos_root
.transaction()
.write_set(encode_disable_parallel_execution())
.sequence_number(
self.read_account_resource_at_address(aptos_root.address())
.unwrap()
.sequence_number(),
)
.sign();
self.execute_and_apply(txn);
}
}

fn module(name: &str) -> ModuleId {
ModuleId::new(CORE_CODE_ADDRESS, Identifier::new(name).unwrap())
}
Expand Down
Loading

0 comments on commit ceb0237

Please sign in to comment.