Skip to content

Commit

Permalink
Add an option to trigger bufferevent I/O callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
mistotebe committed Dec 3, 2013
1 parent 4ce242b commit 61ee18b
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 56 deletions.
10 changes: 8 additions & 2 deletions bufferevent-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,20 @@ int bufferevent_decref_and_unlock_(struct bufferevent *bufev);

/** Internal: If callbacks are deferred and we have a read callback, schedule
* a readcb. Otherwise just run the readcb. */
void bufferevent_run_readcb_(struct bufferevent *bufev);
void bufferevent_run_readcb_(struct bufferevent *bufev, int options);
/** Internal: If callbacks are deferred and we have a write callback, schedule
* a writecb. Otherwise just run the writecb. */
void bufferevent_run_writecb_(struct bufferevent *bufev);
void bufferevent_run_writecb_(struct bufferevent *bufev, int options);
/** Internal: If callbacks are deferred and we have an eventcb, schedule
* it to run with events "what". Otherwise just run the eventcb. */
void bufferevent_run_eventcb_(struct bufferevent *bufev, short what);

/** Internal: Run or schedule (if deferred or options contain
* BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype.
* Must already hold the bufev lock. */
void bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options);


/** Internal: Add the event 'ev' with timeout tv, unless tv is set to 0, in
* which case add ev with no timeout. */
int bufferevent_add_event_(struct event *ev, const struct timeval *tv);
Expand Down
55 changes: 37 additions & 18 deletions bufferevent.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,15 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg


void
bufferevent_run_readcb_(struct bufferevent *bufev)
bufferevent_run_readcb_(struct bufferevent *bufev, int options)
{
/* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->readcb == NULL)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
(options & BEV_TRIG_DEFER_CALLBACKS)) {
p->readcb_pending = 1;
SCHEDULE_DEFERRED(p);
} else {
Expand All @@ -235,21 +236,41 @@ bufferevent_run_readcb_(struct bufferevent *bufev)
}

void
bufferevent_run_writecb_(struct bufferevent *bufev)
bufferevent_run_writecb_(struct bufferevent *bufev, int options)
{
/* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->writecb == NULL)
return;
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
(options & BEV_TRIG_DEFER_CALLBACKS)) {
p->writecb_pending = 1;
SCHEDULE_DEFERRED(p);
} else {
bufev->writecb(bufev, bufev->cbarg);
}
}

void
bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options)
{
if ((iotype & EV_READ) && ((options & BEV_TRIG_IGNORE_WATERMARKS) ||
evbuffer_get_length(bufev->input) >= bufev->wm_read.low))
bufferevent_run_readcb_(bufev, options);
if ((iotype & EV_WRITE) && ((options & BEV_TRIG_IGNORE_WATERMARKS) ||
evbuffer_get_length(bufev->output) <= bufev->wm_write.low))
bufferevent_run_writecb_(bufev, options);
}

void
bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
{
bufferevent_incref_and_lock_(bufev);
bufferevent_trigger_nolock_(bufev, iotype, options);
bufferevent_decref_and_unlock_(bufev);
}

void
bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
{
Expand Down Expand Up @@ -322,20 +343,18 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
return -1;
}
if (options & BEV_OPT_DEFER_CALLBACKS) {
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_(
&bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init_(
&bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_locked,
bufev_private);
}
if (options & BEV_OPT_UNLOCK_CALLBACKS)
event_deferred_cb_init_(
&bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_unlocked,
bufev_private);
else
event_deferred_cb_init_(
&bufev_private->deferred,
event_base_get_npriorities(base) / 2,
bufferevent_run_deferred_callbacks_locked,
bufev_private);

bufev_private->options = options;

Expand Down
7 changes: 2 additions & 5 deletions bufferevent_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
bufferevent_run_readcb_(bev);
bufferevent_trigger_nolock_(bev, EV_READ, 0);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
Expand Down Expand Up @@ -502,9 +501,7 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
if (evbuffer_get_length(bev->output) <=
bev->wm_write.low)
bufferevent_run_writecb_(bev);
bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
Expand Down
10 changes: 4 additions & 6 deletions bufferevent_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf,
/* Or if we have filled the underlying output buffer. */
!be_underlying_writebuf_full(bevf,state));

