Skip to content

Commit

Permalink
Merge #1356
Browse files Browse the repository at this point in the history
1356: Make load test scenarios execution parallel. r=popzxc a=alekseysidorov



Co-authored-by: Aleksei Sidorov <[email protected]>
Co-authored-by: Aleksey Sidorov <[email protected]>
  • Loading branch information
3 people authored Jan 22, 2021
2 parents e562426 + 1f1e08b commit 66efe34
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 45 deletions.
18 changes: 14 additions & 4 deletions core/tests/loadtest/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,21 @@ impl LoadtestExecutor {

// Run scenarios concurrently.
let fees = self.fees.clone();
wait_all_failsafe(
"executor/process",
self.scenarios = wait_all_failsafe(
"executor/process_par",
self.scenarios
.iter_mut()
.map(|(scenario, wallets)| scenario.run(&monitor, &fees, wallets)),
.drain(..)
.map(move |(mut scenario, wallets)| {
let monitor = monitor.clone();
let fees = fees.clone();
async move {
tokio::spawn(async move {
let wallets = scenario.run(monitor, fees, wallets).await?;
Ok((scenario, wallets)) as anyhow::Result<_>
})
.await?
}
}),
)
.await?;
self.monitor.wait_for_verify().await;
Expand Down
15 changes: 7 additions & 8 deletions core/tests/loadtest/src/scenarios/batch_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::{Fees, Scenario, ScenarioResources};
use crate::{
monitor::Monitor,
test_wallet::TestWallet,
utils::{gwei_to_wei, wait_all_failsafe_chunks, DynamicChunks, CHUNK_SIZES},
utils::{foreach_failsafe, gwei_to_wei, wait_all_failsafe_chunks, DynamicChunks, CHUNK_SIZES},
};

/// Configuration options for the transfers scenario.
Expand Down Expand Up @@ -140,10 +140,10 @@ impl Scenario for BatchTransferScenario {

async fn run(
&mut self,
monitor: &Monitor,
_fees: &Fees,
_wallets: &[TestWallet],
) -> anyhow::Result<()> {
monitor: Monitor,
_fees: Fees,
wallets: Vec<TestWallet>,
) -> anyhow::Result<Vec<TestWallet>> {
let max_batch_size = self.max_batch_size;
let batch_sizes = std::iter::repeat_with(move || match thread_rng().gen_range(0, 3) {
0 => 2,
Expand All @@ -153,14 +153,13 @@ impl Scenario for BatchTransferScenario {
});

let txs = self.txs.drain(..);
wait_all_failsafe_chunks(
foreach_failsafe(
"run/batch_transfers",
&[1, 2, 3, 2, 1],
DynamicChunks::new(txs, batch_sizes).map(|txs| monitor.send_txs_batch(txs)),
)
.await?;

Ok(())
Ok(wallets)
}

async fn finalize(
Expand Down
16 changes: 8 additions & 8 deletions core/tests/loadtest/src/scenarios/full_exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,21 @@ impl Scenario for FullExitScenario {

async fn run(
&mut self,
monitor: &Monitor,
fees: &Fees,
wallets: &[TestWallet],
) -> anyhow::Result<()> {
monitor: Monitor,
fees: Fees,
wallets: Vec<TestWallet>,
) -> anyhow::Result<Vec<TestWallet>> {
vlog::info!("Full exit and deposit cycle started");

let futures = wallets
let full_exit_task = wallets
.iter()
.map(|wallet| Self::full_exit_and_deposit(monitor, fees, wallet))
.map(|wallet| Self::full_exit_and_deposit(&monitor, &fees, wallet))
.collect::<Vec<_>>();
wait_all_failsafe("full_exit/run", futures).await?;
wait_all_failsafe("full_exit/run", full_exit_task).await?;

vlog::info!("Full exit scenario has been finished");

Ok(())
Ok(wallets)
}

async fn finalize(
Expand Down
10 changes: 5 additions & 5 deletions core/tests/loadtest/src/scenarios/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct Fees {

/// Describes the general steps of a load test scenario.
#[async_trait]
pub trait Scenario: Debug + Display {
pub trait Scenario: Debug + Display + Send + Sync + 'static {
/// Returns resources that should be provided by the scenario executor.
fn requested_resources(&self, fees: &Fees) -> ScenarioResources;

Expand All @@ -69,10 +69,10 @@ pub trait Scenario: Debug + Display {
/// Runs main scenario routine with the enabled load monitor.
async fn run(
&mut self,
monitor: &Monitor,
fees: &Fees,
wallets: &[TestWallet],
) -> anyhow::Result<()>;
monitor: Monitor,
fees: Fees,
wallets: Vec<TestWallet>,
) -> anyhow::Result<Vec<TestWallet>>;

/// Performs actions after running the main scenario, for example, it can
/// return the funds to the specified wallets.
Expand Down
15 changes: 7 additions & 8 deletions core/tests/loadtest/src/scenarios/transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{Fees, Scenario, ScenarioResources};
use crate::{
monitor::Monitor,
test_wallet::TestWallet,
utils::{gwei_to_wei, wait_all_failsafe_chunks, CHUNK_SIZES},
utils::{foreach_failsafe, gwei_to_wei, wait_all_failsafe_chunks, CHUNK_SIZES},
};

/// Configuration options for the transfers scenario.
Expand Down Expand Up @@ -132,20 +132,19 @@ impl Scenario for TransferScenario {

async fn run(
&mut self,
monitor: &Monitor,
_fees: &Fees,
_wallets: &[TestWallet],
) -> anyhow::Result<()> {
wait_all_failsafe_chunks(
monitor: Monitor,
_fees: Fees,
wallets: Vec<TestWallet>,
) -> anyhow::Result<Vec<TestWallet>> {
foreach_failsafe(
"run/transfers",
CHUNK_SIZES,
self.txs
.drain(..)
.map(|(tx, sign)| monitor.send_tx(tx, sign)),
)
.await?;

Ok(())
Ok(wallets)
}

async fn finalize(
Expand Down
12 changes: 6 additions & 6 deletions core/tests/loadtest/src/scenarios/withdraw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ impl Scenario for WithdrawScenario {

async fn run(
&mut self,
monitor: &Monitor,
fees: &Fees,
wallets: &[TestWallet],
) -> anyhow::Result<()> {
monitor: Monitor,
fees: Fees,
wallets: Vec<TestWallet>,
) -> anyhow::Result<Vec<TestWallet>> {
for i in 0..self.config.withdraw_rounds {
vlog::info!(
"Withdraw and deposit cycle [{}/{}] started",
Expand All @@ -87,7 +87,7 @@ impl Scenario for WithdrawScenario {

let futures = wallets
.iter()
.map(|wallet| Self::withdraw_and_deposit(monitor, fees, wallet))
.map(|wallet| Self::withdraw_and_deposit(&monitor, &fees, wallet))
.collect::<Vec<_>>();
wait_all_failsafe(&format!("withdraw/run/cycle/{}", i), futures).await?;

Expand All @@ -100,7 +100,7 @@ impl Scenario for WithdrawScenario {

vlog::info!("Withdraw scenario has been finished");

Ok(())
Ok(wallets)
}

async fn finalize(
Expand Down
58 changes: 52 additions & 6 deletions core/tests/loadtest/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ pub fn gwei_to_wei(gwei: impl Into<BigUint>) -> BigUint {
///
/// But unlike the `futures::future::join_all` method, it performs futures in chunks
/// to reduce descriptors usage.
pub async fn wait_all_chunks<I>(chunk_sizes: &[usize], i: I) -> Vec<<I::Item as Future>::Output>
pub async fn wait_all_chunks<I>(chunk_sizes: &[usize], iter: I) -> Vec<<I::Item as Future>::Output>
where
I: IntoIterator,
I::Item: Future,
{
let mut output = Vec::new();
for chunk in DynamicChunks::with_sizes(i, chunk_sizes) {
for chunk in DynamicChunks::with_sizes(iter, chunk_sizes) {
let values = futures::future::join_all(chunk).await;
output.extend(values);
}
Expand All @@ -46,7 +46,7 @@ where
/// futures.
pub async fn wait_all_failsafe<I>(
category: &str,
i: I,
iter: I,
) -> Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>
where
I: IntoIterator,
Expand All @@ -58,7 +58,7 @@ where
let mut oks = Vec::new();
let mut errs = Vec::new();

let output = futures::future::join_all(i).await;
let output = futures::future::join_all(iter).await;
for item in output {
match item.into() {
Ok(ok) => oks.push(ok),
Expand Down Expand Up @@ -93,7 +93,7 @@ where
pub async fn wait_all_failsafe_chunks<I>(
category: &str,
chunk_sizes: &[usize],
i: I,
iter: I,
) -> Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>
where
I: IntoIterator,
Expand All @@ -104,7 +104,7 @@ where
{
let mut oks = Vec::new();
let mut errs = Vec::new();
for chunk in DynamicChunks::with_sizes(i, chunk_sizes) {
for chunk in DynamicChunks::with_sizes(iter, chunk_sizes) {
let output = futures::future::join_all(chunk).await;
for item in output {
match item.into() {
Expand Down Expand Up @@ -133,6 +133,52 @@ where
Ok(oks)
}

/// Creates a future which represents either a collection of the results of the
/// futures given or an error.
///
/// But unlike the `try_wait_all_failsafe` method, it performs futures one by one
/// without concurrency.
pub async fn foreach_failsafe<I>(
category: &str,
iter: I,
) -> Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>
where
I: IntoIterator,
I::Item: TryFuture,
<I::Item as Future>::Output:
Into<Result<<I::Item as TryFuture>::Ok, <I::Item as TryFuture>::Error>>,
<I::Item as TryFuture>::Error: std::fmt::Display,
{
let mut oks = Vec::new();
let mut errs = Vec::new();

for item in iter.into_iter() {
match item.await.into() {
Ok(ok) => oks.push(ok),
Err(err) => {
save_error(category, &err);
errs.push(err)
}
}
}

if oks.is_empty() {
match errs.into_iter().next() {
Some(err) => return Err(err),
None => return Ok(Vec::new()),
}
} else if errs.len() > ERRORS_CUTOFF {
vlog::warn!(
"During the `{}` execution {} out of total {} actions erred.",
category,
errs.len(),
errs.len() + oks.len(),
);
}

Ok(oks)
}

type ChunksIter = dyn Iterator<Item = usize> + Send + 'static;

/// An iterator similar to `.iter().chunks(..)`, but supporting multiple
Expand Down

0 comments on commit 66efe34

Please sign in to comment.