Skip to content

Commit

Permalink
[mempool] Remove older flaky integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gregnazario authored and bors-libra committed Nov 25, 2021
1 parent 41ee1ad commit 167b693
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 380 deletions.
352 changes: 0 additions & 352 deletions mempool/src/tests/multi_node_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,6 @@ impl TestHarness {
self.node(node_id).add_txns(txns)
}

/// Commits transactions and removes them from the local mempool, stops them from being broadcasted later
fn commit_txns(&self, node_id: &NodeId, txns: Vec<TestTransaction>) {
self.node(node_id).commit_txns(txns)
}

fn find_common_network(&self, node_a: &NodeId, node_b: &NodeId) -> NetworkId {
let node_a = self.node(node_a);
let node_b = self.node(node_b);
Expand Down Expand Up @@ -268,22 +263,6 @@ impl TestHarness {
);
}

/// Disconnect two nodes
fn disconnect(&mut self, node_a_id: &NodeId, node_b_id: &NodeId) {
let network_id = self.find_common_network(node_a_id, node_b_id);
// Tell B about A
let node_a = self.node(node_a_id);
let id_a = node_a.peer_network_id(network_id);
let node_b = self.mut_node(node_b_id);
node_b.send_lost_peer_event(id_a);

// Tell A about B
let node_b = self.node(node_b_id);
let id_b = node_b.peer_network_id(network_id);
let node_a = self.mut_node(node_a_id);
node_a.send_lost_peer_event(id_b);
}

/// Blocks, expecting the next event to be the type provided
fn wait_for_event(&mut self, node_id: &NodeId, expected: SharedMempoolNotification) {
self.mut_node(node_id).wait_for_event(expected);
Expand Down Expand Up @@ -403,41 +382,6 @@ impl TestHarness {
}
}

/// Convenience function, broadcasts transactions, and makes sure they got to the right place,
/// and received the right sequence number
fn broadcast_txns_and_validate(
&mut self,
sender_id: &NodeId,
receiver_id: &NodeId,
seq_num: u64,
) {
self.broadcast_txns_and_validate_with_networks(sender_id, receiver_id, seq_num)
}

fn broadcast_txns_and_validate_with_networks(
&mut self,
sender_id: &NodeId,
receiver_id: &NodeId,
seq_num: u64,
) {
let network_id = self.find_common_network(sender_id, receiver_id);
let (txns, rx_peer) = self.broadcast_txns_successfully(sender_id, network_id, 1);
assert_eq!(1, txns.len(), "Expected only one transaction");
let actual_seq_num = txns.get(0).unwrap().sequence_number();
assert_eq!(
seq_num, actual_seq_num,
"Expected seq_num {}, got {}",
seq_num, actual_seq_num
);
let receiver = self.node(receiver_id);
let expected_peer_id = receiver.peer_id(network_id);
assert_eq!(
expected_peer_id, rx_peer,
"Expected peer {} to receive message, but {} got it instead",
expected_peer_id, rx_peer
);
}

