Skip to content

Commit

Permalink
adds server socket type and routing id to msg
Browse files Browse the repository at this point in the history
  • Loading branch information
somdoron committed Feb 1, 2015
1 parent 9826a7b commit 5632b57
Show file tree
Hide file tree
Showing 13 changed files with 420 additions and 10 deletions.
8 changes: 7 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ src_libzmq_la_SOURCES = \
src/router.hpp \
src/select.cpp \
src/select.hpp \
src/server.cpp \
src/server.hpp \
src/session_base.cpp \
src/session_base.hpp \
src/signaler.cpp \
Expand Down Expand Up @@ -338,7 +340,8 @@ test_apps = \
tests/test_xpub_nodrop \
tests/test_xpub_manual \
tests/test_xpub_welcome_msg \
tests/test_atomics
tests/test_atomics \
tests/test_server

tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la
Expand Down Expand Up @@ -510,6 +513,9 @@ tests_test_xpub_welcome_msg_LDADD = src/libzmq.la
tests_test_atomics_SOURCES = tests/test_atomics.cpp
tests_test_atomics_LDADD = src/libzmq.la

tests_test_server_SOURCES = tests/test_server.cpp
tests_test_server_LDADD = src/libzmq.la

if !ON_MINGW
if !ON_CYGWIN
test_apps += \
Expand Down
3 changes: 3 additions & 0 deletions include/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ ZMQ_EXPORT int zmq_msg_more (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
ZMQ_EXPORT int zmq_msg_set_routing_id(zmq_msg_t *msg, uint32_t routing_id);
ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);


/******************************************************************************/
Expand All @@ -236,6 +238,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_XPUB 9
#define ZMQ_XSUB 10
#define ZMQ_STREAM 11
#define ZMQ_SERVER 12

/* Deprecated aliases */
#define ZMQ_XREQ ZMQ_DEALER
Expand Down
8 changes: 5 additions & 3 deletions src/mechanism.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
{
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
"DEALER", "ROUTER", "PULL", "PUSH",
"XPUB", "XSUB", "STREAM"};
zmq_assert (socket_type >= 0 && socket_type <= 10);
"XPUB", "XSUB", "STREAM", "SERVER"};
zmq_assert (socket_type >= 0 && socket_type <= 12);
return names [socket_type];
}

Expand Down Expand Up @@ -160,7 +160,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
case ZMQ_REP:
return type_ == "REQ" || type_ == "DEALER";
case ZMQ_DEALER:
return type_ == "REP" || type_ == "DEALER" || type_ == "ROUTER";
return type_ == "REP" || type_ == "DEALER" || type_ == "ROUTER" || type_ == "SERVER";
case ZMQ_ROUTER:
return type_ == "REQ" || type_ == "DEALER" || type_ == "ROUTER";
case ZMQ_PUSH:
Expand All @@ -177,6 +177,8 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
return type_ == "PUB" || type_ == "XPUB";
case ZMQ_PAIR:
return type_ == "PAIR";
case ZMQ_SERVER:
return type_ == "DEALER";
default:
break;
}
Expand Down
17 changes: 17 additions & 0 deletions src/msg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ int zmq::msg_t::init ()
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = 0;
u.vsm.routing_id = 0;
file_desc = -1;
return 0;
}
Expand All @@ -58,11 +59,13 @@ int zmq::msg_t::init_size (size_t size_)
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = (unsigned char) size_;
u.vsm.routing_id = 0;
}
else {
u.lmsg.metadata = NULL;
u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.routing_id = 0;
u.lmsg.content =
(content_t*) malloc (sizeof (content_t) + size_);
if (unlikely (!u.lmsg.content)) {
Expand Down Expand Up @@ -95,11 +98,13 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
u.cmsg.flags = 0;
u.cmsg.data = data_;
u.cmsg.size = size_;
u.cmsg.routing_id = 0;
}
else {
u.lmsg.metadata = NULL;
u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.routing_id = 0;
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) {
errno = ENOMEM;
Expand All @@ -121,6 +126,7 @@ int zmq::msg_t::init_delimiter ()
u.delimiter.metadata = NULL;
u.delimiter.type = type_delimiter;
u.delimiter.flags = 0;
u.delimiter.routing_id = 0;
return 0;
}

Expand Down Expand Up @@ -377,3 +383,14 @@ bool zmq::msg_t::rm_refs (int refs_)

return true;
}

uint32_t zmq::msg_t::get_routing_id()
{
return u.base.routing_id;
}

int zmq::msg_t::set_routing_id(uint32_t routing_id_)
{
u.base.routing_id = routing_id_;
return 0;
}
17 changes: 12 additions & 5 deletions src/msg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ namespace zmq
bool is_delimiter () const;
bool is_vsm ();
bool is_cmsg ();
uint32_t get_routing_id();
int set_routing_id(uint32_t routing_id_);

// After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy.
Expand All @@ -93,7 +95,7 @@ namespace zmq
// Size in bytes of the largest message that is still copied around
// rather than being reference-counted.
enum { msg_t_size = 64 };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3) };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) };

// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
Expand Down Expand Up @@ -136,38 +138,43 @@ namespace zmq
union {
struct {
metadata_t *metadata;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} base;
struct {
metadata_t *metadata;
unsigned char data [max_vsm_size];
unsigned char size;
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} vsm;
struct {
metadata_t *metadata;
content_t *content;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2)];
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2 + sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} lmsg;
struct {
metadata_t *metadata;
void* data;
size_t size;
unsigned char unused
[msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2)];
[msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2 + sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} cmsg;
struct {
metadata_t *metadata;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} delimiter;
} u;
};
Expand Down
10 changes: 10 additions & 0 deletions src/pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_;
}

void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
{
routing_id = routing_id_;
}

uint32_t zmq::pipe_t::get_routing_id ()
{
return routing_id;
}

void zmq::pipe_t::set_identity (const blob_t &identity_)
{
identity = identity_;
Expand Down
7 changes: 7 additions & 0 deletions src/pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ namespace zmq
// Specifies the object to send events to.
void set_event_sink (i_pipe_events *sink_);

// Pipe endpoint can store an routing ID to be used by its clients.
void set_routing_id(uint32_t routing_id_);
uint32_t get_routing_id();

// Pipe endpoint can store an opaque ID to be used by its clients.
void set_identity (const blob_t &identity_);
blob_t get_identity ();
Expand Down Expand Up @@ -204,6 +208,9 @@ namespace zmq
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;

// Identity of the writer. Used uniquely by the reader side.
int routing_id;

// Pipe's credential.
blob_t credential;

Expand Down
Loading

0 comments on commit 5632b57

Please sign in to comment.