if (processed &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
if (processed) {
/* call the write callback.*/
bufferevent_run_writecb_(bufev);
bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);

if (res == BEV_OK &&
(bufev->enabled & EV_WRITE) &&
Expand Down Expand Up @@ -442,9 +441,8 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
/* XXX This should be in process_input, not here. There are
* other places that can call process-input, and they should
* force readcb calls as needed. */
if (processed_any &&
evbuffer_get_length(bufev->input) >= bufev->wm_read.low)
bufferevent_run_readcb_(bufev);
if (processed_any)
bufferevent_trigger_nolock_(bufev, EV_READ, 0);

bufferevent_decref_and_unlock_(bufev);
}
Expand Down
13 changes: 3 additions & 10 deletions bufferevent_openssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,7 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
if (bev_ssl->underlying)
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);

if (evbuffer_get_length(output) <= bev->wm_write.low)
bufferevent_run_writecb_(bev);
bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
}
return result;
}
Expand Down Expand Up @@ -824,11 +823,8 @@ consider_reading(struct bufferevent_openssl *bev_ssl)

if (all_result_flags & OP_MADE_PROGRESS) {
struct bufferevent *bev = &bev_ssl->bev.bev;
struct evbuffer *input = bev->input;

if (evbuffer_get_length(input) >= bev->wm_read.low) {
bufferevent_run_readcb_(bev);
}
bufferevent_trigger_nolock_(bev, EV_READ, 0);
}

if (!bev_ssl->underlying) {
Expand All @@ -852,11 +848,8 @@ consider_writing(struct bufferevent_openssl *bev_ssl)
r = do_read(bev_ssl, 1024); /* XXXX 1024 is a hack */
if (r & OP_MADE_PROGRESS) {
struct bufferevent *bev = &bev_ssl->bev.bev;
struct evbuffer *input = bev->input;

if (evbuffer_get_length(input) >= bev->wm_read.low) {
bufferevent_run_readcb_(bev);
}
bufferevent_trigger_nolock_(bev, EV_READ, 0);
}
if (r & (OP_ERR|OP_BLOCKED))
break;
Expand Down
13 changes: 3 additions & 10 deletions bufferevent_pair.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ static void
be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
int ignore_wm)
{
size_t src_size, dst_size;
size_t dst_size;
size_t n;

evbuffer_unfreeze(src->output, 1);
Expand Down Expand Up @@ -182,15 +182,8 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
}

src_size = evbuffer_get_length(src->output);
dst_size = evbuffer_get_length(dst->input);

if (dst_size >= dst->wm_read.low) {
bufferevent_run_readcb_(dst);
}
if (src_size <= src->wm_write.low) {
bufferevent_run_writecb_(src);
}
bufferevent_trigger_nolock_(dst, EV_READ, 0);
bufferevent_trigger_nolock_(src, EV_WRITE, 0);
done:
evbuffer_freeze(src->output, 1);
evbuffer_freeze(dst->input, 0);
Expand Down
8 changes: 3 additions & 5 deletions bufferevent_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
bufferevent_decrement_read_buckets_(bufev_p, res);

/* Invoke the user callback - must always be called last */
if (evbuffer_get_length(input) >= bufev->wm_read.low)
bufferevent_run_readcb_(bufev);
bufferevent_trigger_nolock_(bufev, EV_READ, 0);

goto done;

Expand Down Expand Up @@ -294,9 +293,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
* Invoke the user callback if our buffer is drained or below the
* low watermark.
*/
if ((res || !connected) &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
bufferevent_run_writecb_(bufev);
if (res || !connected) {
bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
}

goto done;
Expand Down
26 changes: 26 additions & 0 deletions include/event2/bufferevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,32 @@ int bufferevent_flush(struct bufferevent *bufev,
short iotype,
enum bufferevent_flush_mode mode);

/**
Flags for bufferevent_trigger(_event) that modify when and how to trigger
the callback.
*/
enum bufferevent_trigger_options {
/** trigger the callback regardless of the watermarks */
BEV_TRIG_IGNORE_WATERMARKS = (1<<0),

/** defer even if the callbacks are not */
BEV_TRIG_DEFER_CALLBACKS = (1<<1),
};

/**
Triggers bufferevent data callbacks.
The function will honor watermarks unless options contain
BEV_TRIG_IGNORE_WATERMARKS. If the options contain BEV_OPT_DEFER_CALLBACKS,
the callbacks are deferred.
@param bufev the bufferevent object
@param iotype either EV_READ or EV_WRITE or both.
@param options
*/
void bufferevent_trigger(struct bufferevent *bufev, short iotype,
int options);

/**
@name Filtering support
Expand Down
Loading

0 comments on commit 61ee18b

Please sign in to comment.