Skip to content

Commit

Permalink
Fixed zeromq#13, implemented inproc protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix committed Jul 6, 2014
1 parent 6b65e1f commit e569c9c
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 22 deletions.
54 changes: 45 additions & 9 deletions src/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
use consts;
use socket_base::SocketBase;
use socket_base::{SocketBase, SocketMessage, OnConnected};
use rep::RepSocket;
use req::ReqSocket;
use result::ZmqResult;

use std::collections::HashMap;


pub struct Context {
starting: bool,
terminating: bool,
inproc_binders: HashMap<String, Sender<ZmqResult<SocketMessage>>>,
inproc_connecters: HashMap<String, Vec<Sender<ZmqResult<SocketMessage>>>>,
}

impl Context {
pub fn new() -> Context {
Context {
starting: true,
terminating: false,
inproc_binders: HashMap::new(),
inproc_connecters: HashMap::new(),
}
}

pub fn socket(&self, type_: consts::SocketType) -> Box<SocketBase> {
pub fn socket(&mut self, type_: consts::SocketType) -> Box<SocketBase> {
match type_ {
consts::REQ => {
let ret: ReqSocket = SocketBase::new(self);
Expand All @@ -29,6 +32,41 @@ impl Context {
},
}
}

pub fn _bind_inproc(&mut self, address: &str, tx: Sender<ZmqResult<SocketMessage>>) {
let key = String::from_str(address);

if self.inproc_binders.contains_key(&key) {
// TODO: return error
fail!();
}

if self.inproc_connecters.contains_key(&key) {
let connecters = self.inproc_connecters.get(&key);
for connecter_tx in connecters.iter() {
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
connecter_tx.send(Ok(OnConnected(tx1, rx2)));
tx.send(Ok(OnConnected(tx2, rx1)));
}
}

self.inproc_binders.insert(key.clone(), tx);
}

pub fn _connect_inproc(&mut self, address: &str, tx: Sender<ZmqResult<SocketMessage>>) {
let key = String::from_str(address);

if self.inproc_binders.contains_key(&key) {
let binder_tx = self.inproc_binders.get(&key);
let (tx1, rx1) = channel();
let (tx2, rx2) = channel();
binder_tx.send(Ok(OnConnected(tx1, rx2)));
tx.send(Ok(OnConnected(tx2, rx1)));
}

self.inproc_connecters.find_or_insert(key.clone(), vec!()).push(tx);
}
}


Expand All @@ -38,8 +76,6 @@ mod test {

#[test]
fn test_new() {
let ctx = Context::new();
assert_eq!(ctx.starting, true);
Context::new();
}
}

10 changes: 7 additions & 3 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ enum State {


pub struct RepSocket<'s> {
ctx: &'s Context,
ctx: &'s mut Context,
pm: PeerManager,
state: State,
last_identity: uint,
}

impl<'s> SocketBase<'s> for RepSocket<'s> {
fn new(ctx: &'s Context) -> RepSocket<'s> {
fn new(ctx: &'s mut Context) -> RepSocket<'s> {
RepSocket {
ctx: ctx,
pm: PeerManager::new(),
Expand All @@ -39,6 +39,10 @@ impl<'s> SocketBase<'s> for RepSocket<'s> {
&mut self.pm
}

fn ctx<'a>(&'a mut self) -> &'a mut Context {
&mut *self.ctx
}

fn msg_recv(&mut self) -> ZmqResult<Box<Msg>> {
let (id, ret) = match self.state {
Initial => self.pm.recv_first(),
Expand Down Expand Up @@ -80,7 +84,7 @@ mod test {

#[test]
fn test_fsm() {
let ctx = Context::new();
let mut ctx = Context::new();
let mut s = ctx.socket(consts::REP);
let msg = box Msg::new(1);
assert_eq!(s.msg_send(msg).unwrap_err().code, consts::EFSM);
Expand Down
10 changes: 7 additions & 3 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ enum State {


pub struct ReqSocket<'s> {
ctx: &'s Context,
ctx: &'s mut Context,
pm: PeerManager,
state: State,
last_identity: uint,
send_count: uint,
}

impl<'s> SocketBase<'s> for ReqSocket<'s> {
fn new(ctx: &'s Context) -> ReqSocket<'s> {
fn new(ctx: &'s mut Context) -> ReqSocket<'s> {
ReqSocket {
ctx: ctx,
pm: PeerManager::new(),
Expand All @@ -41,6 +41,10 @@ impl<'s> SocketBase<'s> for ReqSocket<'s> {
&mut self.pm
}

fn ctx<'a>(&'a mut self) -> &'a mut Context {
&mut *self.ctx
}

fn msg_recv(&mut self) -> ZmqResult<Box<Msg>> {
let ret = match self.state {
Receiving => {
Expand Down Expand Up @@ -87,7 +91,7 @@ mod test {

#[test]
fn test_fsm() {
let ctx = Context::new();
let mut ctx = Context::new();
let mut s = ctx.socket(consts::REQ);
assert_eq!(s.msg_recv().unwrap_err().code, consts::EFSM);
}
Expand Down
20 changes: 17 additions & 3 deletions src/socket_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ pub enum SocketMessage {


pub trait SocketBase<'s> {
fn new(ctx: &'s Context) -> Self;
fn new(ctx: &'s mut Context) -> Self;

fn pm<'a>(&'a self) -> &'a PeerManager;

fn pmut<'a>(&'a mut self) -> &'a mut PeerManager;

fn ctx<'a>(&'a mut self) -> &'a mut Context;

fn init(self, type_: consts::SocketType) -> Self {
self.pm().options.write().type_ = type_ as int;
self
Expand All @@ -45,7 +47,13 @@ pub trait SocketBase<'s> {
}
None => Err(ZmqError::new(
consts::EINVAL, "Invaid argument: bad address")),
}},
}
},
"inproc" => {
let tx = self.pm().tx.clone();
self.ctx()._bind_inproc(address, tx);
Ok(())
},
_ => Err(ZmqError::new(consts::EPROTONOSUPPORT, "Protocol not supported")),
}
}
Expand All @@ -62,7 +70,13 @@ pub trait SocketBase<'s> {
}
None => Err(ZmqError::new(
consts::EINVAL, "Invaid argument: bad address")),
}},
}
},
"inproc" => {
let tx = self.pm().tx.clone();
self.ctx()._connect_inproc(address, tx);
Ok(())
},
_ => Err(ZmqError::new(consts::EPROTONOSUPPORT, "Protocol not supported")),
}
}
Expand Down
24 changes: 20 additions & 4 deletions src/zmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ mod test {

#[test]
fn test_socket_create() {
let c = super::Context::new();
let mut c = super::Context::new();
let s = c.socket(super::REQ);
assert_eq!(s.getsockopt(super::TYPE), super::REQ as int);
}

#[test]
fn test_socket_bind() {
let c = super::Context::new();
let mut c = super::Context::new();
let mut s = c.socket(super::REQ);
assert_eq!(s.bind("").unwrap_err().code, super::EINVAL);
assert_eq!(s.bind("://127").unwrap_err().code, super::EINVAL);
Expand All @@ -62,7 +62,7 @@ mod test {

#[test]
fn test_socket_connect() {
let c = super::Context::new();
let mut c = super::Context::new();
let mut s = c.socket(super::REQ);
assert_eq!(s.connect("").unwrap_err().code, super::EINVAL);
assert_eq!(s.connect("://127").unwrap_err().code, super::EINVAL);
Expand All @@ -75,7 +75,7 @@ mod test {

#[test]
fn test_socket_small_message() {
let c = super::Context::new();
let mut c = super::Context::new();
let mut req = c.socket(super::REQ);
let mut rep = c.socket(super::REP);
assert!(rep.bind("tcp://127.0.0.1:12347").is_ok());
Expand All @@ -88,4 +88,20 @@ mod test {
let msg_recv = rep.msg_recv().unwrap();
assert_eq!(msg_recv.data, [65u8, 66u8, 67u8, 68u8].into_owned());
}

#[test]
fn test_inproc() {
let mut c = super::Context::new();
let mut req = c.socket(super::REQ);
let mut rep = c.socket(super::REP);
assert!(req.connect("inproc://#1").is_ok());
assert!(rep.bind("inproc://#1").is_ok());

let mut msg_sent = box super::Msg::new(4);
msg_sent.data.push_all([65u8, 66u8, 67u8, 68u8]);
assert!(req.msg_send(msg_sent).is_ok());

let msg_recv = rep.msg_recv().unwrap();
assert_eq!(msg_recv.data, [65u8, 66u8, 67u8, 68u8].into_owned());
}
}

0 comments on commit e569c9c

Please sign in to comment.