Skip to content

Commit

Permalink
[test on forge] Add state sync catching up stress tests (aptos-labs#4036
Browse files Browse the repository at this point in the history
)
  • Loading branch information
igor-aptos authored Sep 30, 2022
1 parent 2a8f88d commit 9e14a63
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 141 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Scheduled Forge job: verifies state sync can catch up while some nodes fail.
name: Continuous E2E State Sync Failures Catching Up Test

permissions:
issues: write
pull-requests: write

# Runs on manual dispatch and every 3 hours.
on:
workflow_dispatch:
schedule:
- cron: "0 */3 * * *"

jobs:
### Please remember to use different namespace for different tests
run-state-sync-failures-catching-up-test:
uses: ./.github/workflows/run-forge.yaml
secrets: inherit
with:
# Namespace must be unique per test to avoid cluster collisions.
FORGE_NAMESPACE: forge-state-sync-failures-catching-up-test
FORGE_CLUSTER_NAME: aptos-forge-big-1
# 15-minute run.
FORGE_RUNNER_DURATION_SECS: 900
# Suite name must match the entry in testsuite/forge-cli/src/main.rs.
FORGE_TEST_SUITE: failures_catching_up
POST_TO_SLACK: true
# Failpoints are required to inject node failures during the run.
FORGE_ENABLE_FAILPOINTS: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Scheduled Forge job: verifies state sync can catch up when some nodes
# process transactions slowly (execution delay injected via failpoints).
name: Continuous E2E State Sync Slow Processing Catching Up Test

permissions:
issues: write
pull-requests: write

# Runs on manual dispatch and every 3 hours.
on:
workflow_dispatch:
schedule:
- cron: "0 */3 * * *"

