Skip to content

Commit

Permalink
Fixed zeromq#22, refactor to remove peer.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix committed Jul 9, 2014
1 parent f7f889b commit e317b6a
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 219 deletions.
18 changes: 7 additions & 11 deletions src/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use consts;
use inproc::InprocManager;
use rep;
use req;
use socket::ZmqSocket;
use socket_base::SocketBase;
use rep::RepSocket;
use req::ReqSocket;


pub struct Context {
Expand All @@ -16,16 +17,11 @@ impl Context {
}
}

pub fn socket(&self, type_: consts::SocketType) -> Box<SocketBase + Send> {
pub fn socket(&self, type_: consts::SocketType) -> Box<ZmqSocket + Send> {
let base = SocketBase::new(self.inproc_mgr.chan());
match type_ {
consts::REQ => {
let ret: ReqSocket = SocketBase::new(self.inproc_mgr.chan());
box ret as Box<SocketBase + Send>
},
consts::REP => {
let ret: RepSocket = SocketBase::new(self.inproc_mgr.chan());
box ret as Box<SocketBase + Send>
},
consts::REQ => box req::new(base) as Box<ZmqSocket + Send>,
consts::REP => box rep::new(base) as Box<ZmqSocket + Send>,
}
}
}
Expand Down
146 changes: 0 additions & 146 deletions src/peer.rs

This file was deleted.

40 changes: 23 additions & 17 deletions src/rep.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use consts;
use inproc::InprocCommand;
use msg;
use msg::Msg;
use peer::PeerManager;
use result::{ZmqError, ZmqResult};
use socket::ZmqSocket;
use socket_base::SocketBase;


Expand All @@ -15,32 +14,29 @@ enum State {


pub struct RepSocket {
pm: PeerManager,
base: SocketBase,
state: State,
last_identity: uint,
}

impl SocketBase for RepSocket {
fn new(chan: Sender<InprocCommand>) -> RepSocket {
RepSocket {
pm: PeerManager::new(chan),
state: Initial,
last_identity: 0,
}.init(consts::REP)

impl ZmqSocket for RepSocket {
fn getsockopt(&self, option: consts::SocketOption) -> int {
self.base.getsockopt(option)
}

fn pm<'a>(&'a self) -> &'a PeerManager {
&self.pm
fn bind(&self, addr: &str) -> ZmqResult<()> {
self.base.bind(addr)
}

fn pmut<'a>(&'a mut self) -> &'a mut PeerManager {
&mut self.pm
fn connect(&self, addr: &str) -> ZmqResult<()> {
self.base.connect(addr)
}

fn msg_recv(&mut self) -> ZmqResult<Box<Msg>> {
let (id, ret) = match self.state {
Initial => self.pm.recv_first(),
Receiving => (self.last_identity, self.pm.recv_from(self.last_identity)),
Initial => self.base.recv_first(),
Receiving => (self.last_identity, self.base.recv_from(self.last_identity)),
_ => return Err(ZmqError::new(
consts::EFSM, "Operation cannot be accomplished in current state")),
};
Expand All @@ -56,7 +52,7 @@ impl SocketBase for RepSocket {
self.state = match self.state {
Sending => {
let flags = msg.flags;
self.pm.send_to(self.last_identity, msg);
self.base.send_to(self.last_identity, msg);
match flags & msg::MORE {
0 => Initial,
_ => Sending,
Expand All @@ -70,6 +66,16 @@ impl SocketBase for RepSocket {
}


pub fn new(base: SocketBase) -> RepSocket {
base.set_type(consts::REP);
RepSocket {
base: base,
state: Initial,
last_identity: 0,
}
}


#[cfg(test)]
mod test {
use ctx::Context;
Expand Down
44 changes: 25 additions & 19 deletions src/req.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use consts;
use inproc::InprocCommand;
use msg;
use msg::Msg;
use peer::PeerManager;
use result::{ZmqError, ZmqResult};
use socket::ZmqSocket;
use socket_base::SocketBase;


Expand All @@ -15,34 +14,30 @@ enum State {


pub struct ReqSocket {
pm: PeerManager,
base: SocketBase,
state: State,
last_identity: uint,
send_count: uint,
}

impl SocketBase for ReqSocket {
fn new(chan: Sender<InprocCommand>) -> ReqSocket {
ReqSocket {
pm: PeerManager::new(chan),
state: Initial,
last_identity: 0,
send_count: 0,
}.init(consts::REQ)

impl ZmqSocket for ReqSocket {
fn getsockopt(&self, option: consts::SocketOption) -> int {
self.base.getsockopt(option)
}

fn pm<'a>(&'a self) -> &'a PeerManager {
&self.pm
fn bind(&self, addr: &str) -> ZmqResult<()> {
self.base.bind(addr)
}

fn pmut<'a>(&'a mut self) -> &'a mut PeerManager {
&mut self.pm
fn connect(&self, addr: &str) -> ZmqResult<()> {
self.base.connect(addr)
}

fn msg_recv(&mut self) -> ZmqResult<Box<Msg>> {
let ret = match self.state {
Receiving => {
self.pm.recv_from(self.last_identity)
self.base.recv_from(self.last_identity)
},
_ => return Err(ZmqError::new(
consts::EFSM, "Operation cannot be accomplished in current state")),
Expand All @@ -58,13 +53,13 @@ impl SocketBase for ReqSocket {
let flags = msg.flags;
match self.state {
Initial => {
let (count, id) = self.pm.round_robin(self.send_count);
let (count, id) = self.base.round_robin(self.send_count);
self.send_count = count;
self.pm.send_to(id, msg);
self.base.send_to(id, msg);
self.last_identity = id;
},
Sending => {
self.pm.send_to(self.last_identity, msg);
self.base.send_to(self.last_identity, msg);
},
_ => return Err(ZmqError::new(
consts::EFSM, "Operation cannot be accomplished in current state")),
Expand All @@ -78,6 +73,17 @@ impl SocketBase for ReqSocket {
}


pub fn new(base: SocketBase) -> ReqSocket {
base.set_type(consts::REQ);
ReqSocket {
base: base,
state: Initial,
last_identity: 0,
send_count: 0,
}
}


#[cfg(test)]
mod test {
use ctx::Context;
Expand Down
16 changes: 16 additions & 0 deletions src/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use consts;
use msg::Msg;
use result::ZmqResult;


pub trait ZmqSocket {
fn getsockopt(&self, option: consts::SocketOption) -> int;

fn bind(&self, addr: &str) -> ZmqResult<()>;

fn connect(&self, addr: &str) -> ZmqResult<()>;

fn msg_recv(&mut self) -> ZmqResult<Box<Msg>>;

fn msg_send(&mut self, msg: Box<Msg>) -> ZmqResult<()>;
}
Loading

0 comments on commit e317b6a

Please sign in to comment.