Skip to content

Commit

Permalink
fixes nanomsg#112 Need to move some stuff from socket to message queues
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Oct 23, 2017
1 parent fdb73b6 commit 3585000
Show file tree
Hide file tree
Showing 17 changed files with 537 additions and 302 deletions.
113 changes: 65 additions & 48 deletions src/core/msgqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ struct nni_msgq {
int mq_puterr;
int mq_geterr;
int mq_draining;
int mq_besteffort;
nni_msg **mq_msgs;

nni_list mq_aio_putq;
nni_list mq_aio_getq;
nni_list mq_aio_notify_get;
nni_list mq_aio_notify_put;

// Filters.
nni_msgq_filter mq_filter_fn;
void * mq_filter_arg;
};

int
Expand Down Expand Up @@ -157,6 +162,13 @@ nni_msgq_set_error(nni_msgq *mq, int error)
nni_mtx_unlock(&mq->mq_lock);
}

void
nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg)
{
mq->mq_filter_fn = filter;
mq->mq_filter_arg = arg;
}

static void
nni_msgq_run_putq(nni_msgq *mq)
{
Expand All @@ -173,12 +185,19 @@ nni_msgq_run_putq(nni_msgq *mq)
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
nni_aio_set_msg(waio, NULL);

nni_aio_list_remove(raio);
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);

if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
}

nni_aio_finish(waio, 0, len);
nni_aio_finish_msg(raio, msg);
continue;
}

Expand All @@ -195,40 +214,76 @@ nni_msgq_run_putq(nni_msgq *mq)
continue;
}

// If we are in best effort mode, just drop the message
// as if we delivered.
if (mq->mq_besteffort) {
nni_list_remove(&mq->mq_aio_putq, waio);
nni_aio_set_msg(waio, NULL);
nni_msg_free(msg);
nni_aio_finish(waio, 0, len);
continue;
}

// Unable to make progress, leave the aio where it is.
break;
}
}

void
nni_msgq_set_best_effort(nni_msgq *mq, int on)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_besteffort = on;
if (on) {
nni_msgq_run_putq(mq);
}
nni_mtx_unlock(&mq->mq_lock);
}

static void
nni_msgq_run_getq(nni_msgq *mq)
{
nni_aio *raio;
nni_aio *waio;
nni_msg *msg;

while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
msg = mq->mq_msgs[mq->mq_get++];
nni_msg *msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);

if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
}
continue;
}

// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
nni_msg *msg;
size_t len;
msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);

nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
nni_aio_list_remove(raio);

nni_aio_finish(waio, 0, nni_msg_len(msg));
nni_aio_finish_msg(raio, msg);
if (mq->mq_filter_fn != NULL) {
msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
}
if (msg != NULL) {
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
}

nni_aio_finish(waio, 0, len);
continue;
}

Expand Down Expand Up @@ -393,44 +448,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_EAGAIN);
}

int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
nni_aio *aio;
int rv;

if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
return (rv);
}
nni_aio_set_timeout(aio, expire);
nni_msgq_aio_get(mq, aio);
nni_aio_wait(aio);
if ((rv = nni_aio_result(aio)) == 0) {
*msgp = nni_aio_get_msg(aio);
nni_aio_set_msg(aio, NULL);
}
nni_aio_fini(aio);
return (rv);
}

int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
nni_aio *aio;
int rv;

if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
return (rv);
}
nni_aio_set_timeout(aio, expire);
nni_aio_set_msg(aio, msg);
nni_msgq_aio_put(mq, aio);
nni_aio_wait(aio);
rv = nni_aio_result(aio);
nni_aio_fini(aio);
return (rv);
}

void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
Expand Down
36 changes: 19 additions & 17 deletions src/core/msgqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,11 @@ extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);

// nni_msgq_put puts the message to the queue. It blocks until it
// was able to do so, or the queue is closed, or a timeout is reached.
// It returns 0 on success, NNG_ECLOSED if the queue was closed, or
// NNG_ETIMEDOUT if the timeout is reached. If an error is returned,
// the caller is responsible for freeing the message with nni_msg_free(),
// otherwise the message is "owned" by the queue, and the caller is not
// permitted to access it further.
extern int nni_msgq_put_until(nni_msgq *, nni_msg *, nni_time);

// nni_msgq_tryput performs a non-blocking attempt to put a message on
// the message queue. It is the same as calling nng_msgq_put_until with
// a zero time.
extern int nni_msgq_tryput(nni_msgq *, nni_msg *);

// nni_msgq_get_until gets a message from the queue. It blocks until a
// message is available, the queue is closed, or time out is reached.
// It returns 0 on success, NNG_ECLOSED if the queue was closed, or
// NNG_ETIMEDOUT if the timeout is reached. On successful return,
// the caller assumes ownership of the message and must call
// nni_msg_free() when it is finished with it.
extern int nni_msgq_get_until(nni_msgq *, nni_msg **, nni_time);

// nni_msgq_set_error sets an error condition on the message queue,
// which causes all current and future readers/writes to return the
// given error condition (if non-zero). Threads waiting to put or get
Expand All @@ -81,6 +64,25 @@ extern void nni_msgq_set_put_error(nni_msgq *, int);
// Readers (nni_msgq_put*) are unaffected.
extern void nni_msgq_set_get_error(nni_msgq *, int);

// nni_msgq_set_best_effort marks the message queue best effort on send.
// What this does is treat the message queue condition as if it were
// successful, returning 0, and discarding the message. If zero is
// passed then this mode is reset to normal.
extern void nni_msgq_set_best_effort(nni_msgq *, int);

// nni_msgq_filter is a callback function used to filter messages.
// The function is called on entry (put) or exit (get). The void
// argument is an opaque pointer supplied with the function at registration
// time. The primary use for these functions is to support the protocol
// socket needs.
typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *);

// nni_msgq_set_filter sets the filter on the queue. Messages
// are filtered through this just before they are returned via the get
// functions. If the filter returns NULL, then the message is silently
// discarded instead, and any get waiters remain waiting.
extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *);

// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
Expand Down
16 changes: 9 additions & 7 deletions src/core/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,17 @@ struct nni_proto_sock_ops {
// it can signal the socket worker threads to exit.
void (*sock_close)(void *);

// Receive filter. This may be NULL, but if it isn't, then
// Send a message.
void (*sock_send)(void *, nni_aio *);

// Receive a message.
void (*sock_recv)(void *, nni_aio *);

// Message filter. This may be NULL, but if it isn't, then
// messages coming into the system are routed here just before being
// delivered to the application. To drop the message, the prtocol
// delivered to the application. To drop the message, the protocol
// should return NULL, otherwise the message (possibly modified).
nni_msg *(*sock_rfilter)(void *, nni_msg *);

// Send filter. This may be NULL, but if it isn't, then messages
// here are filtered just after they come from the application.
nni_msg *(*sock_sfilter)(void *, nni_msg *);
nni_msg *(*sock_filter)(void *, nni_msg *);

// Options. Must not be NULL. Final entry should have NULL name.
nni_proto_sock_option *sock_options;
Expand Down
Loading

0 comments on commit 3585000

Please sign in to comment.