Skip to content

Commit

Permalink
Merge pull request ProvableHQ#2551 from niklaslong/e2e
Browse files Browse the repository at this point in the history
[Narwhal] set up BFT e2e test tooling
  • Loading branch information
niklaslong authored Jul 18, 2023
2 parents a5952b1 + 33dc96b commit b30013e
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 35 deletions.
2 changes: 1 addition & 1 deletion node/narwhal/src/helpers/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusRe
(sender, receiver)
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct BFTSender<N: Network> {
pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<Result<()>>)>,
pub tx_primary_certificate: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
Expand Down
125 changes: 125 additions & 0 deletions node/narwhal/tests/bft_e2e.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (C) 2019-2023 Aleo Systems Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;

use crate::common::primary::{TestNetwork, TestNetworkConfig};
use deadline::deadline;
use snarkos_node_narwhal::MAX_BATCH_DELAY;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::test(flavor = "multi_thread")]
#[ignore = "long-running e2e test"]
async fn test_state_coherence() {
const N: u16 = 4;
const CANNON_INTERVAL_MS: u64 = 10;

let mut network = TestNetwork::new(TestNetworkConfig {
num_nodes: N,
bft: true,
connect_all: true,
fire_cannons: Some(CANNON_INTERVAL_MS),
// Set this to Some(0..=4) to see the logs.
log_level: Some(0),
log_connections: true,
});

network.start().await;

std::future::pending::<()>().await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_quorum_threshold() {
// Start N nodes but don't connect them.
const N: u16 = 4;
const CANNON_INTERVAL_MS: u64 = 10;

let mut network = TestNetwork::new(TestNetworkConfig {
num_nodes: N,
bft: true,
connect_all: false,
fire_cannons: None,
// Set this to Some(0..=4) to see the logs.
log_level: None,
log_connections: true,
});
network.start().await;

// Check each node is at round 1 (0 is genesis).
for validators in network.validators.values() {
assert_eq!(validators.primary.current_round(), 1);
}

// Start the cannons for node 0.
network.fire_cannons_at(0, CANNON_INTERVAL_MS);

sleep(Duration::from_millis(MAX_BATCH_DELAY * 2)).await;

// Check each node is still at round 1.
for validator in network.validators.values() {
assert_eq!(validator.primary.current_round(), 1);
}

// Connect the first two nodes and start the cannons for node 1.
network.connect_validators(0, 1).await;
network.fire_cannons_at(1, CANNON_INTERVAL_MS);

sleep(Duration::from_millis(MAX_BATCH_DELAY * 2)).await;

// Check each node is still at round 1.
for validator in network.validators.values() {
assert_eq!(validator.primary.current_round(), 1);
}

// Connect the third node and start the cannons for it.
network.connect_validators(0, 2).await;
network.connect_validators(1, 2).await;
network.fire_cannons_at(2, CANNON_INTERVAL_MS);

// Check the nodes reach quorum and advance through the rounds.
const TARGET_ROUND: u64 = 4;
deadline!(Duration::from_secs(20), move || { network.is_round_reached(TARGET_ROUND) });
}

#[tokio::test(flavor = "multi_thread")]
async fn test_quorum_break() {
// Start N nodes, connect them and start the cannons for each.
const N: u16 = 4;
const CANNON_INTERVAL_MS: u64 = 10;
let mut network = TestNetwork::new(TestNetworkConfig {
num_nodes: N,
bft: true,
connect_all: true,
fire_cannons: Some(CANNON_INTERVAL_MS),
// Set this to Some(0..=4) to see the logs.
log_level: None,
log_connections: true,
});
network.start().await;

// Check the nodes have started advancing through the rounds.
const TARGET_ROUND: u64 = 4;
// Note: cloning the network is fine because the primaries it wraps are `Arc`ed.
let network_clone = network.clone();
deadline!(Duration::from_secs(20), move || { network_clone.is_round_reached(TARGET_ROUND) });

// Break the quorum by disconnecting two nodes.
const NUM_NODES: u16 = 2;
network.disconnect(NUM_NODES).await;

// Check the nodes have stopped advancing through the rounds.
assert!(network.is_halted().await);
}
69 changes: 51 additions & 18 deletions node/narwhal/tests/common/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use snarkos_account::Account;
use snarkos_node_narwhal::{
helpers::{init_primary_channels, PrimarySender, Storage},
Primary,
BFT,
MAX_BATCH_DELAY,
MAX_GC_ROUNDS,
};
Expand All @@ -35,33 +36,53 @@ use parking_lot::Mutex;
use tokio::{task::JoinHandle, time::sleep};
use tracing::*;

/// The configuration for the test network.
#[derive(Clone, Copy, Debug)]
pub struct TestNetworkConfig {
/// The number of nodes to spin up.
pub num_nodes: u16,
/// If this is set to `true`, the BFT protocol is started on top of Narwhal.
pub bft: bool,
/// If this is set to `true`, all nodes are connected to each other (when they're first
/// started).
pub connect_all: bool,
pub fire_cannons: bool,
/// If `Some(i)` is set, the cannons will fire every `i` milliseconds.
pub fire_cannons: Option<u64>,
/// The log level to use for the test.
pub log_level: Option<u8>,
/// If this is set to `true`, the number of connections is logged every 5 seconds.
pub log_connections: bool,
}

/// A test network.
#[derive(Clone)]
pub struct TestNetwork {
/// The configuration for the test network.
pub config: TestNetworkConfig,
/// A map of node IDs to validators in the network.
pub validators: HashMap<u16, TestValidator>,
}

/// A test validator.
#[derive(Clone)]
pub struct TestValidator {
/// The ID of the validator.
pub id: u16,
/// The primary instance. When the BFT is enabled this is a clone of the BFT primary.
pub primary: Primary<CurrentNetwork>,
pub sender: Option<PrimarySender<CurrentNetwork>>,
/// The channel sender of the primary.
pub primary_sender: Option<PrimarySender<CurrentNetwork>>,
/// The BFT instance. This is only set if the BFT is enabled.
pub bft: Option<BFT<CurrentNetwork>>,
/// The tokio handles of all long-running tasks associated with the validator (incl. cannons).
pub handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}

impl TestValidator {
pub fn fire_cannons(&mut self) {
let solution_handle = fire_unconfirmed_solutions(self.sender.as_mut().unwrap(), self.id);
let transaction_handle = fire_unconfirmed_transactions(self.sender.as_mut().unwrap(), self.id);
pub fn fire_cannons(&mut self, interval_ms: u64) {
let solution_handle = fire_unconfirmed_solutions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);
let transaction_handle =
fire_unconfirmed_transactions(self.primary_sender.as_mut().unwrap(), self.id, interval_ms);

self.handles.lock().push(solution_handle);
self.handles.lock().push(transaction_handle);
Expand All @@ -76,7 +97,7 @@ impl TestValidator {
for connection in connections {
debug!(" {}", connection);
}
sleep(Duration::from_secs(10)).await;
sleep(Duration::from_secs(5)).await;
}
}));
}
Expand All @@ -95,9 +116,17 @@ impl TestNetwork {
for (id, account) in accounts.into_iter().enumerate() {
let storage = Storage::new(committee.clone(), MAX_GC_ROUNDS);
let ledger = Arc::new(MockLedgerService::new());
let primary = Primary::<CurrentNetwork>::new(account, storage, ledger, None, Some(id as u16)).unwrap();

let test_validator = TestValidator { id: id as u16, primary, sender: None, handles: Default::default() };
let (primary, bft) = if config.bft {
let bft = BFT::<CurrentNetwork>::new(account, storage, ledger, None, Some(id as u16)).unwrap();
(bft.primary().clone(), Some(bft))
} else {
let primary = Primary::<CurrentNetwork>::new(account, storage, ledger, None, Some(id as u16)).unwrap();
(primary, None)
};

let test_validator =
TestValidator { id: id as u16, primary, primary_sender: None, bft, handles: Default::default() };
validators.insert(id as u16, test_validator);
}

Expand All @@ -107,13 +136,18 @@ impl TestNetwork {
// Starts each node in the network.
pub async fn start(&mut self) {
for validator in self.validators.values_mut() {
// Setup the channels and start the primary.
let (sender, receiver) = init_primary_channels();
validator.sender = Some(sender.clone());
validator.primary.run(sender.clone(), receiver, None).await.unwrap();
let (primary_sender, primary_receiver) = init_primary_channels();
validator.primary_sender = Some(primary_sender.clone());
if let Some(bft) = &mut validator.bft {
// Setup the channels and start the bft.
bft.run(primary_sender, primary_receiver, None).await.unwrap();
} else {
// Setup the channels and start the primary.
validator.primary.run(primary_sender, primary_receiver, None).await.unwrap();
}

if self.config.fire_cannons {
validator.fire_cannons();
if let Some(interval_ms) = self.config.fire_cannons {
validator.fire_cannons(interval_ms);
}

if self.config.log_connections {
Expand All @@ -127,8 +161,8 @@ impl TestNetwork {
}

// Starts the solution and trasnaction cannons for node.
pub fn fire_cannons_at(&mut self, id: u16) {
self.validators.get_mut(&id).unwrap().fire_cannons();
pub fn fire_cannons_at(&mut self, id: u16, interval_ms: u64) {
self.validators.get_mut(&id).unwrap().fire_cannons(interval_ms);
}

// Connects a node to another node.
Expand All @@ -147,7 +181,7 @@ impl TestNetwork {
let ip = other_validator.primary.gateway().local_ip();
validator.primary.gateway().connect(ip);
// Give the connection time to be established.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}

Expand Down Expand Up @@ -187,7 +221,6 @@ fn new_test_committee(n: u16) -> (Vec<Account<CurrentNetwork>>, Committee<Curren
// Sample the account.
let account = Account::new(&mut TestRng::fixed(i as u64)).unwrap();

// TODO(nkls): use tracing instead.
info!("Validator {}: {}", i, account.address());

members.insert(account.address(), INITIAL_STAKE);
Expand Down
16 changes: 12 additions & 4 deletions node/narwhal/tests/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ pub fn initialize_logger(verbosity: u8) {
}

/// Fires *fake* unconfirmed solutions at the node.
pub fn fire_unconfirmed_solutions(sender: &PrimarySender<CurrentNetwork>, node_id: u16) -> JoinHandle<()> {
pub fn fire_unconfirmed_solutions(
sender: &PrimarySender<CurrentNetwork>,
node_id: u16,
interval_ms: u64,
) -> JoinHandle<()> {
let tx_unconfirmed_solution = sender.tx_unconfirmed_solution.clone();
tokio::task::spawn(async move {
// This RNG samples the *same* fake solutions for all nodes.
Expand Down Expand Up @@ -111,13 +115,17 @@ pub fn fire_unconfirmed_solutions(sender: &PrimarySender<CurrentNetwork>, node_i
// Increment the counter.
counter += 1;
// Sleep briefly.
sleep(Duration::from_millis(10)).await;
sleep(Duration::from_millis(interval_ms)).await;
}
})
}

/// Fires *fake* unconfirmed transactions at the node.
pub fn fire_unconfirmed_transactions(sender: &PrimarySender<CurrentNetwork>, node_id: u16) -> JoinHandle<()> {
pub fn fire_unconfirmed_transactions(
sender: &PrimarySender<CurrentNetwork>,
node_id: u16,
interval_ms: u64,
) -> JoinHandle<()> {
let tx_unconfirmed_transaction = sender.tx_unconfirmed_transaction.clone();
tokio::task::spawn(async move {
// This RNG samples the *same* fake transactions for all nodes.
Expand Down Expand Up @@ -155,7 +163,7 @@ pub fn fire_unconfirmed_transactions(sender: &PrimarySender<CurrentNetwork>, nod
// Increment the counter.
counter += 1;
// Sleep briefly.
sleep(Duration::from_millis(10)).await;
sleep(Duration::from_millis(interval_ms)).await;
}
})
}
Loading

0 comments on commit b30013e

Please sign in to comment.