forked from zeromq/zmq.rs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbackend.rs
125 lines (113 loc) · 4.05 KB
/
backend.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use crate::codec::{FramedIo, Message, ZmqFramedRead, ZmqFramedWrite};
use crate::fair_queue::QueueInner;
use crate::util::PeerIdentity;
use crate::{
MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqError, ZmqResult,
};
use async_trait::async_trait;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures_channel::mpsc;
use futures_util::SinkExt;
use parking_lot::Mutex;
use std::sync::Arc;
/// State the backend keeps for a single connected peer.
pub(crate) struct Peer {
/// Write half of the peer's framed connection; outgoing messages are sent through it.
pub(crate) send_queue: ZmqFramedWrite,
}
/// Backend shared by sockets that talk to several peers: it tracks each
/// peer's write half, an optional fair queue of read halves, and the order in
/// which peers are tried when sending round-robin.
pub(crate) struct GenericSocketBackend {
/// Connected peers keyed by identity; each entry owns that peer's write half.
pub(crate) peers: DashMap<PeerIdentity, Peer>,
/// Shared fair-queue state in which a peer's read half is registered on
/// connect and unregistered on disconnect. `None` means no registration
/// takes place.
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
/// Peer ids in the order `send_round_robin` will try them. May still contain
/// ids of peers that have already disconnected.
pub(crate) round_robin: SegQueue<PeerIdentity>,
/// Socket type reported by `SocketBackend::socket_type`.
socket_type: SocketType,
/// Options reported by `SocketBackend::socket_options`.
socket_options: SocketOptions,
/// Optional sender of `SocketEvent`s, exposed through `SocketBackend::monitor`.
/// Starts out as `None`.
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}
impl GenericSocketBackend {
    /// Builds a backend that has no connected peers yet.
    ///
    /// `fair_queue_inner` is the optional shared queue state in which the read
    /// half of every connecting peer gets registered. `socket_type` and
    /// `options` are stored unchanged. The socket monitor starts out unset.
    pub(crate) fn with_options(
        fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
        socket_type: SocketType,
        options: SocketOptions,
    ) -> Self {
        let peers = DashMap::new();
        let round_robin = SegQueue::new();
        let socket_monitor = Mutex::new(None);
        Self {
            socket_type,
            socket_options: options,
            fair_queue_inner,
            peers,
            round_robin,
            socket_monitor,
        }
    }

    /// Sends `message` to the next live peer in the round-robin queue.
    ///
    /// On success the chosen peer's id is pushed to the back of the queue and
    /// returned to the caller.
    ///
    /// # Errors
    ///
    /// * `ZmqError::ReturnToSender`, carrying the message back, when the queue
    ///   runs out of ids before a connected peer is found.
    /// * The write error converted into `ZmqError` when sending fails. The
    ///   peer is removed through `peer_disconnected` before returning and its
    ///   id is not re-queued.
    ///
    /// # Panics
    ///
    /// Panics when no connected peer is found and `message` is a
    /// `Message::Greeting` or `Message::Command`.
    pub(crate) async fn send_round_robin(&self, message: Message) -> ZmqResult<PeerIdentity> {
        // Normally the first id popped belongs to a live peer and the loop body
        // runs once. `SegQueue` offers no way to delete an arbitrary element,
        // so ids of peers that disconnected earlier can still be queued; those
        // are popped and discarded here because `peers` no longer knows them.
        while let Some(candidate) = self.round_robin.pop() {
            let mut entry = match self.peers.get_mut(&candidate) {
                Some(entry) => entry,
                None => continue,
            };
            // NOTE(review): the map entry stays borrowed across this `.await`.
            // DashMap warns that other map calls may deadlock while a
            // reference is held — confirm nothing on the same thread touches
            // `peers` while a send is pending.
            let outcome = entry.send_queue.send(message).await;
            // Let go of the map entry before `peers` is touched again below.
            drop(entry);

            return match outcome {
                Err(e) => {
                    self.peer_disconnected(&candidate);
                    Err(e.into())
                }
                Ok(()) => {
                    self.round_robin.push(candidate.clone());
                    Ok(candidate)
                }
            };
        }

        // The queue ran dry without yielding a connected peer.
        match message {
            Message::Greeting(_) => panic!("Sending greeting is not supported"),
            Message::Command(_) => panic!("Sending commands is not supported"),
            Message::Message(m) => Err(ZmqError::ReturnToSender {
                reason: "Not connected to peers. Unable to send messages",
                message: m,
            }),
        }
    }
}
// Exposes the backend's stored configuration, monitor slot and shutdown hook
// through the `SocketBackend` trait.
impl SocketBackend for GenericSocketBackend {
/// Returns the socket type this backend was constructed with.
fn socket_type(&self) -> SocketType {
self.socket_type
}
/// Returns the options this backend was constructed with.
fn socket_options(&self) -> &SocketOptions {
&self.socket_options
}
/// Removes every entry from the peer map.
///
/// NOTE(review): ids left in `round_robin` and read halves registered in
/// `fair_queue_inner` are not touched here — confirm that is intended.
fn shutdown(&self) {
self.peers.clear();
}
/// Returns the mutex-guarded slot holding the optional socket-event sender.
fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}
#[async_trait]
impl MultiPeerBackend for GenericSocketBackend {
    /// Registers a newly connected peer.
    ///
    /// The framed connection is split in two. The write half is stored in
    /// `peers` under `peer_id`, then the id is appended to the round-robin
    /// queue. Finally the read half is handed to the fair queue when one is
    /// configured; otherwise it is dropped when this function returns.
    async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
        let (reader, writer) = io.into_parts();
        let peer = Peer { send_queue: writer };
        self.peers.insert(peer_id.clone(), peer);
        self.round_robin.push(peer_id.clone());
        if let Some(queue) = self.fair_queue_inner.as_ref() {
            queue.lock().insert(peer_id.clone(), reader);
        }
    }

    /// Forgets a peer: removes its entry from `peers` and, when a fair queue
    /// is configured, removes its read half from that queue as well.
    ///
    /// The id is not taken out of the round-robin queue here.
    fn peer_disconnected(&self, peer_id: &PeerIdentity) {
        self.peers.remove(peer_id);
        if let Some(queue) = self.fair_queue_inner.as_ref() {
            queue.lock().remove(peer_id);
        }
    }
}