Skip to content

Commit

Permalink
add proxy control flow and test
Browse files Browse the repository at this point in the history
  • Loading branch information
lalebarde committed Oct 18, 2013
1 parent 3b628fd commit 1b75d1e
Show file tree
Hide file tree
Showing 8 changed files with 344 additions and 9 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Joe Thornber <[email protected]>
Jon Dyte <[email protected]>
Kamil Shakirov <[email protected]>
Ken Steele <[email protected]>
Laurent Alebarde <[email protected]>
Marc Rossi <[email protected]>
Martin Hurton <[email protected]>
Martin Lucina <[email protected]>
Expand Down
2 changes: 1 addition & 1 deletion include/zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);

/* Built-in message proxy (3-way) */

ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture, void *control);

/* Encode a binary key as printable text using ZMQ RFC 32 */
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
Expand Down
56 changes: 50 additions & 6 deletions src/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
int zmq::proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_)
class socket_base_t *capture_,
class socket_base_t *control_)
{
msg_t msg;
int rc = msg.init ();
Expand All @@ -71,16 +72,59 @@ int zmq::proxy (
size_t moresz;
zmq_pollitem_t items [] = {
{ frontend_, 0, ZMQ_POLLIN, 0 },
{ backend_, 0, ZMQ_POLLIN, 0 }
{ backend_, 0, ZMQ_POLLIN, 0 },
{ control_, 0, ZMQ_POLLIN, 0 }
};
while (true) {
int qt_poll_items = (control_ ? 3 : 2);
enum {suspend, resume, terminate} state = resume;
while (state != terminate) {
// Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], 2, -1);
rc = zmq_poll (&items [0], qt_poll_items, -1);
if (unlikely (rc < 0))
return -1;

// Process a control command if any
if (control_ && items [2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;

moresz = sizeof more;
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0) || more)
return -1;

// Copy message to capture socket if any
if (capture_) {
msg_t ctrl;
rc = ctrl.init ();
if (unlikely (rc < 0))
return -1;
rc = ctrl.copy (msg);
if (unlikely (rc < 0))
return -1;
rc = capture_->send (&ctrl, 0);
if (unlikely (rc < 0))
return -1;
}

// process control command
int size = msg.size();
char* message = (char*) malloc(size + 1);
memcpy(message, msg.data(), size);
message[size] = '\0';
if (size == 8 && !memcmp(message, "SUSPEND", 8))
state = suspend;
else if (size == 7 && !memcmp(message, "RESUME", 7))
state = resume;
else if (size == 10 && !memcmp(message, "TERMINATE", 10))
state = terminate;
else
fprintf(stderr, "Warning : \"%s\" bad command received by proxy\n", message); // prefered compared to "return -1"
free (message);
}
// Process a request
if (items [0].revents & ZMQ_POLLIN) {
if (state == resume && items [0].revents & ZMQ_POLLIN) {
while (true) {
rc = frontend_->recv (&msg, 0);
if (unlikely (rc < 0))
Expand Down Expand Up @@ -112,7 +156,7 @@ int zmq::proxy (
}
}
// Process a reply
if (items [1].revents & ZMQ_POLLIN) {
if (state == resume && items [1].revents & ZMQ_POLLIN) {
while (true) {
rc = backend_->recv (&msg, 0);
if (unlikely (rc < 0))
Expand Down
3 changes: 2 additions & 1 deletion src/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ namespace zmq
int proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *control_);
class socket_base_t *capture_,
class socket_base_t *control_ = NULL); // backward compatibility without this argument
}

#endif
3 changes: 2 additions & 1 deletion src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)

// The proxy functionality

int zmq_proxy (void *frontend_, void *backend_, void *control_)
int zmq_proxy (void *frontend_, void *backend_, void *capture_, void *control_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
Expand All @@ -1025,6 +1025,7 @@ int zmq_proxy (void *frontend_, void *backend_, void *control_)
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_,
(zmq::socket_base_t*) control_);
}

Expand Down
2 changes: 2 additions & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ noinst_PROGRAMS = test_system \
test_conflate \
test_inproc_connect \
test_issue_566 \
test_proxy \
test_abstract_ipc

if !ON_MINGW
Expand Down Expand Up @@ -85,6 +86,7 @@ test_req_relaxed_SOURCES = test_req_relaxed.cpp
test_conflate_SOURCES = test_conflate.cpp
test_inproc_connect_SOURCES = test_inproc_connect.cpp
test_issue_566_SOURCES = test_issue_566.cpp
test_proxy_SOURCES = test_proxy.cpp
test_abstract_ipc_SOURCES = test_abstract_ipc.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
Expand Down
Loading

0 comments on commit 1b75d1e

Please sign in to comment.