/// Delivers broadcast ACK from `peer`.
fn deliver_response(&mut self, sender_id: &NodeId, network_id: NetworkId) {
// Wait for an ACK to come in on the events
Expand Down Expand Up @@ -508,24 +452,6 @@ fn test_transaction(seq_num: u64) -> TestTransaction {
TestTransaction::new(1, seq_num, 1)
}

#[test]
fn test_basic_flow() {
let (mut harness, validators) =
TestHarness::bootstrap_validator_network(2, Some(MempoolOverrideConfig::new()));
let (v_a, v_b) = (validators.get(0).unwrap(), validators.get(1).unwrap());

// Add transactions to send
harness.add_txns(v_a, test_transactions(0, 3));

// A discovers new peer B
harness.connect(v_b, v_a);

// A sends messages, which are received by B
for seq_num in 0..3 {
harness.broadcast_txns_and_validate(v_a, v_b, seq_num);
}
}

#[test]
fn test_metric_cache_ignore_shared_txns() {
let (mut harness, validators) =
Expand Down Expand Up @@ -560,284 +486,6 @@ fn test_metric_cache_ignore_shared_txns() {
}
}

#[test]
fn test_interruption_in_sync() {
let (mut harness, validators) =
TestHarness::bootstrap_validator_network(3, Some(MempoolOverrideConfig::new()));
let (v_a, v_b, v_c) = (
validators.get(0).unwrap(),
validators.get(1).unwrap(),
validators.get(2).unwrap(),
);

harness.add_txns(v_a, vec![test_transaction(0)]);

// A discovers first peer
harness.connect(v_b, v_a);

// Make sure first txn delivered to first peer
harness.broadcast_txns_and_validate(v_a, v_b, 0);

// A discovers second peer
harness.connect(v_c, v_a);

// Make sure first txn delivered to second peer
harness.broadcast_txns_and_validate(v_a, v_c, 0);

// A loses connection to B
harness.disconnect(v_a, v_b);

// Only C receives the following transactions
for seq_num in 1..3 {
harness.add_txns(v_a, vec![test_transaction(seq_num)]);
harness.broadcast_txns_and_validate(v_a, v_c, seq_num);
}

// B reconnects to A
harness.connect(v_b, v_a);

// B starts over from the beginning, and receives 0..3
for seq_num in 0..3 {
harness.broadcast_txns_and_validate(v_a, v_b, seq_num);
}
}

#[test]
fn test_ready_transactions() {
let (mut harness, validators) =
TestHarness::bootstrap_validator_network(2, Some(MempoolOverrideConfig::new()));
let (v_a, v_b) = (validators.get(0).unwrap(), validators.get(1).unwrap());

harness.add_txns(v_a, vec![test_transaction(0), test_transaction(2)]);

// First message delivery
harness.connect(v_b, v_a);
harness.broadcast_txns_and_validate(v_a, v_b, 0);

// Add txn1 to mempool
harness.add_txns(v_a, vec![test_transaction(1)]);
// txn1 unlocked txn2. Now all transactions can go through in correct order
harness.broadcast_txns_and_validate(v_a, v_b, 1);
harness.broadcast_txns_and_validate(v_a, v_b, 2);
}

#[test]
fn test_broadcast_self_transactions() {
let (mut harness, validators) =
TestHarness::bootstrap_validator_network(2, Some(MempoolOverrideConfig::new()));
let (v_a, v_b) = (validators.get(0).unwrap(), validators.get(1).unwrap());
harness.add_txns(v_a, vec![test_transaction(0)]);

// A and B discover each other
harness.connect(v_b, v_a);

// A sends txn to B
harness.broadcast_txns_successfully(v_a, NetworkId::Validator, 1);

// Add new txn to B
harness.add_txns(v_b, vec![TestTransaction::new(2, 0, 1)]);

// Verify that A will receive only second transaction from B
let (txn, _) = harness.broadcast_txns_successfully(v_b, NetworkId::Validator, 1);
assert_eq!(
txn.get(0).unwrap().sender(),
TestTransaction::get_address(2)
);
}

#[test]
fn test_broadcast_dependencies() {
let validator_config = MempoolOverrideConfig::new();
let (mut harness, validators) =
TestHarness::bootstrap_validator_network(2, Some(validator_config));
let (v_a, v_b) = (validators.get(0).unwrap(), validators.get(1).unwrap());

// Peer A has transactions with sequence numbers 0 and 2
harness.add_txns(v_a, vec![test_transaction(0), test_transaction(2)]);

// Peer B has txn1
harness.add_txns(v_b, vec![test_transaction(1)]);

// A and B discover each other
harness.connect(v_b, v_a);

// B receives 0
harness.broadcast_txns_and_validate(v_a, v_b, 0);
// Now B can broadcast 1
harness.broadcast_txns_and_validate(v_b, v_a, 1);
// Now A can broadcast 2
harness.broadcast_txns_and_validate(v_a, v_b, 2);
}

#[test]
fn test_broadcast_updated_transaction() {
let (mut harness, validators) =
TestHarness::bootstrap_validator_network(2, Some(MempoolOverrideConfig::new()));
let (v_a, v_b) = (validators.get(0).unwrap(), validators.get(1).unwrap());

// Peer A has a transaction with sequence number 0 and gas price 1
harness.add_txns(v_a, vec![test_transaction(0)]);

// A and B discover each other
harness.connect(v_b, v_a);

// B receives 0
let (txn, _) = harness.broadcast_txns_successfully(v_a, NetworkId::Validator, 1);
assert_eq!(txn.get(0).unwrap().sequence_number(), 0);
assert_eq!(txn.get(0).unwrap().gas_unit_price(), 1);

// Update the gas price of the transaction with sequence 0 after B has already received 0
harness.add_txns(v_a, vec![TestTransaction::new(1, 0, 5)]);

// Trigger send from A to B and check B has updated gas price for sequence 0
let (txn, _) = harness.broadcast_txns_successfully(v_a, NetworkId::Validator, 1);
assert_eq!(txn.get(0).unwrap().sequence_number(), 0);
assert_eq!(txn.get(0).unwrap().gas_unit_price(), 5);
}

// Tests VFN properly identifying upstream peers in a network with both upstream and downstream peers
#[test]
fn test_vfn_multi_network() {
let (mut harness, peers) = TestHarness::bootstrap_network(
2,
Some(MempoolOverrideConfig::new()),
true,
Some(MempoolOverrideConfig::new()),
1,
Some(MempoolOverrideConfig::new()),
);
let validators = peers.get(&PeerRole::Validator).unwrap();
let (v_a, v_b) = (validators.get(0).unwrap(), validators.get(1).unwrap());

let vfns = peers.get(&PeerRole::ValidatorFullNode).unwrap();
let (vfn_a, vfn_b) = (vfns.get(0).unwrap(), vfns.get(1).unwrap());

let pfn = peers.get(&PeerRole::Unknown).unwrap().get(0).unwrap();

// Make a Chain PFN -> VFN A -> VFN B (-> upstream)
// VFN A discovers pfn as Inbound
harness.connect(pfn, vfn_a);
// VFN B discovers VFN A as Inbound
harness.connect(vfn_a, vfn_b);

// Also add Validator chain PFN -> VFN A -> V A and VFN B -> VB
harness.connect(vfn_a, v_a);
harness.connect(vfn_b, v_b);

// Add V A <-> V B
harness.connect(v_b, v_a);

// Add txn to VFN A
harness.add_txns(vfn_a, vec![test_transaction(0)]);

// VFN A should broadcast to upstream
harness.broadcast_txns_and_validate(vfn_a, v_a, 0);

// VFN A should additionally broadcast to failover upstream vfn in public network
harness.broadcast_txns_and_validate_with_networks(vfn_a, vfn_b, 0);

// Check no other mesages sent
harness.assert_no_message_sent(vfn_a, NetworkId::Vfn);
harness.assert_no_message_sent(vfn_a, NetworkId::Public);
}

#[test]
fn test_rebroadcast_mempool_is_full() {
let mut validator_mempool_config = MempoolOverrideConfig::new();
validator_mempool_config.broadcast_batch_size = Some(3);
validator_mempool_config.mempool_size = Some(5);
let mut vfn_mempool_config = MempoolOverrideConfig::new();
vfn_mempool_config.broadcast_batch_size = Some(3);
// Backoff retry is shorter so that test doesn't take forever
vfn_mempool_config.backoff_interval_ms = Some(50);
let (mut harness, peers) = TestHarness::bootstrap_network(
1,
Some(validator_mempool_config),
true,
Some(vfn_mempool_config),
0,
None,
);
let val = peers.get(&PeerRole::Validator).unwrap().get(0).unwrap();
let vfn = peers
.get(&PeerRole::ValidatorFullNode)
.unwrap()
.get(0)
.unwrap();
let all_txns = test_transactions(0, 8);

harness.add_txns(vfn, all_txns.clone());

// VFN connects to Val
harness.connect(vfn, val);

// We should get all three txns in a batch
let (txns, _) = harness.broadcast_txns_successfully(vfn, NetworkId::Vfn, 1);
let seq_nums = txns
.iter()
.map(|txn| txn.sequence_number())
.collect::<Vec<_>>();
assert_eq!(vec![0, 1, 2], seq_nums);

// We continue getting broadcasts because we haven't committed the txns
for _ in 0..2 {
let (txns, _) = harness.broadcast_txns(vfn, NetworkId::Vfn, 1, false, true, false);
let seq_nums = txns
.iter()
.map(|txn| txn.sequence_number())
.collect::<Vec<_>>();
assert_eq!(vec![3, 4, 5], seq_nums);
}

// Test getting out of rebroadcasting mode: checking we can move past rebroadcasting after receiving non-retry ACK.
// Remove some txns, which should free space for more
harness.commit_txns(val, all_txns[..1].to_vec());

// Send retry batch again, this time it should be processed
harness.broadcast_txns_successfully(vfn, NetworkId::Vfn, 1);

// Retry batch sent above should be processed successfully, and FN should move on to broadcasting later txns
let (txns, _) = harness.broadcast_txns(vfn, NetworkId::Vfn, 1, false, true, false);
let seq_nums = txns
.iter()
.map(|txn| txn.sequence_number())
.collect::<Vec<_>>();
assert_eq!(vec![6, 7], seq_nums);
}

#[test]
fn test_rebroadcast_missing_ack() {
let (mut harness, validators) =
TestHarness::bootstrap_validator_network(2, Some(MempoolOverrideConfig::new()));
let (v_a, v_b) = (validators.get(0).unwrap(), validators.get(1).unwrap());
let pool_txns = test_transactions(0, 3);
harness.add_txns(v_a, pool_txns);

// A and B discover each other
harness.connect(v_b, v_a);

// Test that txn broadcasts that don't receive an ACK, A rebroadcasts the unACK'ed batch of txns
for _ in 0..3 {
let (txns, _) = harness.broadcast_txns(v_a, NetworkId::Validator, 1, true, false, false);
assert_eq!(0, txns.get(0).unwrap().sequence_number());
}

// Getting out of rebroadcasting mode scenario 1: B sends back ACK eventually
let (txns, _) = harness.broadcast_txns(v_a, NetworkId::Validator, 1, true, true, false);
assert_eq!(0, txns.get(0).unwrap().sequence_number());

for _ in 0..3 {
let (txns, _) = harness.broadcast_txns(v_a, NetworkId::Validator, 1, true, false, false);
assert_eq!(1, txns.get(0).unwrap().sequence_number());
}

// Getting out of rebroadcasting mode scenario 2: txns in unACK'ed batch gets committed
harness.commit_txns(v_a, vec![test_transaction(1)]);

let (txns, _) = harness.broadcast_txns(v_a, NetworkId::Validator, 1, true, false, false);
assert_eq!(2, txns.get(0).unwrap().sequence_number());
}

#[test]
fn test_max_broadcast_limit() {
let mut validator_mempool_config = MempoolOverrideConfig::new();
Expand Down
Loading

0 comments on commit 167b693

Please sign in to comment.