Skip to content

Commit

Permalink
Only process up to MAX_DEFERRED deferred_cbs at a time.
Browse files Browse the repository at this point in the history
If threads queue callbacks while event_process_deferred_callbacks is
running, the loop may spin long enough to significantly skew timers.
A unit test stressing this behavior is also in this commit.
  • Loading branch information
chris-davis committed Sep 8, 2010
1 parent 2447fe8 commit 17a14f1
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 16 deletions.
15 changes: 5 additions & 10 deletions bufferevent.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,11 @@ bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)

#define SCHEDULE_DEFERRED(bevp) \
do { \
bufferevent_incref(&(bevp)->bev); \
event_deferred_cb_schedule( \
event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
&(bevp)->deferred); \
} while (0);
} while (0)


void
Expand All @@ -222,10 +223,8 @@ _bufferevent_run_readcb(struct bufferevent *bufev)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->readcb_pending = 1;
if (!p->deferred.queued) {
bufferevent_incref(bufev);
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
}
} else {
bufev->readcb(bufev, bufev->cbarg);
}
Expand All @@ -241,10 +240,8 @@ _bufferevent_run_writecb(struct bufferevent *bufev)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->writecb_pending = 1;
if (!p->deferred.queued) {
bufferevent_incref(bufev);
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
}
} else {
bufev->writecb(bufev, bufev->cbarg);
}
Expand All @@ -261,10 +258,8 @@ _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->eventcb_pending |= what;
p->errno_pending = EVUTIL_SOCKET_ERROR();
if (!p->deferred.queued) {
bufferevent_incref(bufev);
if (!p->deferred.queued)
SCHEDULE_DEFERRED(p);
}
} else {
bufev->errorcb(bufev, what, bufev->cbarg);
}
Expand Down
16 changes: 10 additions & 6 deletions event.c
Original file line number Diff line number Diff line change
Expand Up @@ -1285,29 +1285,33 @@ event_process_active_single_queue(struct event_base *base,
}

/*
Process all the defered_cb entries in 'queue'. If *breakptr becomes set to
1, stop. Requires that we start out holding the lock on 'queue'; releases
the lock around 'queue' for each deferred_cb we process.
Process up to MAX_DEFERRED of the defered_cb entries in 'queue'. If
*breakptr becomes set to 1, stop. Requires that we start out holding
the lock on 'queue'; releases the lock around 'queue' for each deferred_cb
we process.
*/
static int
event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
{
int count = 0;
struct deferred_cb *cb;

#define MAX_DEFERRED 16
while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
cb->queued = 0;
TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
--queue->active_count;
UNLOCK_DEFERRED_QUEUE(queue);

cb->cb(cb, cb->arg);
++count;
if (*breakptr)
return -1;

LOCK_DEFERRED_QUEUE(queue);
if (*breakptr)
return -1;
if (++count == MAX_DEFERRED)
break;
}
#undef MAX_DEFERRED
return count;
}

Expand Down
94 changes: 94 additions & 0 deletions test/regress_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <unistd.h>
#endif

#ifdef _EVENT_HAVE_PTHREADS
#include <pthread.h>
Expand All @@ -46,6 +49,7 @@
#include "event2/event_struct.h"
#include "event2/thread.h"
#include "evthread-internal.h"
#include "defer-internal.h"
#include "regress.h"
#include "tinytest_macros.h"

Expand Down Expand Up @@ -312,12 +316,102 @@ thread_conditions_simple(void *arg)
;
}

#define CB_COUNT 128
#define QUEUE_THREAD_COUNT 8

#ifdef WIN32
#define SLEEP_MS(ms) Sleep(ms)
#else
#define SLEEP_MS(ms) usleep((ms) * 1000)
#endif

struct deferred_test_data {
struct deferred_cb cbs[CB_COUNT];
struct deferred_cb_queue *queue;
};

static time_t timer_start = 0;
static time_t timer_end = 0;
static unsigned callback_count = 0;
static THREAD_T load_threads[QUEUE_THREAD_COUNT];
static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT];

static void
deferred_callback(struct deferred_cb *cb, void *arg)
{
SLEEP_MS(1);
callback_count += 1;
}

static THREAD_FN
load_deferred_queue(void *arg)
{
struct deferred_test_data *data = arg;
size_t i;

for (i = 0; i < CB_COUNT; ++i) {
event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL);
event_deferred_cb_schedule(data->queue, &data->cbs[i]);
SLEEP_MS(1);
}

THREAD_RETURN();
}

static void
timer_callback(evutil_socket_t fd, short what, void *arg)
{
timer_end = time(NULL);
}

static void
start_threads_callback(evutil_socket_t fd, short what, void *arg)
{
int i;

for (i = 0; i < QUEUE_THREAD_COUNT; ++i) {
THREAD_START(load_threads[i], load_deferred_queue,
&deferred_data[i]);
}
}

static void
thread_deferred_cb_skew(void *arg)
{
struct basic_test_data *data = arg;
struct timeval tv_timer = {4, 0};
struct event event_threads;
struct deferred_cb_queue *queue;
int i;

queue = event_base_get_deferred_cb_queue(data->base);
tt_assert(queue);

for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
deferred_data[i].queue = queue;

timer_start = time(NULL);
event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL,
&tv_timer);
event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback,
NULL, NULL);
event_base_dispatch(data->base);

TT_BLATHER(("callback count, %u", callback_count));
tt_int_op(timer_end - timer_start, ==, 4);

end:
for (i = 0; i < QUEUE_THREAD_COUNT; ++i)
THREAD_JOIN(load_threads[i]);
}

#define TEST(name) \
{ #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \
&basic_setup, NULL }
struct testcase_t thread_testcases[] = {
TEST(basic),
TEST(conditions_simple),
TEST(deferred_cb_skew),
END_OF_TESTCASES
};

0 comments on commit 17a14f1

Please sign in to comment.