Skip to content

Commit

Permalink
fixes nanomsg#132 Implement saner notification for file descriptors
Browse files Browse the repository at this point in the history
This eliminates the "quasi-functional" notify API altogether.
The aio framework will be coming soon to replace it.

As a bonus, apps (legacy apps) that use the notification FDs
will see improved performance, since we don't have to context
switch to give them a notification.
  • Loading branch information
gdamore committed Oct 24, 2017
1 parent a7b20c3 commit 4250fc1
Show file tree
Hide file tree
Showing 14 changed files with 139 additions and 505 deletions.
2 changes: 0 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ set (NNG_SOURCES
core/device.h
core/endpt.c
core/endpt.h
core/event.c
core/event.h
core/idhash.c
core/idhash.h
core/init.c
Expand Down
27 changes: 0 additions & 27 deletions src/core/event.c

This file was deleted.

35 changes: 0 additions & 35 deletions src/core/event.h

This file was deleted.

74 changes: 32 additions & 42 deletions src/core/msgqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ struct nni_msgq {

nni_list mq_aio_putq;
nni_list mq_aio_getq;
nni_list mq_aio_notify_get;
nni_list mq_aio_notify_put;

// Callback - this function is executed with the lock held, and
// provides information about the current queue state anytime
// a message enters or leaves the queue, or a waiter is blocked.
nni_msgq_cb mq_cb_fn;
void * mq_cb_arg;

// Filters.
nni_msgq_filter mq_filter_fn;
Expand Down Expand Up @@ -63,9 +67,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)

nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_aio_list_init(&mq->mq_aio_notify_get);
nni_aio_list_init(&mq->mq_aio_notify_put);

nni_mtx_init(&mq->mq_lock);
nni_cv_init(&mq->mq_drained, &mq->mq_lock);

Expand Down Expand Up @@ -297,22 +298,25 @@ static void
nni_msgq_run_notify(nni_msgq *mq)
{
nni_aio *aio;
if (mq->mq_cb_fn != NULL) {
int flags = 0;

if (mq->mq_closed) {
return;
}
if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) {
NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
// This stays on the list.
nni_aio_finish(aio, 0, 0);
if (mq->mq_closed) {
flags |= nni_msgq_f_closed;
}
}

if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) {
// This stays on the list.
nni_aio_finish(aio, 0, 0);
if (mq->mq_len == 0) {
flags |= nni_msgq_f_empty;
} else if (mq->mq_len == mq->mq_cap) {
flags |= nni_msgq_f_full;
}
if (mq->mq_len < mq->mq_cap ||
!nni_list_empty(&mq->mq_aio_getq)) {
flags |= nni_msgq_f_can_put;
}
if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
flags |= nni_msgq_f_can_get;
}
mq->mq_cb_fn(mq->mq_cb_arg, flags);
}

if (mq->mq_draining) {
Expand All @@ -322,6 +326,16 @@ nni_msgq_run_notify(nni_msgq *mq)
}
}

void
nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg)
{
nni_mtx_lock(&mq->mq_lock);
mq->mq_cb_fn = fn;
mq->mq_cb_arg = arg;
nni_msgq_run_notify(mq);
nni_mtx_unlock(&mq->mq_lock);
}

static void
nni_msgq_cancel(nni_aio *aio, int rv)
{
Expand All @@ -335,30 +349,6 @@ nni_msgq_cancel(nni_aio *aio, int rv)
nni_mtx_unlock(&mq->mq_lock);
}

void
nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
nni_mtx_unlock(&mq->mq_lock);
return;
}
nni_aio_list_append(&mq->mq_aio_notify_put, aio);
nni_mtx_unlock(&mq->mq_lock);
}

void
nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
nni_mtx_unlock(&mq->mq_lock);
return;
}
nni_aio_list_append(&mq->mq_aio_notify_get, aio);
nni_mtx_unlock(&mq->mq_lock);
}

void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
Expand Down
24 changes: 22 additions & 2 deletions src/core/msgqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ extern void nni_msgq_fini(nni_msgq *);

extern void nni_msgq_aio_put(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);

// nni_msgq_tryput performs a non-blocking attempt to put a message on
// the message queue. It is the same as calling nng_msgq_put_until with
Expand Down Expand Up @@ -83,6 +81,28 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *);
// discarded instead, and any get waiters remain waiting.
extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *);

// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb.
enum nni_msgq_cb_flags {
nni_msgq_f_full = 1,
nni_msgq_f_empty = 2,
nni_msgq_f_can_get = 4,
nni_msgq_f_can_put = 8,
nni_msgq_f_closed = 16,
};

// nni_msgq_cb is a callback function used by sockets to monitor
// the status of the queue. It is called with the lock held for
// performance reasons so consumers must not re-enter the queue.
// The purpose is to enable file descriptor notifications on the socket,
// which don't need to reenter the msgq. The integer is a mask of
// flags that are true for the given message queue.
typedef void (*nni_msgq_cb)(void *, int);

// nni_msgq_set_cb sets the callback and argument for the callback
// which will be called on state changes in the message queue. Only
// one callback can be registered on a message queue at a time.
extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *);

// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
Expand Down
1 change: 0 additions & 1 deletion src/core/nng_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@

// These have to come after the others - particularly transport.h
#include "core/endpt.h"
#include "core/event.h"
#include "core/pipe.h"
#include "core/socket.h"

Expand Down
61 changes: 1 addition & 60 deletions src/core/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,63 +220,4 @@ nni_getopt_buf(nni_msgq *mq, void *val, size_t *sizep)
memcpy(val, &len, sz);
*sizep = sizeof(len);
return (0);
}

static void
nni_notifyfd_push(struct nng_event *ev, void *arg)
{
nni_notifyfd *fd = arg;

NNI_ARG_UNUSED(ev);

nni_plat_pipe_raise(fd->sn_wfd);
}

int
nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp)
{
int rv;
uint32_t flags;

if ((*szp < sizeof(int))) {
return (NNG_EINVAL);
}

flags = nni_sock_flags(s);

switch (mask) {
case NNG_EV_CAN_SND:
if ((flags & NNI_PROTO_FLAG_SND) == 0) {
return (NNG_ENOTSUP);
}
break;
case NNG_EV_CAN_RCV:
if ((flags & NNI_PROTO_FLAG_RCV) == 0) {
return (NNG_ENOTSUP);
}
break;
default:
return (NNG_ENOTSUP);
}

// If we already inited this, just give back the same file descriptor.
if (fd->sn_init) {
memcpy(val, &fd->sn_rfd, sizeof(int));
*szp = sizeof(int);
return (0);
}

if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) {
return (rv);
}

if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
return (NNG_ENOMEM);
}

fd->sn_init = 1;
*szp = sizeof(int);
memcpy(val, &fd->sn_rfd, sizeof(int));
return (0);
}
}
3 changes: 0 additions & 3 deletions src/core/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t);
// nni_getopt_size obtains a size_t option.
extern int nni_getopt_size(size_t, void *, size_t *);

// nni_getopt_fd obtains a notification file descriptor.
extern int nni_getopt_fd(nni_sock *, nni_notifyfd *, int, void *, size_t *);

extern int nni_chkopt_ms(const void *, size_t);
extern int nni_chkopt_int(const void *, size_t, int, int);
extern int nni_chkopt_size(const void *, size_t, size_t, size_t);
Expand Down
Loading

0 comments on commit 4250fc1

Please sign in to comment.