Skip to content

Commit

Permalink
[mempool] Make tests more generic for MempoolRpc
Browse files Browse the repository at this point in the history
  • Loading branch information
gregnazario authored and bors-libra committed Nov 13, 2021
1 parent b432a3a commit 4ec6e12
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 142 deletions.
303 changes: 175 additions & 128 deletions mempool/src/tests/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,106 +1,140 @@
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::tests::test_framework::{test_transactions, MempoolTestFramework};
use diem_config::{
config::PeerRole,
network_id::{NetworkId, PeerNetworkId},
};
use diem_types::PeerId;
use crate::tests::test_framework::{test_transactions, MempoolNode, MempoolTestFrameworkBuilder};
use diem_config::network_id::PeerNetworkId;
use netcore::transport::ConnectionOrigin;
use network::{
testutils::{
builder::TestFrameworkBuilder,
test_framework::TestFramework,
test_node::{mock_conn_metadata, ApplicationNode, NodeId, TestNode},
test_node::{
pfn_pfn_mock_connection, pfn_vfn_mock_connection, validator_mock_connection,
vfn_validator_mock_connection, vfn_vfn_mock_connection, NodeId, TestNode,
},
},
transport::ConnectionMetadata,
ProtocolId,
};

#[tokio::test]
async fn single_node_test() {
let mut test_framework: MempoolTestFramework =
TestFrameworkBuilder::new(1).add_validator(0).build();
let mut node = test_framework.take_node(NodeId::validator(0));
let network_id = NetworkId::Validator;
let other_peer_network_id = PeerNetworkId::new(network_id, PeerId::random());
let other_metadata = mock_conn_metadata(
other_peer_network_id,
PeerRole::Validator,
ConnectionOrigin::Outbound,
&[],
);
let all_txns = test_transactions(0, 3);
let all_txns = all_txns.as_slice();
let inbound_handle = node.get_inbound_handle(network_id);
node.assert_txns_not_in_mempool(&all_txns[0..1]);
node.add_txns_via_client(&all_txns[0..1]).await;

// After we connect, we should try to send messages to it
inbound_handle.connect(
node.node_id().role(),
node.peer_network_id(network_id),
other_metadata,
);

// Respond and at this point, txn will have shown up
node.verify_broadcast_and_ack(other_peer_network_id, &all_txns[0..1])
.await;
const ALL_PROTOCOLS: [ProtocolId; 1] = [ProtocolId::MempoolDirectSend];

fn inbound_node_combinations() -> [(MempoolNode, (PeerNetworkId, ConnectionMetadata)); 6] {
[
(
MempoolTestFrameworkBuilder::single_validator(),
validator_mock_connection(ConnectionOrigin::Inbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_validator(),
validator_mock_connection(ConnectionOrigin::Outbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_validator(),
vfn_validator_mock_connection(ConnectionOrigin::Inbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_vfn(),
vfn_vfn_mock_connection(ConnectionOrigin::Inbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_vfn(),
pfn_vfn_mock_connection(ConnectionOrigin::Inbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_pfn(),
pfn_pfn_mock_connection(ConnectionOrigin::Inbound, &ALL_PROTOCOLS),
),
]
}

// Now submit another txn and check
node.add_txns_via_client(&all_txns[1..2]).await;
node.assert_only_txns_in_mempool(&all_txns[0..2]);
node.verify_broadcast_and_ack(other_peer_network_id, &all_txns[1..2])
fn outbound_node_combinations() -> [(MempoolNode, (PeerNetworkId, ConnectionMetadata)); 6] {
[
(
MempoolTestFrameworkBuilder::single_validator(),
validator_mock_connection(ConnectionOrigin::Inbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_validator(),
validator_mock_connection(ConnectionOrigin::Outbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_vfn(),
vfn_validator_mock_connection(ConnectionOrigin::Outbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_vfn(),
vfn_vfn_mock_connection(ConnectionOrigin::Outbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_pfn(),
pfn_vfn_mock_connection(ConnectionOrigin::Outbound, &ALL_PROTOCOLS),
),
(
MempoolTestFrameworkBuilder::single_pfn(),
pfn_pfn_mock_connection(ConnectionOrigin::Outbound, &ALL_PROTOCOLS),
),
]
}

/// Tests all possible inbound "downstream" peers
#[tokio::test]
async fn single_inbound_node_test() {
for (mut node, (other_peer_network_id, other_metadata)) in inbound_node_combinations() {
let all_txns = test_transactions(0, 2);
let all_txns = all_txns.as_slice();
node.connect_self(other_peer_network_id.network_id(), other_metadata);

// Let's also send it an incoming request with more txns and respond with an ack (DirectSend & Rpc)
node.send_message(
ProtocolId::MempoolDirectSend,
other_peer_network_id,
&all_txns[0..1],
)
.await;
node.assert_only_txns_in_mempool(&all_txns[0..1]);
}
}

// Let's also send it an incoming request with more txns and respond with an ack (DirectSend)
node.send_message(
ProtocolId::MempoolDirectSend,
other_peer_network_id,
&all_txns[2..3],
)
.await;
node.assert_only_txns_in_mempool(&all_txns[0..3]);
node.commit_txns(&all_txns[0..3]);
node.assert_txns_not_in_mempool(&all_txns[0..3]);
/// Tests all possible outbound "upstream" peers
#[tokio::test]
async fn single_outbound_node_test() {
for (mut node, (other_peer_network_id, other_metadata)) in outbound_node_combinations() {
let all_txns = test_transactions(0, 2);
let all_txns = all_txns.as_slice();

// Add transactions
node.assert_txns_not_in_mempool(&all_txns[0..1]);
node.add_txns_via_client(&all_txns[0..1]).await;

// After we connect, all messages should be received and broadcast upstream
node.connect_self(other_peer_network_id.network_id(), other_metadata);
node.verify_broadcast_and_ack(other_peer_network_id, &all_txns[0..1])
.await;
node.assert_only_txns_in_mempool(&all_txns[0..1]);

// Adding more txns should also broadcast them upstream
node.add_txns_via_client(&all_txns[1..2]).await;
node.verify_broadcast_and_ack(other_peer_network_id, &all_txns[1..2])
.await;
node.assert_only_txns_in_mempool(&all_txns[0..2]);
}
}

/// Tests if the node is a VFN, and it's getting forwarded messages from a PFN. It should forward
/// messages to the upstream VAL. Upstream and downstream nodes are mocked.
#[tokio::test]
async fn vfn_middle_man_test() {
let mut test_framework: MempoolTestFramework = TestFrameworkBuilder::new(1).add_vfn(0).build();
let mut node = test_framework.take_node(NodeId::vfn(0));
let validator_peer_network_id = PeerNetworkId::new(NetworkId::Vfn, PeerId::random());
let validator_metadata = mock_conn_metadata(
validator_peer_network_id,
PeerRole::Validator,
ConnectionOrigin::Outbound,
&[],
);

let fn_peer_network_id = PeerNetworkId::new(NetworkId::Vfn, PeerId::random());
let fn_metadata = mock_conn_metadata(
fn_peer_network_id,
PeerRole::Unknown,
ConnectionOrigin::Inbound,
&[],
);
let mut node = MempoolTestFrameworkBuilder::single_vfn();
let (validator_peer_network_id, validator_metadata) =
vfn_validator_mock_connection(ConnectionOrigin::Outbound, &ALL_PROTOCOLS);

let (fn_peer_network_id, fn_metadata) =
pfn_vfn_mock_connection(ConnectionOrigin::Inbound, &ALL_PROTOCOLS);

let test_txns = test_transactions(0, 2);
let inbound_handle = node.get_inbound_handle(NetworkId::Vfn);
// Connect upstream Validator and downstream FN
inbound_handle.connect(
node.node_id().role(),
node.peer_network_id(NetworkId::Vfn),
validator_metadata,
);
let inbound_handle = node.get_inbound_handle(NetworkId::Public);
inbound_handle.connect(
node.node_id().role(),
node.peer_network_id(NetworkId::Public),
fn_metadata,
);
node.connect_self(validator_peer_network_id.network_id(), validator_metadata);
node.connect_self(fn_peer_network_id.network_id(), fn_metadata);

// Incoming transactions should be accepted
node.send_message(
Expand All @@ -116,56 +150,69 @@ async fn vfn_middle_man_test() {
.await;
}

// -- Multi node tests below here --

/// Tests if the node is a VFN, and it's getting forwarded messages from a PFN. It should forward
/// messages to the upstream VAL. Upstream and downstream nodes also are running nodes.
#[tokio::test]
async fn fn_to_val_test() {
let mut test_framework: MempoolTestFramework = TestFrameworkBuilder::new(1)
.add_validator(0)
.add_vfn(0)
.add_pfn(0)
.build();

let mut val = test_framework.take_node(NodeId::validator(0));
let mut vfn = test_framework.take_node(NodeId::vfn(0));
let mut pfn = test_framework.take_node(NodeId::pfn(0));
let pfn_txns = test_transactions(0, 3);
let vfn_txns = pfn_txns.clone();
let val_txns = pfn_txns.clone();

let pfn_vfn_network = pfn.find_common_network(&vfn).unwrap();
let vfn_metadata = vfn.conn_metadata(pfn_vfn_network, ConnectionOrigin::Outbound, &[]);
let vfn_val_network = vfn.find_common_network(&val).unwrap();
let val_metadata = val.conn_metadata(vfn_val_network, ConnectionOrigin::Outbound, &[]);

// NOTE: Always return node at end, or it will be dropped and channels closed
let pfn_future = async move {
pfn.connect(pfn_vfn_network, vfn_metadata);
pfn.add_txns_via_client(&pfn_txns).await;
pfn.assert_only_txns_in_mempool(&pfn_txns);
// Forward to VFN
pfn.send_next_network_msg(pfn_vfn_network).await;
pfn
};

let vfn_future = async move {
vfn.connect(vfn_val_network, val_metadata);

// Respond to PFN
vfn.send_next_network_msg(pfn_vfn_network).await;
vfn.assert_only_txns_in_mempool(&vfn_txns);

// Forward to VAL
vfn.send_next_network_msg(vfn_val_network).await;
vfn
};

let val_future = async move {
// Respond to VFN
val.send_next_network_msg(vfn_val_network).await;
val.assert_only_txns_in_mempool(&val_txns);
val
};

futures::future::join3(pfn_future, vfn_future, val_future).await;
for protocol_id in ALL_PROTOCOLS {
let mut test_framework = MempoolTestFrameworkBuilder::new(1)
.add_validator(0)
.add_vfn(0)
.add_pfn(0)
.build();

let mut val = test_framework.take_node(NodeId::validator(0));
let mut vfn = test_framework.take_node(NodeId::vfn(0));
let mut pfn = test_framework.take_node(NodeId::pfn(0));
let test_txns = test_transactions(0, 3);
let pfn_txns = test_txns.clone();
let val_txns = test_txns.clone();

let pfn_vfn_network = pfn.find_common_network(&vfn).unwrap();
let vfn_metadata =
vfn.conn_metadata(pfn_vfn_network, ConnectionOrigin::Outbound, &[protocol_id]);
let vfn_val_network = vfn.find_common_network(&val).unwrap();
let val_metadata =
val.conn_metadata(vfn_val_network, ConnectionOrigin::Outbound, &[protocol_id]);

// NOTE: Always return node at end, or it will be dropped and channels closed
let pfn_future = async move {
pfn.connect(pfn_vfn_network, vfn_metadata);
pfn.add_txns_via_client(&pfn_txns).await;

// Forward to VFN
pfn.send_next_network_msg(pfn_vfn_network).await;
pfn
};

let vfn_future = async move {
vfn.connect(vfn_val_network, val_metadata);

// Respond to PFN (RPC doesn't need to do this)
if protocol_id == ProtocolId::MempoolDirectSend {
vfn.send_next_network_msg(pfn_vfn_network).await;
}

// Forward to VAL
vfn.send_next_network_msg(vfn_val_network).await;
vfn
};

let val_future = async move {
// Respond to VFN (RPC doesn't need to do this)
if protocol_id == ProtocolId::MempoolDirectSend {
val.send_next_network_msg(vfn_val_network).await;
}

val.wait_on_txns_in_mempool(&val_txns).await;
val
};

let (pfn, vfn, val) = futures::future::join3(pfn_future, vfn_future, val_future).await;
pfn.assert_only_txns_in_mempool(&test_txns);
vfn.assert_only_txns_in_mempool(&test_txns);
val.assert_only_txns_in_mempool(&test_txns);
}
}
28 changes: 14 additions & 14 deletions mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ use network::{
peer_manager::{PeerManagerNotification, PeerManagerRequest},
protocols::{direct_send::Message, rpc::InboundRpcRequest},
testutils::{
builder::TestFrameworkBuilder,
test_framework::{setup_node_networks, TestFramework},
test_node::{
ApplicationNetworkHandle, ApplicationNode, InboundNetworkHandle, NodeId, NodeType,
ApplicationNetworkHandle, ApplicationNode, InboundNetworkHandle, NodeId,
OutboundMessageReceiver, TestNode,
},
},
Expand All @@ -39,7 +40,7 @@ use std::{
sync::Arc,
};
use storage_interface::{mock::MockDbReaderWriter, DbReaderWriter};
use tokio::runtime::Handle;
use tokio::{runtime::Handle, time::Duration};
use tokio_stream::StreamExt;
use vm_validator::mocks::mock_vm_validator::MockVMValidator;

Expand Down Expand Up @@ -150,19 +151,16 @@ impl MempoolNode {
self.assert_txns_in_mempool(txns);
}

/// Commits transactions and removes them from the local mempool, stops them from being broadcasted later
pub fn commit_txns(&self, txns: &[TestTransaction]) {
if NodeType::Validator == self.node_id.node_type {
let mut mempool = self.mempool.lock();
for txn in txns.iter() {
mempool.remove_transaction(
&TestTransaction::get_address(txn.address),
txn.sequence_number,
false,
);
/// Asynchronously waits for up to 1 second for txns to appear in mempool
pub async fn wait_on_txns_in_mempool(&self, txns: &[TestTransaction]) {
for _ in 0..10 {
let block = self.mempool.lock().get_block(100, HashSet::new());

if block_contains_all_transactions(&block, txns) {
break;
}
} else {
panic!("Can't commit transactions on anything but a validator");

tokio::time::sleep(Duration::from_millis(100)).await
}
}

Expand Down Expand Up @@ -376,6 +374,8 @@ impl MempoolNode {

impl TestNode for MempoolNode {}

pub type MempoolTestFrameworkBuilder = TestFrameworkBuilder<MempoolTestFramework, MempoolNode>;

/// A [`TestFramework`] for [`MempoolNode`]s to test Mempool in a single and multi-node mock network
/// environment.
pub struct MempoolTestFramework {
Expand Down

0 comments on commit 4ec6e12

Please sign in to comment.