jobs:
### Please remember to use different namespace for different tests
run-state-sync-slow-processing-catching-up-test:
uses: ./.github/workflows/run-forge.yaml
secrets: inherit
with:
# Namespace must be unique per test to avoid cluster collisions.
FORGE_NAMESPACE: forge-state-sync-slow-processing-catching-up-test
FORGE_CLUSTER_NAME: aptos-forge-big-1
# 15-minute run.
FORGE_RUNNER_DURATION_SECS: 900
# Suite name must match the entry in testsuite/forge-cli/src/main.rs.
FORGE_TEST_SUITE: slow_processing_catching_up
POST_TO_SLACK: true
# Failpoints are required to inject slow processing during the run.
FORGE_ENABLE_FAILPOINTS: true
44 changes: 29 additions & 15 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,6 @@ fn single_test_suite(test_name: &str) -> Result<ForgeConfig<'static>> {
// to test different timings)
check_period_s: 27,
},
false,
),
"changing_working_quorum_test" => changing_working_quorum_test(
20,
Expand All @@ -634,7 +633,6 @@ fn single_test_suite(test_name: &str) -> Result<ForgeConfig<'static>> {
// to test different timings)
check_period_s: 27,
},
false,
),
"changing_working_quorum_test_high_load" => changing_working_quorum_test(
20,
Expand All @@ -653,9 +651,6 @@ fn single_test_suite(test_name: &str) -> Result<ForgeConfig<'static>> {
// to test different timings)
check_period_s: 27,
},
// Max load cannot be sustained without gaps in progress.
// Using high load instead.
false,
),
// not scheduled on continuous
"large_test_only_few_nodes_down" => changing_working_quorum_test(
Expand All @@ -671,7 +666,6 @@ fn single_test_suite(test_name: &str) -> Result<ForgeConfig<'static>> {
add_execution_delay: false,
check_period_s: 27,
},
false,
),
"different_node_speed_and_reliability_test" => changing_working_quorum_test(
20,
Expand All @@ -686,7 +680,34 @@ fn single_test_suite(test_name: &str) -> Result<ForgeConfig<'static>> {
add_execution_delay: true,
check_period_s: 27,
},
false,
),
"slow_processing_catching_up" => changing_working_quorum_test(
10,
300,
3000,
2500,
&ChangingWorkingQuorumTest {
min_tps: 1500,
always_healthy_nodes: 2,
max_down_nodes: 0,
num_large_validators: 2,
add_execution_delay: true,
check_period_s: 57,
},
),
"failures_catching_up" => changing_working_quorum_test(
10,
300,
3000,
2500,
&ChangingWorkingQuorumTest {
min_tps: 1500,
always_healthy_nodes: 2,
max_down_nodes: 1,
num_large_validators: 2,
add_execution_delay: false,
check_period_s: 27,
},
),
"twin_validator_test" => config
.with_network_tests(vec![&TwinValidatorTest])
Expand Down Expand Up @@ -879,7 +900,6 @@ fn changing_working_quorum_test(
target_tps: usize,
min_avg_tps: usize,
test: &'static ChangingWorkingQuorumTest,
max_load: bool,
) -> ForgeConfig<'static> {
let config = ForgeConfig::default();
let num_large_validators = test.num_large_validators;
Expand Down Expand Up @@ -910,13 +930,7 @@ fn changing_working_quorum_test(
}))
.with_emit_job(
EmitJobRequest::default()
.mode(if max_load {
EmitJobMode::MaxLoad {
mempool_backlog: 20000,
}
} else {
EmitJobMode::ConstTps { tps: target_tps }
})
.mode(EmitJobMode::ConstTps { tps: target_tps })
.transaction_mix(vec![
(TransactionType::P2P, 80),
(TransactionType::AccountGeneration, 20),
Expand Down
17 changes: 7 additions & 10 deletions testsuite/forge/src/backend/k8s/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,31 +287,27 @@ impl Swarm for K8sSwarm {

async fn ensure_no_validator_restart(&self) -> Result<()> {
for validator in &self.validators {
if let Err(e) = check_for_container_restart(
check_for_container_restart(
&self.kube_client,
&self.kube_namespace.clone(),
validator.1.stateful_set_name(),
)
.await
{
return Err(e);
}
.await?;
}
info!("Found no validator restarts");
Ok(())
}

async fn ensure_no_fullnode_restart(&self) -> Result<()> {
for fullnode in &self.fullnodes {
if let Err(e) = check_for_container_restart(
check_for_container_restart(
&self.kube_client,
&self.kube_namespace.clone(),
fullnode.1.stateful_set_name(),
)
.await
{
return Err(e);
}
.await?;
}
info!("Found no fullnode restarts");
Ok(())
}

Expand Down Expand Up @@ -345,6 +341,7 @@ impl Swarm for K8sSwarm {
)
.await?;
threshold.ensure_threshold(&system_metrics)?;
info!("System metrics are healthy");
Ok(())
} else {
bail!("No prom client");
Expand Down
30 changes: 28 additions & 2 deletions testsuite/forge/src/interface/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_config::config::NodeConfig;
use aptos_logger::info;
use aptos_rest_client::Client as RestClient;
use aptos_sdk::types::PeerId;
use futures::future::try_join_all;
use futures::future::{join_all, try_join_all};
use prometheus_http_query::response::PromqlResult;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -296,6 +296,31 @@ pub trait SwarmExt: Swarm {
})
.collect()
}

/// Queries every node's ledger information concurrently and returns the
/// REST client of the node reporting the highest ledger version, together
/// with that version. Returns `None` if no node replied successfully.
async fn get_client_with_newest_ledger_version(&self) -> Option<(u64, RestClient)> {
    let named_clients = self.get_all_nodes_clients_with_names();
    // Fire all ledger-info requests at once; per-node latency is logged
    // so slow responders are visible in the test output.
    let responses = join_all(named_clients.iter().map(|(_name, client)| async {
        let fetch_start = Instant::now();
        let ledger_info = client.get_ledger_information().await;

        info!(
            "Fetch from {:?} took {}ms, at version: {}",
            client.path_prefix_string(),
            fetch_start.elapsed().as_millis(),
            ledger_info
                .as_ref()
                .map(|r| r.inner().version as i64)
                .unwrap_or(-1)
        );
        ledger_info
    }))
    .await;

    // Drop failed responses, pair each success with its client, and keep
    // the pair with the largest version.
    responses
        .into_iter()
        .zip(named_clients)
        .filter_map(|(resp, (_name, client))| {
            resp.ok().map(|r| (r.into_inner().version, client))
        })
        .max_by_key(|(version, _client)| *version)
}
}

/// Waits for all nodes to have caught up to the specified `version`.
Expand Down Expand Up @@ -330,8 +355,9 @@ pub async fn wait_for_all_nodes_to_catchup_to_version(

if start_time.elapsed() > timeout {
return Err(anyhow!(
"Waiting for nodes to catch up to version {} timed out, current status: {:?}",
"Waiting for nodes to catch up to version {} timed out after {}s, current status: {:?}",
version,
start_time.elapsed().as_secs(),
versions.unwrap_or_default()
));
}
Expand Down
38 changes: 19 additions & 19 deletions testsuite/forge/src/success_criteria.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use anyhow::bail;
use anyhow::{bail, Context};
use aptos::node::analyze::fetch_metadata::FetchMetadata;
use aptos_sdk::types::PeerId;
use futures::future::join_all;
use serde::Serialize;
use std::time::Duration;
use transaction_emitter_lib::emitter::stats::TxnStats;
Expand Down Expand Up @@ -69,12 +68,21 @@ impl SuccessCriteria {
}

if let Some(timeout) = self.wait_for_all_nodes_to_catchup {
swarm.wait_for_all_nodes_to_catchup_to_next(timeout).await?;
swarm
.wait_for_all_nodes_to_catchup_to_next(timeout)
.await
.context("Failed waiting for all nodes to catchup to next version")?;
}

if self.check_no_restarts {
swarm.ensure_no_validator_restart().await?;
swarm.ensure_no_fullnode_restart().await?;
swarm
.ensure_no_validator_restart()
.await
.context("Failed ensuring no validator restarted")?;
swarm
.ensure_no_fullnode_restart()
.await
.context("Failed ensuring no fullnode restarted")?;
}

// TODO(skedia) Add latency success criteria after we have support for querying prometheus
Expand All @@ -92,7 +100,8 @@ impl SuccessCriteria {

if let Some(chain_progress_threshold) = &self.chain_progress_check {
self.check_chain_progress(swarm, chain_progress_threshold, start_version, end_version)
.await?;
.await
.context("Failed check chain progress")?;
}

Ok(())
Expand All @@ -106,19 +115,10 @@ impl SuccessCriteria {
end_version: u64,
) -> anyhow::Result<()> {
// Choose client with newest ledger version to fetch NewBlockEvents from:
let clients = swarm.get_all_nodes_clients_with_names();
let ledger_infos = join_all(
clients
.iter()
.map(|(_name, client)| client.get_ledger_information()),
)
.await;
let (_max_v, client) = ledger_infos
.into_iter()
.zip(clients.into_iter())
.flat_map(|(resp, (_, client))| resp.map(|r| (r.into_inner().version, client)))
.max_by_key(|(v, _c)| *v)
.unwrap();
let (_max_v, client) = swarm
.get_client_with_newest_ledger_version()
.await
.context("No clients replied in check_chain_progress")?;

let epochs = FetchMetadata::fetch_new_block_events(&client, None, None)
.await
Expand Down
21 changes: 7 additions & 14 deletions testsuite/forge/src/test_utils/consensus_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use anyhow::{anyhow, bail, Context, Result};
use anyhow::{bail, Context, Result};
use aptos_rest_client::Client as RestClient;
use async_trait::async_trait;
use chrono::Utc;
Expand Down Expand Up @@ -159,32 +159,25 @@ pub async fn test_consensus_fault_tolerance(
let target_v = largest_v + 10;

wait_for_all_nodes_to_catchup_to_version(&validator_clients, target_v, Duration::from_secs(30))
.await?;
.await
.context("catchup failed")?;

let transactions: Vec<_> =
join_all(validator_clients.iter().cloned().map(move |v| async move {
let mut txns =
v.1.get_transactions_bcs(Some(target_v.saturating_sub(1000)), Some(1000))
.await
.map_err(|e| anyhow!("{:?}", e))?
.unwrap()
.into_inner();
txns.retain(|t| t.version <= target_v);
<anyhow::Result<Vec<_>>>::Ok(txns)
<Result<Vec<_>>>::Ok(txns)
}))
.await;

let txns_a = transactions
.first()
.unwrap()
.as_ref()
.map_err(|e| anyhow!("{:?}", &e))?;
let txns_a = transactions.first().unwrap().as_ref().unwrap();

for i in 1..transactions.len() {
let txns_b = transactions
.get(i)
.unwrap()
.as_ref()
.map_err(|e| anyhow!("{:?}", &e))?;
let txns_b = transactions.get(i).unwrap().as_ref().unwrap();
assert_eq!(
txns_a.len(),
txns_b.len(),
Expand Down
Loading

0 comments on commit 9e14a63

Please sign in to comment.