Skip to content

Commit

Permalink
unix_msg: introduce send queue caching
Browse files Browse the repository at this point in the history
This introduces caching of unix datagram send queues. Right now send
queues are only created for peers if the channel to the peer is full and
a send reported EWOULDBLOCK.

At this stage, performance will actually be slightly worse, because now
if there's a cached queue for a peer without queued messages, we don't
attempt direct send anymore until the send queue is removed from the
cache.

The next commit will modify unix_msg to always create a send queue with
the datagram socket in connected mode and again attempt an non-blocking
send on the connected socket first. Then only if that returns
EWOULDBLOCK, the send has to go through the threadpool.

Signed-off-by: Ralph Boehme <[email protected]>
Reviewed-by: Jeremy Allison <[email protected]>
  • Loading branch information
slowfranklin authored and jrasamba committed Sep 12, 2016
1 parent bb526a6 commit 16d0766
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 4 deletions.
67 changes: 64 additions & 3 deletions source3/lib/unix_msg/unix_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "lib/util/iov_buf.h"
#include "lib/util/msghdr.h"
#include <fcntl.h>
#include "lib/util/time.h"

/*
* This file implements two abstractions: The "unix_dgram" functions implement
Expand All @@ -51,6 +52,7 @@ struct unix_dgram_send_queue {
struct unix_dgram_ctx *ctx;
int sock;
struct unix_dgram_msg *msgs;
struct poll_timeout *timeout;
char path[];
};

Expand Down Expand Up @@ -364,6 +366,8 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
return 0;
}

static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);

static int unix_dgram_send_queue_init(
struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
struct unix_dgram_send_queue **result)
Expand All @@ -380,6 +384,7 @@ static int unix_dgram_send_queue_init(
}
q->ctx = ctx;
q->msgs = NULL;
q->timeout = NULL;
memcpy(q->path, dst->sun_path, pathlen);

q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
Expand Down Expand Up @@ -411,6 +416,12 @@ static int unix_dgram_send_queue_init(

DLIST_ADD(ctx->send_queues, q);

ret = unix_dgram_sendq_schedule_free(q);
if (ret != 0) {
err = ENOMEM;
goto fail_close;
}

*result = q;
return 0;

Expand All @@ -434,9 +445,59 @@ static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
}
close(q->sock);
DLIST_REMOVE(ctx->send_queues, q);
ctx->ev_funcs->timeout_free(q->timeout);
free(q);
}

static void unix_dgram_sendq_scheduled_free_handler(
struct poll_timeout *t, void *private_data);

static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
{
struct unix_dgram_ctx *ctx = q->ctx;
struct timeval timeout;

if (q->timeout != NULL) {
return 0;
}

GetTimeOfDay(&timeout);
timeout.tv_sec += SENDQ_CACHE_TIME_SECS;

q->timeout = ctx->ev_funcs->timeout_new(
ctx->ev_funcs,
timeout,
unix_dgram_sendq_scheduled_free_handler,
q);
if (q->timeout == NULL) {
unix_dgram_send_queue_free(q);
return ENOMEM;
}

return 0;
}

static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
void *private_data)
{
struct unix_dgram_send_queue *q = private_data;
int ret;

q->ctx->ev_funcs->timeout_free(q->timeout);
q->timeout = NULL;

if (q->msgs == NULL) {
unix_dgram_send_queue_free(q);
return;
}

ret = unix_dgram_sendq_schedule_free(q);
if (ret != 0) {
unix_dgram_send_queue_free(q);
return;
}
}

static int find_send_queue(struct unix_dgram_ctx *ctx,
const struct sockaddr_un *dst,
struct unix_dgram_send_queue **ps)
Expand Down Expand Up @@ -555,12 +616,12 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
if (q->msgs != NULL) {
ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
unix_dgram_send_job, q->msgs);
if (ret == 0) {
if (ret != 0) {
unix_dgram_send_queue_free(q);
return;
}
return;
}

unix_dgram_send_queue_free(q);
}

static int unix_dgram_send(struct unix_dgram_ctx *ctx,
Expand Down
2 changes: 2 additions & 0 deletions source3/lib/unix_msg/unix_msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,6 @@ int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
*/
int unix_msg_free(struct unix_msg_ctx *ctx);

#define SENDQ_CACHE_TIME_SECS 10

#endif
2 changes: 1 addition & 1 deletion source3/lib/unix_msg/wscript_build
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

bld.SAMBA3_SUBSYSTEM('UNIX_MSG',
source='unix_msg.c',
deps='replace PTHREADPOOL iov_buf msghdr')
deps='replace PTHREADPOOL iov_buf msghdr time-basic')

bld.SAMBA3_BINARY('unix_msg_test',
source='tests.c',
Expand Down

0 comments on commit 16d0766

Please sign in to comment.