Skip to content

Commit

Permalink
IOCP-related evbuffer fixes.
Browse files Browse the repository at this point in the history
- Prevent evbuffer_{add,prepend}_buffer from moving read-pinned chains.
- Fix evbuffer_drain to handle read-pinned chains better.
- Raise the limit on WSABUFs from two to MAX_WSABUFS for overlapped reads.
  • Loading branch information
chris-davis committed Sep 8, 2010
1 parent 4209007 commit 03afa20
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 41 deletions.
122 changes: 106 additions & 16 deletions buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,8 @@ evbuffer_reserve_space(struct evbuffer *buf, ev_ssize_t size,
} else {
if (_evbuffer_expand_fast(buf, size, n_vecs)<0)
goto done;
n = _evbuffer_read_setup_vecs(buf, size, vec, n_vecs, &chainp, 0);
n = _evbuffer_read_setup_vecs(buf, size, vec, n_vecs,
&chainp, 0);
}

done:
Expand Down Expand Up @@ -670,6 +671,12 @@ evbuffer_commit_space(struct evbuffer *buf,
return result;
}

static inline int
HAS_PINNED_R(struct evbuffer *buf)
{
return (buf->last && CHAIN_PINNED_R(buf->last));
}

static inline void
ZERO_CHAIN(struct evbuffer *dst)
{
Expand All @@ -680,6 +687,71 @@ ZERO_CHAIN(struct evbuffer *dst)
dst->total_len = 0;
}

/* Prepares the contents of src to be moved to another buffer by removing
* read-pinned chains. The first pinned chain is saved in first, and the
* last in last. If src has no read-pinned chains, first and last are set
* to NULL. */
static int
PRESERVE_PINNED(struct evbuffer *src, struct evbuffer_chain **first,
struct evbuffer_chain **last)
{
struct evbuffer_chain *chain, **pinned;

ASSERT_EVBUFFER_LOCKED(src);

if (!HAS_PINNED_R(src)) {
*first = *last = NULL;
return 0;
}

pinned = src->last_with_datap;
if (!CHAIN_PINNED_R(*pinned))
pinned = &(*pinned)->next;
EVUTIL_ASSERT(CHAIN_PINNED_R(*pinned));
chain = *first = *pinned;
*last = src->last;

/* If there's data in the first pinned chain, we need to allocate
* a new chain and copy the data over. */
if (chain->off) {
struct evbuffer_chain *tmp;

EVUTIL_ASSERT(pinned == src->last_with_datap);
tmp = evbuffer_chain_new(chain->off);
if (!tmp)
return -1;
memcpy(tmp->buffer, chain->buffer + chain->misalign,
chain->off);
tmp->off = chain->off;
*src->last_with_datap = tmp;
src->last = tmp;
chain->misalign += chain->off;
chain->off = 0;
} else {
src->last = *src->last_with_datap;
*pinned = NULL;
}

return 0;
}

static inline void
RESTORE_PINNED(struct evbuffer *src, struct evbuffer_chain *pinned,
struct evbuffer_chain *last)
{
ASSERT_EVBUFFER_LOCKED(src);

if (!pinned) {
ZERO_CHAIN(src);
return;
}

src->first = pinned;
src->last = last;
src->last_with_datap = &src->first;
src->total_len = 0;
}

static inline void
COPY_CHAIN(struct evbuffer *dst, struct evbuffer *src)
{
Expand Down Expand Up @@ -729,6 +801,7 @@ PREPEND_CHAIN(struct evbuffer *dst, struct evbuffer *src)
int
evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
{
struct evbuffer_chain *pinned, *last;
size_t in_total_len, out_total_len;
int result = 0;

Expand All @@ -744,6 +817,11 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
goto done;
}

if (PRESERVE_PINNED(inbuf, &pinned, &last) < 0) {
result = -1;
goto done;
}

if (out_total_len == 0) {
/* There might be an empty chain at the start of outbuf; free
* it. */
Expand All @@ -753,8 +831,8 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
APPEND_CHAIN(outbuf, inbuf);
}

/* remove everything from inbuf */
ZERO_CHAIN(inbuf);
RESTORE_PINNED(inbuf, pinned, last);

inbuf->n_del_for_cb += in_total_len;
outbuf->n_add_for_cb += in_total_len;

Expand All @@ -769,6 +847,7 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
int
evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
{
struct evbuffer_chain *pinned, *last;
size_t in_total_len, out_total_len;
int result = 0;

Expand All @@ -785,6 +864,11 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
goto done;
}

if (PRESERVE_PINNED(inbuf, &pinned, &last) < 0) {
result = -1;
goto done;
}

if (out_total_len == 0) {
/* There might be an empty chain at the start of outbuf; free
* it. */
Expand All @@ -794,8 +878,8 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf)
PREPEND_CHAIN(outbuf, inbuf);
}

/* remove everything from inbuf */
ZERO_CHAIN(inbuf);
RESTORE_PINNED(inbuf, pinned, last);

inbuf->n_del_for_cb += in_total_len;
outbuf->n_add_for_cb += in_total_len;

Expand All @@ -810,7 +894,7 @@ int
evbuffer_drain(struct evbuffer *buf, size_t len)
{
struct evbuffer_chain *chain, *next;
size_t old_len;
size_t remaining, old_len;
int result = 0;

EVBUFFER_LOCK(buf);
Expand All @@ -824,12 +908,10 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
goto done;
}


