From 89a2f7f16dc8f3a705490e9eab388af157346975 Mon Sep 17 00:00:00 2001 From: ryardley Date: Sat, 21 Dec 2024 14:49:34 +1100 Subject: [PATCH] Ensure tests work with changes to NetworkManager --- .../tests/test_aggregation_and_decryption.rs | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs index 2464eb50..b2011ed5 100644 --- a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs +++ b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs @@ -8,7 +8,7 @@ use enclave_core::{ }; use fhe::{setup_crp_params, ParamsWithCrp, SharedRng}; use logger::SimpleLogger; -use net::NetworkManager; +use net::{correlation_id::CorrelationId, events::NetworkPeerEvent, NetworkManager}; use router::{ CiphernodeSelector, E3RequestRouter, FheFeature, KeyshareFeature, PlaintextAggregatorFeature, PublicKeyAggregatorFeature, RepositoriesFactory, @@ -27,8 +27,8 @@ use rand::Rng; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; use std::{env, path::Path, sync::Arc, time::Duration}; -use tokio::sync::Mutex; -use tokio::{sync::mpsc::channel, time::sleep}; +use tokio::sync::{broadcast, Mutex}; +use tokio::{sync::mpsc, time::sleep}; // Simulating a local node type LocalCiphernodeTuple = ( @@ -465,24 +465,35 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { #[actix::test] async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { // Setup elements in test - let (tx, mut output) = channel(100); // Transmit byte events to the network - let (input, rx) = channel(100); // Receive byte events from the network + let (cmd_tx, mut cmd_rx) = mpsc::channel(100); // Transmit byte events to the network + let (event_tx, _) = broadcast::channel(100); // Receive byte events from the network let bus = EventBus::new(true).start(); - NetworkManager::setup(bus.clone(), tx.clone(), rx); + let event_rx = event_tx.subscribe(); + // Pas cmd and event channels to NetworkManager + NetworkManager::setup(bus.clone(), cmd_tx.clone(), event_rx, "my-topic"); // Capture messages from output on msgs vec let msgs: Arc>>> = Arc::new(Mutex::new(Vec::new())); let msgs_loop = msgs.clone(); tokio::spawn(async move { - while let Some(msg) = output.recv().await { - msgs_loop.lock().await.push(msg.clone()); - let _ = input.send(msg).await; - // loopback to simulate a rebroadcast message + // Pull events from command channel + while let Some(cmd) = cmd_rx.recv().await { + // If the command is a GossipPublish then extract it and save it whilst sending it to + // the event bus as if it was gossiped from the network and ended up as an external + // message this simulates a rebroadcast message + if let Some(msg) = match cmd { + net::events::NetworkPeerCommand::GossipPublish { data, .. } => Some(data), + _ => None, + } { + msgs_loop.lock().await.push(msg.clone()); + event_tx.send(NetworkPeerEvent::GossipData(msg))?; + } // if this manages to broadcast an event to the // event bus we will expect to see an extra event on - // the bus + // the bus but we don't because we handle this } + anyhow::Ok(()) }); let evt_1 = EnclaveEvent::from(PlaintextAggregated { @@ -532,10 +543,10 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { let seed = Seed(ChaCha20Rng::seed_from_u64(123).get_seed()); // Setup elements in test - let (tx, _) = channel(100); // Transmit byte events to the network - let (input, rx) = channel(100); // Receive byte events from the network + let (cmd_tx, _) = mpsc::channel(100); // Transmit byte events to the network + let (event_tx, event_rx) = broadcast::channel(100); // Receive byte events from the network let bus = EventBus::new(true).start(); - NetworkManager::setup(bus.clone(), tx.clone(), rx); + NetworkManager::setup(bus.clone(), cmd_tx.clone(), event_rx, "mytopic"); // Capture messages from output on msgs vec let event = EnclaveEvent::from(E3Requested { @@ -547,7 +558,7 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { }); // lets send an event from the network - let _ = input.send(event.to_bytes()?).await; + let _ = event_tx.send(NetworkPeerEvent::GossipData(event.to_bytes()?)); sleep(Duration::from_millis(1)).await; // need to push to next tick