Skip to content

Commit

Permalink
Improve UDP test coverage, fix numerous issues found.
Browse files Browse the repository at this point in the history
We introduced richer, deeper tests for UDP functionality.
These tests uncovered a number of issues which this commit fixes.

The Windows IOCP code needs to support multiple aios on a single
nni_win_event.  A redesign of the IOCP handling addresses that.

The POSIX UDP code also needed fixes; foremost among them is the
fact that the UDP file descriptor is not placed into non-blocking
mode, leading to potential hangs.

A number of race conditions and bugs along the implementation of
the above items were uncovered and fixed.  To the best of our knowledge
the current code is bug-free.
  • Loading branch information
gdamore committed Oct 6, 2017
1 parent 5579644 commit b0f31f5
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 157 deletions.
7 changes: 7 additions & 0 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,13 @@ nni_aio_list_append(nni_list *list, nni_aio *aio)
nni_list_append(list, aio);
}

void
nni_aio_list_prepend(nni_list *list, nni_aio *aio)
{
nni_aio_list_remove(aio);
nni_list_prepend(list, aio);
}

void
nni_aio_list_remove(nni_aio *aio)
{
Expand Down
1 change: 1 addition & 0 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ extern void nni_aio_wait(nni_aio *);
// and append will perform any necessary remove first.
extern void nni_aio_list_init(nni_list *);
extern void nni_aio_list_append(nni_list *, nni_aio *);
extern void nni_aio_list_prepend(nni_list *, nni_aio *);
extern void nni_aio_list_remove(nni_aio *);
extern int nni_aio_list_active(nni_aio *);

Expand Down
11 changes: 3 additions & 8 deletions src/platform/posix/posix_pollq_poll.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,11 @@ nni_posix_pollq_arm(nni_posix_pollq_node *node, int events)
// on the polled list. The polled list would be the case where
// the index is set to a positive value.
if ((oevents == 0) && (events != 0) && (node->index < 1)) {
if (nni_list_node_active(&node->node)) {
nni_list_node_remove(&node->node);
}
nni_list_node_remove(&node->node);
nni_list_append(&pq->armed, node);
}
if ((events != 0) && (oevents != events)) {
// Possibly wake up the poller since we're looking for
// new events.
// Possibly wake up poller since we're looking for new events.
if (pq->inpoll) {
nni_plat_pipe_raise(pq->wakewfd);
}
Expand All @@ -295,9 +292,7 @@ nni_posix_pollq_disarm(nni_posix_pollq_node *node, int events)
oevents = node->events;
node->events &= ~events;
if ((node->events == 0) && (oevents != 0)) {
if (nni_list_node_active(&node->node)) {
nni_list_node_remove(&node->node);
}
nni_list_node_remove(&node->node);
nni_list_append(&pq->idle, node);
}
// No need to wake anything, we might get a spurious wake up but
Expand Down
6 changes: 6 additions & 0 deletions src/platform/posix/posix_sockaddr.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ nni_posix_nn2sockaddr(void *sa, const nni_sockaddr *na)
const nng_sockaddr_path *nspath;
size_t sz;

if ((sa == NULL) || (na == NULL)) {
return (-1);
}
switch (na->s_un.s_family) {
case NNG_AF_INET:
sin = (void *) sa;
Expand Down Expand Up @@ -80,6 +83,9 @@ nni_posix_sockaddr2nn(nni_sockaddr *na, const void *sa)
nng_sockaddr_in6 * nsin6;
nng_sockaddr_path * nspath;

if ((na == NULL) || (sa == NULL)) {
return (-1);
}
switch (((struct sockaddr *) sa)->sa_family) {
case AF_INET:
sin = (void *) sa;
Expand Down
142 changes: 50 additions & 92 deletions src/platform/posix/posix_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
struct nni_plat_udp {
nni_posix_pollq_node udp_pitem;
int udp_fd;
int udp_closed;
nni_list udp_recvq;
nni_list udp_sendq;
nni_mtx udp_mtx;
Expand All @@ -48,7 +47,6 @@ nni_posix_udp_doclose(nni_plat_udp *udp)
{
nni_aio *aio;

udp->udp_closed = 1;
while (((aio = nni_list_first(&udp->udp_recvq)) != NULL) ||
((aio = nni_list_first(&udp->udp_sendq)) != NULL)) {
nni_aio_list_remove(aio);
Expand All @@ -62,106 +60,89 @@ nni_posix_udp_dorecv(nni_plat_udp *udp)
{
nni_aio * aio;
nni_list *q = &udp->udp_recvq;

// While we're able to recv, do so.
while ((aio = nni_list_first(q)) != NULL) {
nni_list_remove(q, aio);
struct iovec iov[4]; // never have more than 4
int niov;
struct sockaddr_storage ss;
struct msghdr hdr;
int rv;
int rv = 0;
int cnt = 0;

hdr.msg_iov = iov;
for (niov = 0; niov < aio->a_niov; niov++) {
iov[niov].iov_base = aio->a_iov[niov].iov_buf;
iov[niov].iov_len = aio->a_iov[niov].iov_len;
}
hdr.msg_iov = iov;
hdr.msg_iovlen = niov;
hdr.msg_name = &ss;
hdr.msg_namelen = sizeof(ss);
hdr.msg_flags = 0;
hdr.msg_control = NULL;
hdr.msg_controllen = 0;
rv = recvmsg(udp->udp_fd, &hdr, 0);
if (rv < 0) {

if ((cnt = recvmsg(udp->udp_fd, &hdr, 0)) < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// No data available at socket. Return
// the AIO to the head of the queue.
nni_list_prepend(q, aio);
// No data available at socket. Leave
// the AIO at the head of the queue.
return;
}
rv = nni_plat_errno(errno);
nni_aio_finish_error(aio, rv);
continue;
}

// We need to store the address information.
// It is incumbent on the AIO submitter to supply
// storage for the address.
if (aio->a_addr != NULL) {
} else if (aio->a_addr != NULL) {
// We need to store the address information.
// It is incumbent on the AIO submitter to supply
// storage for the address.
nni_posix_sockaddr2nn(aio->a_addr, (void *) &ss);
}

nni_aio_finish(aio, 0, rv);
nni_list_remove(q, aio);
nni_aio_finish(aio, rv, cnt);
}
}

static void
nni_posix_udp_dosend(nni_plat_udp *udp)
{
// XXX: TBD.
nni_aio * aio;
nni_list *q = &udp->udp_sendq;
int x = 0;

// While we're able to send, do so.
while ((aio = nni_list_first(q)) != NULL) {
struct sockaddr_storage ss;
struct msghdr hdr;
struct iovec iov[4];
int niov;
int rv;
int len;

nni_list_remove(q, aio);

if (aio->a_addr == NULL) {
// No outgoing address?
nni_aio_finish_error(aio, NNG_EADDRINVAL);
return;
}
len = nni_posix_nn2sockaddr(&ss, aio->a_addr);
if (len < 0) {
nni_aio_finish_error(aio, NNG_EADDRINVAL);
return;
}

hdr.msg_iov = iov;
for (niov = 0; niov < aio->a_niov; niov++) {
iov[niov].iov_base = aio->a_iov[niov].iov_buf;
iov[niov].iov_len = aio->a_iov[niov].iov_len;
}
hdr.msg_iovlen = niov;
hdr.msg_name = &ss;
hdr.msg_namelen = len;
hdr.msg_flags = NNI_MSG_NOSIGNAL;
hdr.msg_control = NULL;
hdr.msg_controllen = 0;

rv = sendmsg(udp->udp_fd, &hdr, 0);
if (rv < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// Cannot send (buffers full), return to
// head of queue.
nni_list_prepend(q, aio);
return;
int rv = 0;
int cnt = 0;

if ((len = nni_posix_nn2sockaddr(&ss, aio->a_addr)) < 0) {
rv = NNG_EADDRINVAL;
} else {
for (niov = 0; niov < aio->a_niov; niov++) {
iov[niov].iov_base = aio->a_iov[niov].iov_buf;
iov[niov].iov_len = aio->a_iov[niov].iov_len;
}
hdr.msg_iov = iov;
hdr.msg_iovlen = niov;
hdr.msg_name = &ss;
hdr.msg_namelen = len;
hdr.msg_flags = NNI_MSG_NOSIGNAL;
hdr.msg_control = NULL;
hdr.msg_controllen = 0;

if ((cnt = sendmsg(udp->udp_fd, &hdr, 0)) < 0) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
// Cannot send now, leave at head.
return;
}
rv = nni_plat_errno(errno);
}
rv = nni_plat_errno(errno);
nni_aio_finish_error(aio, rv);
continue;
}

nni_aio_finish(aio, 0, rv);
nni_list_remove(q, aio);
nni_aio_finish(aio, rv, cnt);
}
}

Expand Down Expand Up @@ -234,6 +215,8 @@ nni_plat_udp_open(nni_plat_udp **upp, nni_sockaddr *bindaddr)
udp->udp_pitem.cb = nni_posix_udp_cb;
udp->udp_pitem.data = udp;

(void) fcntl(udp->udp_fd, F_SETFL, O_NONBLOCK);

nni_aio_list_init(&udp->udp_recvq);
nni_aio_list_init(&udp->udp_sendq);

Expand All @@ -255,17 +238,10 @@ nni_plat_udp_close(nni_plat_udp *udp)
{
nni_aio *aio;

nni_mtx_lock(&udp->udp_mtx);
if (udp->udp_closed) {
// The only way this happens is in response to a callback that
// is being canceled. Double close from user code is a bug.
nni_mtx_unlock(&udp->udp_mtx);
return;
}

// We're no longer interested in events.
nni_posix_pollq_remove(&udp->udp_pitem);

nni_mtx_lock(&udp->udp_mtx);
nni_posix_udp_doclose(udp);
nni_mtx_unlock(&udp->udp_mtx);

Expand All @@ -291,39 +267,21 @@ void
nni_plat_udp_recv(nni_plat_udp *udp, nni_aio *aio)
{
nni_mtx_lock(&udp->udp_mtx);
if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) {
nni_mtx_unlock(&udp->udp_mtx);
return;
}

if (udp->udp_closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&udp->udp_mtx);
return;
if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
nni_list_append(&udp->udp_recvq, aio);
nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
}

nni_list_append(&udp->udp_recvq, aio);
nni_posix_pollq_arm(&udp->udp_pitem, POLLIN);
nni_mtx_unlock(&udp->udp_mtx);
}

void
nni_plat_udp_send(nni_plat_udp *udp, nni_aio *aio)
{
nni_mtx_lock(&udp->udp_mtx);
if (nni_aio_start(aio, nni_plat_udp_cancel, udp) != 0) {
nni_mtx_unlock(&udp->udp_mtx);
return;
if (nni_aio_start(aio, nni_plat_udp_cancel, udp) == 0) {
nni_list_append(&udp->udp_sendq, aio);
nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
}

if (udp->udp_closed) {
nni_aio_finish_error(aio, NNG_ECLOSED);
nni_mtx_unlock(&udp->udp_mtx);
return;
}

nni_list_append(&udp->udp_sendq, aio);
nni_posix_pollq_arm(&udp->udp_pitem, POLLOUT);
nni_mtx_unlock(&udp->udp_mtx);
}

Expand Down
13 changes: 9 additions & 4 deletions src/platform/windows/win_debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ nni_plat_errno(int errnum)
static struct {
int win_err;
int nng_err;
} nni_win_errnos[] = {
// clang-format off
} nni_win_errnos[] =
{
// clang-format off
{ ERROR_FILE_NOT_FOUND, NNG_ENOENT },
{ ERROR_ACCESS_DENIED, NNG_EPERM },
{ ERROR_INVALID_HANDLE, NNG_ECLOSED },
Expand Down Expand Up @@ -114,6 +115,10 @@ static struct {
{ WSAENOPROTOOPT, NNG_ENOTSUP },
{ WSAEPROTONOSUPPORT, NNG_ENOTSUP },
{ WSAEPROTONOSUPPORT, NNG_ENOTSUP },
{ WSAESOCKTNOSUPPORT, NNG_ENOTSUP },
{ WSAEOPNOTSUPP, NNG_ENOTSUP },
{ WSAEPFNOSUPPORT, NNG_ENOTSUP },
{ WSAEAFNOSUPPORT, NNG_ENOTSUP },
{ WSAEADDRINUSE, NNG_EADDRINUSE },
{ WSAEADDRNOTAVAIL, NNG_EADDRINVAL },
{ WSAENETDOWN, NNG_EUNREACHABLE },
Expand All @@ -137,8 +142,8 @@ static struct {

// Must be Last!!
{ 0, 0 },
// clang-format on
};
// clang-format on
};

// This converts a Windows API error (from GetLastError()) to an
// nng standard error code.
Expand Down
4 changes: 3 additions & 1 deletion src/platform/windows/win_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ struct nni_win_event_ops {
struct nni_win_event {
OVERLAPPED olpd;
void * ptr;
nni_aio * aio;
nni_mtx mtx;
nni_cv cv;
unsigned run : 1;
unsigned fini : 1;
unsigned closed : 1;
unsigned count;
int status;
nni_list aios;
nni_aio * active;
nni_win_event_ops ops;
};

Expand Down
Loading

0 comments on commit b0f31f5

Please sign in to comment.