Skip to content

Commit

Permalink
Fixed zeromq#21, allow socket objects be sent to different tasks
Browse files Browse the repository at this point in the history
fantix committed Jul 6, 2014
1 parent e569c9c commit f7f889b
Showing 7 changed files with 131 additions and 93 deletions.
57 changes: 9 additions & 48 deletions src/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,33 @@
use consts;
use socket_base::{SocketBase, SocketMessage, OnConnected};
use inproc::InprocManager;
use socket_base::SocketBase;
use rep::RepSocket;
use req::ReqSocket;
use result::ZmqResult;

use std::collections::HashMap;


pub struct Context {
inproc_binders: HashMap<String, Sender<ZmqResult<SocketMessage>>>,
inproc_connecters: HashMap<String, Vec<Sender<ZmqResult<SocketMessage>>>>,
inproc_mgr: InprocManager,
}

impl Context {
pub fn new() -> Context {
Context {
inproc_binders: HashMap::new(),
inproc_connecters: HashMap::new(),
inproc_mgr: InprocManager::new(),
}
}

pub fn socket(&mut self, type_: consts::SocketType) -> Box<SocketBase> {
pub fn socket(&self, type_: consts::SocketType) -> Box<SocketBase + Send> {
match type_ {
consts::REQ => {
let ret: ReqSocket = SocketBase::new(self);
box ret as Box<SocketBase>
let ret: ReqSocket = SocketBase::new(self.inproc_mgr.chan());
box ret as Box<SocketBase + Send>
},
consts::REP => {
let ret: RepSocket = SocketBase::new(self);
box ret as Box<SocketBase>
let ret: RepSocket = SocketBase::new(self.inproc_mgr.chan());
box ret as Box<SocketBase + Send>
},
}
}

pub fn _bind_inproc(&mut self, address: &str, tx: Sender<ZmqResult<SocketMessage>>) {
let key = String::from_str(address);

if self.inproc_binders.contains_key(&key) {
// TODO: return error
fail!();
}

if self.inproc_connecters.contains_key(&key) {
let connecters = self.inproc_connecters.get(&key);
for connecter_tx in connecters.iter() {
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
connecter_tx.send(Ok(OnConnected(tx1, rx2)));
tx.send(Ok(OnConnected(tx2, rx1)));
}
}

self.inproc_binders.insert(key.clone(), tx);
}

pub fn _connect_inproc(&mut self, address: &str, tx: Sender<ZmqResult<SocketMessage>>) {
let key = String::from_str(address);

if self.inproc_binders.contains_key(&key) {
let binder_tx = self.inproc_binders.get(&key);
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
binder_tx.send(Ok(OnConnected(tx1, rx2)));
tx.send(Ok(OnConnected(tx2, rx1)));
}

self.inproc_connecters.find_or_insert(key.clone(), vec!()).push(tx);
}
}


83 changes: 83 additions & 0 deletions src/inproc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use result::ZmqResult;
use socket_base::{SocketMessage, OnConnected};

use std::collections::HashMap;


pub enum InprocCommand {
DoBind(String, Sender<ZmqResult<SocketMessage>>),
DoConnect(String, Sender<ZmqResult<SocketMessage>>),
}


struct InprocManagerTask {
chan: Receiver<InprocCommand>,
inproc_binders: HashMap<String, Sender<ZmqResult<SocketMessage>>>,
inproc_connecters: HashMap<String, Vec<Sender<ZmqResult<SocketMessage>>>>,
}

impl InprocManagerTask {
fn run(&mut self) {
loop {
match self.chan.recv_opt() {
Ok(DoBind(key, tx)) => {
if self.inproc_binders.contains_key(&key) {
// TODO: return error
fail!("Key already exist: {}", key);
}

if self.inproc_connecters.contains_key(&key) {
let connecters = self.inproc_connecters.get(&key);
for connecter_tx in connecters.iter() {
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
connecter_tx.send(Ok(OnConnected(tx1, rx2)));
tx.send(Ok(OnConnected(tx2, rx1)));
}
}

self.inproc_binders.insert(key.clone(), tx);
},
Ok(DoConnect(key, tx)) => {
if self.inproc_binders.contains_key(&key) {
let binder_tx = self.inproc_binders.get(&key);
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
binder_tx.send(Ok(OnConnected(tx1, rx2)));
tx.send(Ok(OnConnected(tx2, rx1)));
}

self.inproc_connecters.find_or_insert(key.clone(), vec!()).push(tx);
},
_ => break,
}
}
}
}


pub struct InprocManager {
chan: Sender<InprocCommand>,
}

impl InprocManager {
pub fn new() -> InprocManager {
let (tx, rx) = channel();

spawn(proc() {
InprocManagerTask {
chan: rx,
inproc_binders: HashMap::new(),
inproc_connecters: HashMap::new(),
}.run();
});

InprocManager {
chan: tx,
}
}

pub fn chan(&self) -> Sender<InprocCommand> {
self.chan.clone()
}
}
5 changes: 4 additions & 1 deletion src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use inproc::InprocCommand;
use msg::Msg;
use options::Options;
use result::ZmqResult;
@@ -29,17 +30,19 @@ pub struct PeerManager {
rx: Receiver<ZmqResult<SocketMessage>>,
pub peers: HashMap<uint, Peer>,
ids: Vec<uint>,
pub inproc_chan: Sender<InprocCommand>,
}

impl PeerManager {
pub fn new() -> PeerManager {
pub fn new(chan: Sender<InprocCommand>) -> PeerManager {
let (tx, rx) = channel();
PeerManager {
options: Arc::new(RWLock::new(Options::new())),
tx: tx,
rx: rx,
peers: HashMap::new(),
ids: Vec::new(),
inproc_chan: chan,
}
}

18 changes: 6 additions & 12 deletions src/rep.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use consts;
use ctx::Context;
use inproc::InprocCommand;
use msg;
use msg::Msg;
use peer::PeerManager;
@@ -14,18 +14,16 @@ enum State {
}


pub struct RepSocket<'s> {
ctx: &'s mut Context,
pub struct RepSocket {
pm: PeerManager,
state: State,
last_identity: uint,
}

impl<'s> SocketBase<'s> for RepSocket<'s> {
fn new(ctx: &'s mut Context) -> RepSocket<'s> {
impl SocketBase for RepSocket {
fn new(chan: Sender<InprocCommand>) -> RepSocket {
RepSocket {
ctx: ctx,
pm: PeerManager::new(),
pm: PeerManager::new(chan),
state: Initial,
last_identity: 0,
}.init(consts::REP)
@@ -39,10 +37,6 @@ impl<'s> SocketBase<'s> for RepSocket<'s> {
&mut self.pm
}

fn ctx<'a>(&'a mut self) -> &'a mut Context {
&mut *self.ctx
}

fn msg_recv(&mut self) -> ZmqResult<Box<Msg>> {
let (id, ret) = match self.state {
Initial => self.pm.recv_first(),
@@ -84,7 +78,7 @@ mod test {

#[test]
fn test_fsm() {
let mut ctx = Context::new();
let ctx = Context::new();
let mut s = ctx.socket(consts::REP);
let msg = box Msg::new(1);
assert_eq!(s.msg_send(msg).unwrap_err().code, consts::EFSM);
18 changes: 6 additions & 12 deletions src/req.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use consts;
use ctx::Context;
use inproc::InprocCommand;
use msg;
use msg::Msg;
use peer::PeerManager;
@@ -14,19 +14,17 @@ enum State {
}


pub struct ReqSocket<'s> {
ctx: &'s mut Context,
pub struct ReqSocket {
pm: PeerManager,
state: State,
last_identity: uint,
send_count: uint,
}

impl<'s> SocketBase<'s> for ReqSocket<'s> {
fn new(ctx: &'s mut Context) -> ReqSocket<'s> {
impl SocketBase for ReqSocket {
fn new(chan: Sender<InprocCommand>) -> ReqSocket {
ReqSocket {
ctx: ctx,
pm: PeerManager::new(),
pm: PeerManager::new(chan),
state: Initial,
last_identity: 0,
send_count: 0,
@@ -41,10 +39,6 @@ impl<'s> SocketBase<'s> for ReqSocket<'s> {
&mut self.pm
}

fn ctx<'a>(&'a mut self) -> &'a mut Context {
&mut *self.ctx
}

fn msg_recv(&mut self) -> ZmqResult<Box<Msg>> {
let ret = match self.state {
Receiving => {
@@ -91,7 +85,7 @@ mod test {

#[test]
fn test_fsm() {
let mut ctx = Context::new();
let ctx = Context::new();
let mut s = ctx.socket(consts::REQ);
assert_eq!(s.msg_recv().unwrap_err().code, consts::EFSM);
}
12 changes: 5 additions & 7 deletions src/socket_base.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use consts;
use ctx::Context;
use inproc::{InprocCommand, DoBind, DoConnect};
use msg::Msg;
use peer::PeerManager;
use result::{ZmqError, ZmqResult};
@@ -17,15 +17,13 @@ pub enum SocketMessage {
}


pub trait SocketBase<'s> {
fn new(ctx: &'s mut Context) -> Self;
pub trait SocketBase {
fn new(chan: Sender<InprocCommand>) -> Self;

fn pm<'a>(&'a self) -> &'a PeerManager;

fn pmut<'a>(&'a mut self) -> &'a mut PeerManager;

fn ctx<'a>(&'a mut self) -> &'a mut Context;

fn init(self, type_: consts::SocketType) -> Self {
self.pm().options.write().type_ = type_ as int;
self
@@ -51,7 +49,7 @@ pub trait SocketBase<'s> {
},
"inproc" => {
let tx = self.pm().tx.clone();
self.ctx()._bind_inproc(address, tx);
self.pm().inproc_chan.send(DoBind(String::from_str(address), tx));
Ok(())
},
_ => Err(ZmqError::new(consts::EPROTONOSUPPORT, "Protocol not supported")),
@@ -74,7 +72,7 @@ pub trait SocketBase<'s> {
},
"inproc" => {
let tx = self.pm().tx.clone();
self.ctx()._connect_inproc(address, tx);
self.pm().inproc_chan.send(DoConnect(String::from_str(address), tx));
Ok(())
},
_ => Err(ZmqError::new(consts::EPROTONOSUPPORT, "Protocol not supported")),
31 changes: 18 additions & 13 deletions src/zmq.rs
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ pub use socket_base::SocketBase;

mod ctx;
mod consts;
mod inproc;
mod msg;
mod peer;
mod rep;
@@ -41,14 +42,14 @@ mod test {

#[test]
fn test_socket_create() {
let mut c = super::Context::new();
let c = super::Context::new();
let s = c.socket(super::REQ);
assert_eq!(s.getsockopt(super::TYPE), super::REQ as int);
}

#[test]
fn test_socket_bind() {
let mut c = super::Context::new();
let c = super::Context::new();
let mut s = c.socket(super::REQ);
assert_eq!(s.bind("").unwrap_err().code, super::EINVAL);
assert_eq!(s.bind("://127").unwrap_err().code, super::EINVAL);
@@ -62,7 +63,7 @@ mod test {

#[test]
fn test_socket_connect() {
let mut c = super::Context::new();
let c = super::Context::new();
let mut s = c.socket(super::REQ);
assert_eq!(s.connect("").unwrap_err().code, super::EINVAL);
assert_eq!(s.connect("://127").unwrap_err().code, super::EINVAL);
@@ -75,7 +76,7 @@ mod test {

#[test]
fn test_socket_small_message() {
let mut c = super::Context::new();
let c = super::Context::new();
let mut req = c.socket(super::REQ);
let mut rep = c.socket(super::REP);
assert!(rep.bind("tcp://127.0.0.1:12347").is_ok());
@@ -90,17 +91,21 @@ mod test {
}

#[test]
fn test_inproc() {
let mut c = super::Context::new();
let mut req = c.socket(super::REQ);
let mut rep = c.socket(super::REP);
assert!(req.connect("inproc://#1").is_ok());
assert!(rep.bind("inproc://#1").is_ok());
fn test_inproc_and_moved_socket() {
let c = super::Context::new();
let req = c.socket(super::REQ);

let mut msg_sent = box super::Msg::new(4);
msg_sent.data.push_all([65u8, 66u8, 67u8, 68u8]);
assert!(req.msg_send(msg_sent).is_ok());
spawn(proc() {
let mut req = req;
assert!(req.connect("inproc://#1").is_ok());

let mut msg_sent = box super::Msg::new(4);
msg_sent.data.push_all([65u8, 66u8, 67u8, 68u8]);
assert!(req.msg_send(msg_sent).is_ok());
});

let mut rep = c.socket(super::REP);
assert!(rep.bind("inproc://#1").is_ok());
let msg_recv = rep.msg_recv().unwrap();
assert_eq!(msg_recv.data, [65u8, 66u8, 67u8, 68u8].into_owned());
}

0 comments on commit f7f889b

Please sign in to comment.