Skip to content

Commit

Permalink
Merge pull request zeromq#193 from zeromq/wrap-tests-in-cfg-test
Browse files Browse the repository at this point in the history
enable async dispatcher macros only for tests
  • Loading branch information
rgbkrk authored May 31, 2024
2 parents a0ef148 + 3f74ee4 commit 01355be
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 382 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all --all-targets --no-default-features --features async-std-runtime,all-transport -- --deny warnings
args: --all --all-targets --no-default-features --features async-std-runtime,all-transport,async-dispatcher-macros -- --deny warnings

test:
name: Test
Expand Down Expand Up @@ -63,7 +63,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all --no-default-features --features async-dispatcher-runtime,all-transport
args: --all --no-default-features --features async-dispatcher-runtime,all-transport,async-dispatcher-macros

fmt:
name: Formatting
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ default = ["tokio-runtime", "all-transport"]
tokio-runtime = ["tokio", "tokio-util"]
async-std-runtime = ["async-std"]
async-dispatcher-runtime = ["async-std", "async-dispatcher"]
async-dispatcher-macros = ["async-dispatcher/macros"]
all-transport = ["ipc-transport", "tcp-transport"]
ipc-transport = []
tcp-transport = []
Expand Down
5 changes: 4 additions & 1 deletion src/async_rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,8 @@ extern crate async_std;
#[cfg(feature = "async-std-runtime")]
pub use async_std::{main, test};