if (len >= old_len && !(buf->last && CHAIN_PINNED_R(buf->last))) {
if (len >= old_len && !HAS_PINNED_R(buf)) {
len = old_len;
for (chain = buf->first; chain != NULL; chain = next) {
next = chain->next;

evbuffer_chain_free(chain);
}

Expand All @@ -839,25 +921,33 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
len = old_len;

buf->total_len -= len;

for (chain = buf->first; len >= chain->off; chain = next) {
remaining = len;
for (chain = buf->first;
remaining >= chain->off;
chain = next) {
next = chain->next;
len -= chain->off;
remaining -= chain->off;

if (chain == *buf->last_with_datap) {
buf->last_with_datap = &buf->first;
}
if (&chain->next == buf->last_with_datap)
buf->last_with_datap = &buf->first;

if (len == 0 && CHAIN_PINNED_R(chain))
if (CHAIN_PINNED_R(chain)) {
EVUTIL_ASSERT(remaining == 0);
chain->misalign += chain->off;
chain->off = 0;
break;
evbuffer_chain_free(chain);
} else
evbuffer_chain_free(chain);
}

buf->first = chain;
chain->misalign += len;
chain->off -= len;
if (chain) {
chain->misalign += remaining;
chain->off -= remaining;
}
}

buf->n_del_for_cb += len;
Expand Down
51 changes: 27 additions & 24 deletions buffer_iocp.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,46 +82,51 @@ static void
pin_release(struct evbuffer_overlapped *eo, unsigned flag)
{
int i;
struct evbuffer_chain *chain = eo->first_pinned;
struct evbuffer_chain *next, *chain = eo->first_pinned;

for (i = 0; i < eo->n_buffers; ++i) {
EVUTIL_ASSERT(chain);
next = chain->next;
_evbuffer_chain_unpin(chain, flag);
chain = chain->next;
chain = next;
}
}

void
evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes)
{
struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
struct evbuffer_iovec iov[2];
int n_vec;
struct evbuffer_chain **chainp;
size_t remaining, len;
unsigned i;

EVBUFFER_LOCK(evbuf);
EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
EVUTIL_ASSERT(nBytes >= 0); /* XXXX Can this be false? */

evbuffer_unfreeze(evbuf, 0);

iov[0].iov_base = buf->buffers[0].buf;
if ((size_t)nBytes <= buf->buffers[0].len) {
iov[0].iov_len = nBytes;
n_vec = 1;
} else {
iov[0].iov_len = buf->buffers[0].len;
iov[1].iov_base = buf->buffers[1].buf;
iov[1].iov_len = nBytes - iov[0].iov_len;
n_vec = 2;
chainp = evbuf->last_with_datap;
if (!((*chainp)->flags & EVBUFFER_MEM_PINNED_R))
chainp = &(*chainp)->next;
remaining = nBytes;
for (i = 0; remaining > 0 && i < buf->n_buffers; ++i) {
EVUTIL_ASSERT(*chainp);
len = buf->buffers[i].len;
if (remaining < len)
len = remaining;
(*chainp)->off += len;
evbuf->last_with_datap = chainp;
remaining -= len;
chainp = &(*chainp)->next;
}

if (evbuffer_commit_space(evbuf, iov, n_vec) < 0)
EVUTIL_ASSERT(0); /* XXXX fail nicer. */

pin_release(buf, EVBUFFER_MEM_PINNED_R);

buf->read_in_progress = 0;

evbuf->total_len += nBytes;

_evbuffer_decref_and_unlock(evbuf);
}

Expand Down Expand Up @@ -184,7 +189,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
}
evbuffer_freeze(buf, 1);

buf_o->first_pinned = 0;
buf_o->first_pinned = NULL;
buf_o->n_buffers = 0;
memset(buf_o->buffers, 0, sizeof(buf_o->buffers));

Expand Down Expand Up @@ -246,27 +251,25 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
if (buf->freeze_end || buf_o->read_in_progress)
goto done;

buf_o->first_pinned = 0;
buf_o->first_pinned = NULL;
buf_o->n_buffers = 0;
memset(buf_o->buffers, 0, sizeof(buf_o->buffers));

if (_evbuffer_expand_fast(buf, at_most, 2) == -1)
if (_evbuffer_expand_fast(buf, at_most, MAX_WSABUFS) == -1)
goto done;
evbuffer_freeze(buf, 0);

/* XXX This and evbuffer_read_setup_vecs() should say MAX_WSABUFS,
* not "2". But commit_read() above can't handle more than two
* buffers yet. */
nvecs = _evbuffer_read_setup_vecs(buf, at_most,
vecs, 2, &chainp, 1);
vecs, MAX_WSABUFS, &chainp, 1);
for (i=0;i<nvecs;++i) {
WSABUF_FROM_EVBUFFER_IOV(
&buf_o->buffers[i],
&vecs[i]);
}

buf_o->n_buffers = nvecs;
buf_o->first_pinned = chain= *chainp;
buf_o->first_pinned = chain = *chainp;

npin=0;
for ( ; chain; chain = chain->next) {
_evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_R);
Expand Down
3 changes: 2 additions & 1 deletion evbuffer-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ int _evbuffer_expand_fast(struct evbuffer *, size_t, int);
* Returns the number of vecs used.
*/
int _evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch,
struct evbuffer_iovec *vecs, int n_vecs, struct evbuffer_chain ***chainp, int exact);
struct evbuffer_iovec *vecs, int n_vecs, struct evbuffer_chain ***chainp,
int exact);

/* Helper macro: copies an evbuffer_iovec in ei to a win32 WSABUF in i. */
#define WSABUF_FROM_EVBUFFER_IOV(i,ei) do { \
Expand Down

0 comments on commit 03afa20

Please sign in to comment.