Skip to content

Commit

Permalink
Refactor kqueue to support multiple event loops
Browse files Browse the repository at this point in the history
  • Loading branch information
droe committed Jul 25, 2018
1 parent 41652aa commit e934dad
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 87 deletions.
33 changes: 18 additions & 15 deletions evtloop.c
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,7 @@ evtloop_run(config_t *cfg) {
kevent_ctx_t auef_ctx = KEVENT_CTX_FD_READ(auef_readable, cfg);
kevent_ctx_t sttm_ctx = KEVENT_CTX_TIMER(stats_timer_fired, cfg);
kevent_ctx_t cftm_ctx = KEVENT_CTX_TIMER(config_timer_fired, cfg);
kqueue_t *kq = NULL;
int pidc;
pid_t *pidv;
int rv;
Expand Down Expand Up @@ -898,50 +899,51 @@ evtloop_run(config_t *cfg) {
hackmon_init(cfg);

/* open kqueue */
if (kqueue_init() == -1) {
fprintf(stderr, "kqueue_init() failed: %s (%i)\n",
kq = kqueue_new();
if (!kq) {
fprintf(stderr, "kqueue_new() failed: %s (%i)\n",
strerror(errno), errno);
rv = -1;
goto errout_silent;
}

/* install kevent-based signal handlers */
rv = kqueue_add_signal(SIGQUIT, &sigquit_ctx);
rv = kqueue_add_signal(kq, SIGQUIT, &sigquit_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_signal(SIGQUIT) failed: %s (%i)\n",
strerror(errno), errno);
rv = -1;
goto errout_silent;
}
rv = kqueue_add_signal(SIGTERM, &sigquit_ctx);
rv = kqueue_add_signal(kq, SIGTERM, &sigquit_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_signal(SIGTERM) failed: %s (%i)\n",
strerror(errno), errno);
rv = -1;
goto errout_silent;
}
rv = kqueue_add_signal(SIGINT, &sigquit_ctx);
rv = kqueue_add_signal(kq, SIGINT, &sigquit_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_signal(SIGINT) failed: %s (%i)\n",
strerror(errno), errno);
rv = -1;
goto errout_silent;
}
rv = kqueue_add_signal(SIGINFO, &siginfo_ctx);
rv = kqueue_add_signal(kq, SIGINFO, &siginfo_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_signal(SIGINFO) failed: %s (%i)\n",
strerror(errno), errno);
rv = -1;
goto errout_silent;
}
rv = kqueue_add_signal(SIGHUP, &sighup_ctx);
rv = kqueue_add_signal(kq, SIGHUP, &sighup_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_signal(SIGHUP) failed: %s (%i)\n",
strerror(errno), errno);
rv = -1;
goto errout_silent;
}
rv = kqueue_add_signal(SIGUSR1, &sigusr1_ctx);
rv = kqueue_add_signal(kq, SIGUSR1, &sigusr1_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_signal(SIGUSR1) failed: %s (%i)\n",
strerror(errno), errno);
Expand Down Expand Up @@ -979,7 +981,7 @@ evtloop_run(config_t *cfg) {
fprintf(stderr, "Proceeding without kext\n");
cfg->kextlevel = 0;
} else {
rv = kqueue_add_signal(SIGTSTP, &sigtstp_ctx);
rv = kqueue_add_signal(kq, SIGTSTP, &sigtstp_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_signal(SIGTSTP) "
"failed: %s (%i)\n",
Expand All @@ -1000,7 +1002,7 @@ evtloop_run(config_t *cfg) {

/* add kextctl to kqueue */
if (kefd != -1) {
rv = kqueue_add_fd_read(kefd, &kefd_ctx);
rv = kqueue_add_fd_read(kq, kefd, &kefd_ctx);
if (rv == -1) {
fprintf(stderr,
"kqueue_add_fd_read(/dev/xnumon) failed: "
Expand All @@ -1011,7 +1013,7 @@ evtloop_run(config_t *cfg) {
}

/* add auditpipe to kqueue */
rv = kqueue_add_fd_read(fileno(auef), &auef_ctx);
rv = kqueue_add_fd_read(kq, fileno(auef), &auef_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_fd_read(/dev/auditpipe) failed: "
"%s (%i)\n", strerror(errno), errno);
Expand All @@ -1020,7 +1022,7 @@ evtloop_run(config_t *cfg) {
}

/* start stats timer */
rv = kqueue_add_timer(TIMER_STATS, cfg->stats_interval, &sttm_ctx);
rv = kqueue_add_timer(kq, TIMER_STATS, cfg->stats_interval, &sttm_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_timer(1) failed: %s (%i)\n",
strerror(errno), errno);
Expand All @@ -1030,7 +1032,7 @@ evtloop_run(config_t *cfg) {

if (cfg->launchd_mode) {
/* start config file timer */
rv = kqueue_add_timer(TIMER_CONFIG, 300, &cftm_ctx);
rv = kqueue_add_timer(kq, TIMER_CONFIG, 300, &cftm_ctx);
if (rv == -1) {
fprintf(stderr, "kqueue_add_timer(2) failed: %s (%i)\n",
strerror(errno), errno);
Expand All @@ -1043,7 +1045,7 @@ evtloop_run(config_t *cfg) {
DEBUG(cfg->debug, "xnumon_start", "init complete");
running = true;
for (;;) {
rv = kqueue_dispatch();
rv = kqueue_dispatch(kq);
if (rv != 0) {
if (!running)
break;
Expand All @@ -1065,7 +1067,8 @@ evtloop_run(config_t *cfg) {
}

errout_silent:
kqueue_fini();
if (kq)
kqueue_free(kq);
if (kefd != -1) {
close(kefd);
kefd = -1;
Expand Down
116 changes: 59 additions & 57 deletions kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,49 +27,51 @@
* the kqueue fd for signals and timers. For now, only support 10.11+.
*/

static int kqfd = -1;
static struct kevent *ke;
static int nke;

static int
ke_enlarge(void) {
nke++;
ke = realloc(ke, nke * sizeof(struct kevent));
if (!ke)
kqueue_enlarge(kqueue_t *kq) {
kq->nke++;
kq->ke = realloc(kq->ke, kq->nke * sizeof(struct kevent));
if (!kq->ke)
return -1;
return 0;
}

int
kqueue_init(void) {
kqfd = kqueue();
if (kqfd == -1)
return -1;
nke = 0;
ke = NULL;
return 0;
kqueue_t *
kqueue_new(void) {
kqueue_t *kq;

kq = malloc(sizeof(kqueue_t));
if (!kq)
return NULL;
bzero(kq, sizeof(kqueue_t));
kq->fd = kqueue();
if (kq->fd == -1) {
free(kq);
return NULL;
}
return kq;
}

void
kqueue_fini(void) {
if (kqfd != -1)
close(kqfd);
if (ke) {
free(ke);
ke = NULL;
}
kqueue_free(kqueue_t *kq) {
if (kq->fd != -1)
close(kq->fd);
if (kq->ke)
free(kq->ke);
free(kq);
}

int
kqueue_dispatch(void) {
int nev, i, rv;
kqueue_dispatch(kqueue_t *kq) {
kevent_ctx_t *ctx;
int nev;
int rv;

if (!ke)
if (!kq->ke)
return -1;

retry:
nev = kevent(kqfd, NULL, 0, ke, nke, NULL);
nev = kevent(kq->fd, NULL, 0, kq->ke, kq->nke, NULL);
if (nev == -1) {
if (errno == EINTR)
goto retry;
Expand All @@ -79,70 +81,70 @@ kqueue_dispatch(void) {
}

/* process signals */
for (i = 0; i < nev; i++) {
if (ke[i].filter != EVFILT_SIGNAL)
for (size_t i = 0; i < (size_t)nev; i++) {
if (kq->ke[i].filter != EVFILT_SIGNAL)
continue;
ctx = (kevent_ctx_t *)ke[i].udata;
ctx = (kevent_ctx_t *)kq->ke[i].udata;
assert(ctx);
assert(ctx->signal);
if (ctx->signal((int)ke[i].ident, ctx->udata) == -1)
if (ctx->signal((int)kq->ke[i].ident, ctx->udata) == -1)
return -1;
}

/* process timers */
for (i = 0; i < nev; i++) {
if (ke[i].filter != EVFILT_TIMER)
for (size_t i = 0; i < (size_t)nev; i++) {
if (kq->ke[i].filter != EVFILT_TIMER)
continue;
ctx = (kevent_ctx_t *)ke[i].udata;
ctx = (kevent_ctx_t *)kq->ke[i].udata;
assert(ctx);
assert(ctx->timer);
if (ctx->timer((int)ke[i].ident, ctx->udata) == -1)
if (ctx->timer((int)kq->ke[i].ident, ctx->udata) == -1)
return -1;
}

/* process prioritiy file descriptors */
for (i = 0; i < nev; i++) {
if (ke[i].filter != EVFILT_READ)
for (size_t i = 0; i < (size_t)nev; i++) {
if (kq->ke[i].filter != EVFILT_READ)
continue;
ctx = (kevent_ctx_t *)ke[i].udata;
ctx = (kevent_ctx_t *)kq->ke[i].udata;
assert(ctx);
if (!ctx->fd_prio)
continue;
rv = ctx->fd_prio((int)ke[i].ident, ctx->udata);
rv = ctx->fd_prio((int)kq->ke[i].ident, ctx->udata);
if (rv == -1)
return -1;
if (rv == 0)
continue;
assert(ctx->fd_read);
if (ctx->fd_read((int)ke[i].ident, ctx->udata) == -1)
if (ctx->fd_read((int)kq->ke[i].ident, ctx->udata) == -1)
return -1;
return 0; /* drain */
}

/* process file descriptors without priority function */
for (i = 0; i < nev; i++) {
if (ke[i].filter != EVFILT_READ)
for (size_t i = 0; i < (size_t)nev; i++) {
if (kq->ke[i].filter != EVFILT_READ)
continue;
ctx = (kevent_ctx_t *)ke[i].udata;
ctx = (kevent_ctx_t *)kq->ke[i].udata;
assert(ctx);
if (ctx->fd_prio)
continue;
assert(ctx->fd_read);
if (ctx->fd_read((int)ke[i].ident, ctx->udata) == -1)
if (ctx->fd_read((int)kq->ke[i].ident, ctx->udata) == -1)
return -1;
return 0; /* drain */
}

/* process file descriptors with priority function, but no priority */
for (i = 0; i < nev; i++) {
if (ke[i].filter != EVFILT_READ)
for (size_t i = 0; i < (size_t)nev; i++) {
if (kq->ke[i].filter != EVFILT_READ)
continue;
ctx = (kevent_ctx_t *)ke[i].udata;
ctx = (kevent_ctx_t *)kq->ke[i].udata;
assert(ctx);
if (!ctx->fd_prio)
continue;
assert(ctx->fd_read);
if (ctx->fd_read((int)ke[i].ident, ctx->udata) == -1)
if (ctx->fd_read((int)kq->ke[i].ident, ctx->udata) == -1)
return -1;
return 0; /* drain */
}
Expand All @@ -151,33 +153,33 @@ kqueue_dispatch(void) {
}

int
kqueue_add_fd_read(int fd, kevent_ctx_t *ctx) {
kqueue_add_fd_read(kqueue_t *kq, int fd, kevent_ctx_t *ctx) {
struct kevent ke;

if (ke_enlarge() == -1)
if (kqueue_enlarge(kq) == -1)
return -1;
EV_SET(&ke, fd, EVFILT_READ, EV_ADD, NOTE_LOWAT, 1, ctx);
return kevent(kqfd, &ke, 1, NULL, 0, NULL);
return kevent(kq->fd, &ke, 1, NULL, 0, NULL);
}

int
kqueue_add_signal(int sig, kevent_ctx_t *ctx) {
kqueue_add_signal(kqueue_t *kq, int sig, kevent_ctx_t *ctx) {
struct kevent ke;

if (ke_enlarge() == -1)
if (kqueue_enlarge(kq) == -1)
return -1;
signal(sig, SIG_IGN);
EV_SET(&ke, sig, EVFILT_SIGNAL, EV_ADD, 0, 0, ctx);
return kevent(kqfd, &ke, 1, NULL, 0, NULL);
return kevent(kq->fd, &ke, 1, NULL, 0, NULL);
}

int
kqueue_add_timer(int ident, int secs, kevent_ctx_t *ctx) {
kqueue_add_timer(kqueue_t *kq, int ident, int secs, kevent_ctx_t *ctx) {
struct kevent ke;

if (ke_enlarge() == -1)
if (kqueue_enlarge(kq) == -1)
return -1;
EV_SET(&ke, ident, EVFILT_TIMER, EV_ADD, NOTE_SECONDS, secs, ctx);
return kevent(kqfd, &ke, 1, NULL, 0, NULL);
return kevent(kq->fd, &ke, 1, NULL, 0, NULL);
}

33 changes: 18 additions & 15 deletions kqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,30 @@ typedef bool (*kevent_fd_prio_func_t)(int, void *);
typedef int (*kevent_signal_func_t)(int, void *);
typedef int (*kevent_timer_func_t)(int, void *);

typedef struct kevent_ctx {
typedef struct {
kevent_fd_read_func_t fd_read;
kevent_fd_prio_func_t fd_prio;
kevent_signal_func_t signal;
kevent_timer_func_t timer;
void *udata;
tommy_node node;
} kevent_ctx_t;

#define TOMMY_NODE_INIT {NULL, NULL, NULL, 0}

#define KEVENT_CTX_SIGNAL(S,U) {NULL,NULL,(S),NULL,(U),TOMMY_NODE_INIT}
#define KEVENT_CTX_FD_READ(R,U) {(R),NULL,NULL,NULL,(U),TOMMY_NODE_INIT}
#define KEVENT_CTX_FD_READ_PRIO(R,P,U) {(R),(P),NULL,NULL,(U),TOMMY_NODE_INIT}
#define KEVENT_CTX_TIMER(T,U) {NULL,NULL,NULL,(T),(U),TOMMY_NODE_INIT}

int kqueue_init(void) WUNRES;
void kqueue_fini(void);
int kqueue_dispatch(void) WUNRES;
int kqueue_add_fd_read(int, kevent_ctx_t *) NONNULL(2) WUNRES;
int kqueue_add_signal(int, kevent_ctx_t *) NONNULL(2) WUNRES;
int kqueue_add_timer(int, int, kevent_ctx_t *) NONNULL(3) WUNRES;
#define KEVENT_CTX_SIGNAL(SF,UD) {NULL, NULL, (SF), NULL, (UD)}
#define KEVENT_CTX_FD_READ(RF,UD) {(RF), NULL, NULL, NULL, (UD)}
#define KEVENT_CTX_FD_READ_PRIO(RF,PF,UD) {(RF), (PF), NULL, NULL, (UD)}
#define KEVENT_CTX_TIMER(TF,UD) {NULL, NULL, NULL, (TF), (UD)}

typedef struct {
int fd;
struct kevent *ke;
size_t nke;
} kqueue_t;

kqueue_t * kqueue_new(void) MALLOC;
void kqueue_free(kqueue_t *) NONNULL(1);
int kqueue_dispatch(kqueue_t *) NONNULL(1) WUNRES;
int kqueue_add_fd_read(kqueue_t *, int, kevent_ctx_t *) NONNULL(1,3) WUNRES;
int kqueue_add_signal(kqueue_t *, int, kevent_ctx_t *) NONNULL(1,3) WUNRES;
int kqueue_add_timer(kqueue_t *, int, int, kevent_ctx_t *) NONNULL(1,4) WUNRES;

#endif

0 comments on commit e934dad

Please sign in to comment.