#[cfg(feature = "async-dispatcher-runtime")]
#[cfg(all(
feature = "async-dispatcher-runtime",
feature = "async-dispatcher-macros"
))]
pub use async_dispatcher::{main, test};
83 changes: 43 additions & 40 deletions tests/message.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,48 @@
use bytes::Bytes;
use std::collections::vec_deque::VecDeque;
use std::convert::TryFrom;
use zeromq::ZmqMessage;
#[cfg(test)]
mod test {
use bytes::Bytes;
use std::collections::vec_deque::VecDeque;
use std::convert::TryFrom;
use zeromq::ZmqMessage;

#[test]
fn test_split_off() {
let mut frames = VecDeque::with_capacity(5);
frames.push_back(Bytes::from("id1"));
frames.push_back(Bytes::from("id2"));
frames.push_back(Bytes::from(""));
frames.push_back(Bytes::from("data1"));
frames.push_back(Bytes::from("data2"));
let mut m = ZmqMessage::try_from(frames).unwrap();
let data = m.split_off(3);
assert_eq!(m.len(), 3);
assert_eq!(m.get(0), Some(&Bytes::from("id1")));
assert_eq!(m.get(1), Some(&Bytes::from("id2")));
assert_eq!(m.get(2), Some(&Bytes::from("")));
assert_eq!(data.len(), 2);
assert_eq!(data.get(0), Some(&Bytes::from("data1")));
assert_eq!(data.get(1), Some(&Bytes::from("data2")));
}
#[test]
fn test_split_off() {
let mut frames = VecDeque::with_capacity(5);
frames.push_back(Bytes::from("id1"));
frames.push_back(Bytes::from("id2"));
frames.push_back(Bytes::from(""));
frames.push_back(Bytes::from("data1"));
frames.push_back(Bytes::from("data2"));
let mut m = ZmqMessage::try_from(frames).unwrap();
let data = m.split_off(3);
assert_eq!(m.len(), 3);
assert_eq!(m.get(0), Some(&Bytes::from("id1")));
assert_eq!(m.get(1), Some(&Bytes::from("id2")));
assert_eq!(m.get(2), Some(&Bytes::from("")));
assert_eq!(data.len(), 2);
assert_eq!(data.get(0), Some(&Bytes::from("data1")));
assert_eq!(data.get(1), Some(&Bytes::from("data2")));
}

#[test]
fn test_prepend() {
let mut frames = VecDeque::with_capacity(2);
frames.push_back(Bytes::from("data1"));
frames.push_back(Bytes::from("data2"));
let mut m = ZmqMessage::try_from(frames).unwrap();
#[test]
fn test_prepend() {
let mut frames = VecDeque::with_capacity(2);
frames.push_back(Bytes::from("data1"));
frames.push_back(Bytes::from("data2"));
let mut m = ZmqMessage::try_from(frames).unwrap();

let mut envelope_frames = VecDeque::with_capacity(3);
envelope_frames.push_back(Bytes::from("id1"));
envelope_frames.push_back(Bytes::from("id2"));
envelope_frames.push_back(Bytes::from(""));
let envelope = ZmqMessage::try_from(envelope_frames).unwrap();
let mut envelope_frames = VecDeque::with_capacity(3);
envelope_frames.push_back(Bytes::from("id1"));
envelope_frames.push_back(Bytes::from("id2"));
envelope_frames.push_back(Bytes::from(""));
let envelope = ZmqMessage::try_from(envelope_frames).unwrap();

m.prepend(&envelope);
assert_eq!(m.len(), 5);
assert_eq!(m.get(0), Some(&Bytes::from("id1")));
assert_eq!(m.get(1), Some(&Bytes::from("id2")));
assert_eq!(m.get(2), Some(&Bytes::from("")));
assert_eq!(m.get(3), Some(&Bytes::from("data1")));
assert_eq!(m.get(4), Some(&Bytes::from("data2")));
m.prepend(&envelope);
assert_eq!(m.len(), 5);
assert_eq!(m.get(0), Some(&Bytes::from("id1")));
assert_eq!(m.get(1), Some(&Bytes::from("id2")));
assert_eq!(m.get(2), Some(&Bytes::from("")));
assert_eq!(m.get(3), Some(&Bytes::from("data1")));
assert_eq!(m.get(4), Some(&Bytes::from("data2")));
}
}
177 changes: 90 additions & 87 deletions tests/pub_sub.rs
Original file line number Diff line number Diff line change
@@ -1,104 +1,107 @@
use zeromq::prelude::*;
use zeromq::Endpoint;
use zeromq::ZmqMessage;
use zeromq::__async_rt as async_rt;
#[cfg(test)]
mod test {
use zeromq::prelude::*;
use zeromq::Endpoint;
use zeromq::ZmqMessage;
use zeromq::__async_rt as async_rt;

use futures_channel::{mpsc, oneshot};
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
use futures_channel::{mpsc, oneshot};
use futures_util::{SinkExt, StreamExt};
use std::time::Duration;

#[async_rt::test]
async fn test_pub_sub_sockets() {
pretty_env_logger::try_init().ok();
#[async_rt::test]
async fn test_pub_sub_sockets() {
pretty_env_logger::try_init().ok();

async fn helper(bind_addr: &'static str) {
// We will join on these at the end to determine if any tasks we spawned
// panicked
let mut task_handles = Vec::new();
let payload = chrono::Utc::now().to_rfc2822();
async fn helper(bind_addr: &'static str) {
// We will join on these at the end to determine if any tasks we spawned
// panicked
let mut task_handles = Vec::new();
let payload = chrono::Utc::now().to_rfc2822();

let cloned_payload = payload.clone();
let (server_stop_sender, mut server_stop) = oneshot::channel::<()>();
let (has_bound_sender, has_bound) = oneshot::channel::<Endpoint>();
task_handles.push(async_rt::task::spawn(async move {
let mut pub_socket = zeromq::PubSocket::new();
let bound_to = pub_socket
.bind(bind_addr)
.await
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e));
has_bound_sender
.send(bound_to)
.expect("channel was dropped");
let cloned_payload = payload.clone();
let (server_stop_sender, mut server_stop) = oneshot::channel::<()>();
let (has_bound_sender, has_bound) = oneshot::channel::<Endpoint>();
task_handles.push(async_rt::task::spawn(async move {
let mut pub_socket = zeromq::PubSocket::new();
let bound_to = pub_socket
.bind(bind_addr)
.await
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e));
has_bound_sender
.send(bound_to)
.expect("channel was dropped");

loop {
if let Ok(Some(_)) = server_stop.try_recv() {
break;
loop {
if let Ok(Some(_)) = server_stop.try_recv() {
break;
}

let s: String = cloned_payload.clone();
let m = ZmqMessage::from(s);
pub_socket.send(m).await.expect("Failed to send");
async_rt::task::sleep(Duration::from_millis(1)).await;
}

let s: String = cloned_payload.clone();
let m = ZmqMessage::from(s);
pub_socket.send(m).await.expect("Failed to send");
async_rt::task::sleep(Duration::from_millis(1)).await;
let errs = pub_socket.close().await;
if !errs.is_empty() {
panic!("Could not unbind socket: {:?}", errs);
}
}));
// Block until the pub has finished binding
// TODO: ZMQ sockets should not care about this sort of ordering.
// See https://github.com/zeromq/zmq.rs/issues/73
let bound_addr = has_bound.await.expect("channel was cancelled");
if let Endpoint::Tcp(_host, port) = bound_addr.clone() {
assert_ne!(port, 0);
}

let errs = pub_socket.close().await;
if !errs.is_empty() {
panic!("Could not unbind socket: {:?}", errs);
}
}));
// Block until the pub has finished binding
// TODO: ZMQ sockets should not care about this sort of ordering.
// See https://github.com/zeromq/zmq.rs/issues/73
let bound_addr = has_bound.await.expect("channel was cancelled");
if let Endpoint::Tcp(_host, port) = bound_addr.clone() {
assert_ne!(port, 0);
}
let (sub_results_sender, sub_results) = mpsc::channel(100);
for _ in 0..10 {
let mut cloned_sub_sender = sub_results_sender.clone();
let cloned_payload = payload.clone();
let cloned_bound_addr = bound_addr.to_string();
task_handles.push(async_rt::task::spawn(async move {
let mut sub_socket = zeromq::SubSocket::new();
sub_socket
.connect(&cloned_bound_addr)
.await
.unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr));

let (sub_results_sender, sub_results) = mpsc::channel(100);
for _ in 0..10 {
let mut cloned_sub_sender = sub_results_sender.clone();
let cloned_payload = payload.clone();
let cloned_bound_addr = bound_addr.to_string();
task_handles.push(async_rt::task::spawn(async move {
let mut sub_socket = zeromq::SubSocket::new();
sub_socket
.connect(&cloned_bound_addr)
.await
.unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr));
sub_socket.subscribe("").await.expect("Failed to subscribe");

sub_socket.subscribe("").await.expect("Failed to subscribe");
async_rt::task::sleep(std::time::Duration::from_millis(500)).await;

async_rt::task::sleep(std::time::Duration::from_millis(500)).await;
for _ in 0..10 {
let recv_message = sub_socket.recv().await.unwrap();
let recv_payload =
String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap();
assert_eq!(cloned_payload, recv_payload);
cloned_sub_sender.send(()).await.unwrap();
}
}));
}
drop(sub_results_sender);
let res_vec: Vec<()> = sub_results.collect().await;
assert_eq!(100, res_vec.len());

for _ in 0..10 {
let recv_message = sub_socket.recv().await.unwrap();
let recv_payload =
String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap();
assert_eq!(cloned_payload, recv_payload);
cloned_sub_sender.send(()).await.unwrap();
}
}));
server_stop_sender.send(()).unwrap();
for t in task_handles {
t.await.expect("Task failed unexpectedly!");
}
}
drop(sub_results_sender);
let res_vec: Vec<()> = sub_results.collect().await;
assert_eq!(100, res_vec.len());

server_stop_sender.send(()).unwrap();
for t in task_handles {
t.await.expect("Task failed unexpectedly!");
}
let addrs = vec![
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
"tcp://127.0.0.1:0",
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
"ipc://asdf.sock",
"ipc://anothersocket-asdf",
];
futures_util::future::join_all(addrs.into_iter().map(helper)).await;
}

let addrs = vec![
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
"tcp://127.0.0.1:0",
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
"ipc://asdf.sock",
"ipc://anothersocket-asdf",
];
futures_util::future::join_all(addrs.into_iter().map(helper)).await;
}
Loading

0 comments on commit 01355be

Please sign in to comment.