forked from zeromq/zmq.rs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpub_sub.rs
104 lines (91 loc) · 3.79 KB
/
pub_sub.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use zeromq::prelude::*;
use zeromq::Endpoint;
use zeromq::ZmqMessage;
use zeromq::__async_rt as async_rt;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use std::time::Duration;
#[async_rt::test]
async fn test_pub_sub_sockets() {
pretty_env_logger::try_init().ok();
async fn helper(bind_addr: &'static str) {
// We will join on these at the end to determine if any tasks we spawned
// panicked
let mut task_handles = Vec::new();
let payload = chrono::Utc::now().to_rfc2822();
let cloned_payload = payload.clone();
let (server_stop_sender, mut server_stop) = oneshot::channel::<()>();
let (has_bound_sender, has_bound) = oneshot::channel::<Endpoint>();
task_handles.push(async_rt::task::spawn(async move {
let mut pub_socket = zeromq::PubSocket::new();
let bound_to = pub_socket
.bind(bind_addr)
.await
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e));
has_bound_sender
.send(bound_to)
.expect("channel was dropped");
loop {
if let Ok(Some(_)) = server_stop.try_recv() {
break;
}
let s: String = cloned_payload.clone();
let m = ZmqMessage::from(s);
pub_socket.send(m).await.expect("Failed to send");
async_rt::task::sleep(Duration::from_millis(1)).await;
}
let errs = pub_socket.close().await;
if !errs.is_empty() {
panic!("Could not unbind socket: {:?}", errs);
}
}));
// Block until the pub has finished binding
// TODO: ZMQ sockets should not care about this sort of ordering.
// See https://github.com/zeromq/zmq.rs/issues/73
let bound_addr = has_bound.await.expect("channel was cancelled");
if let Endpoint::Tcp(_host, port) = bound_addr.clone() {
assert_ne!(port, 0);
}
let (sub_results_sender, sub_results) = mpsc::channel(100);
for _ in 0..10 {
let mut cloned_sub_sender = sub_results_sender.clone();
let cloned_payload = payload.clone();
let cloned_bound_addr = bound_addr.to_string();
task_handles.push(async_rt::task::spawn(async move {
let mut sub_socket = zeromq::SubSocket::new();
sub_socket
.connect(&cloned_bound_addr)
.await
.unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr));
sub_socket.subscribe("").await.expect("Failed to subscribe");
async_rt::task::sleep(std::time::Duration::from_millis(500)).await;
for _ in 0..10 {
let recv_message = sub_socket.recv().await.unwrap();
let recv_payload =
String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap();
assert_eq!(cloned_payload, recv_payload);
cloned_sub_sender.send(()).await.unwrap();
}
}));
}
drop(sub_results_sender);
let res_vec: Vec<()> = sub_results.collect().await;
assert_eq!(100, res_vec.len());
server_stop_sender.send(()).unwrap();
for t in task_handles {
t.await.expect("Task failed unexpectedly!");
}
}
let addrs = vec![
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
"tcp://127.0.0.1:0",
"tcp://localhost:0",
"tcp://127.0.0.1:0",
"tcp://[::1]:0",
"ipc://asdf.sock",
"ipc://anothersocket-asdf",
];
futures::future::join_all(addrs.into_iter().map(helper)).await;
}