Skip to content

Commit

Permalink
Merge github.com:grpc/grpc into rollfwd
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Jan 24, 2017
2 parents f9623b2 + 52f545f commit 0f2e2e4
Show file tree
Hide file tree
Showing 31 changed files with 360 additions and 149 deletions.
2 changes: 2 additions & 0 deletions grpc.def
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ EXPORTS
grpc_slice_buffer_move_into
grpc_slice_buffer_trim_end
grpc_slice_buffer_move_first
grpc_slice_buffer_move_first_into_buffer
grpc_slice_buffer_take_first
grpc_slice_buffer_undo_take_first
gpr_malloc
gpr_free
gpr_realloc
Expand Down
9 changes: 7 additions & 2 deletions include/grpc/impl/codegen/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,16 @@ struct grpc_slice {
/* Represents an expandable array of slices, to be interpreted as a
single item. */
typedef struct {
/* slices in the array */
/* This is for internal use only. External users (i.e any code outside grpc
* core) MUST NOT use this field */
grpc_slice *base_slices;

/* slices in the array (Points to the first valid grpc_slice in the array) */
grpc_slice *slices;
/* the number of slices in the array */
size_t count;
/* the number of slices allocated in the array */
/* the number of slices allocated in the array. External users (i.e any code
* outside grpc core) MUST NOT use this field */
size_t capacity;
/* the combined length of all slices in the array */
size_t length;
Expand Down
7 changes: 7 additions & 0 deletions include/grpc/slice_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,15 @@ GPRAPI void grpc_slice_buffer_trim_end(grpc_slice_buffer *src, size_t n,
/* move the first n bytes of src into dst */
GPRAPI void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n,
grpc_slice_buffer *dst);
/* move the first n bytes of src into dst (copying them) */
GPRAPI void grpc_slice_buffer_move_first_into_buffer(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer *src,
size_t n, void *dst);
/* take the first slice in the slice buffer */
GPRAPI grpc_slice grpc_slice_buffer_take_first(grpc_slice_buffer *src);
/* undo the above with (a possibly different) \a slice */
GPRAPI void grpc_slice_buffer_undo_take_first(grpc_slice_buffer *src,
grpc_slice slice);

#ifdef __cplusplus
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/iomgr/ev_epoll_linux.c
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ static polling_island *polling_island_merge(polling_island *p,
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

workqueue_move_items_to_parent(q);
workqueue_move_items_to_parent(p);
}
/* else if p == q, nothing needs to be done */

Expand Down
4 changes: 1 addition & 3 deletions src/core/lib/iomgr/ev_poll_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
const char *reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != NULL;
if (!fd->released) {
shutdown(fd->fd, SHUT_RDWR);
} else {
if (fd->released) {
*release_fd = fd->fd;
}
gpr_mu_lock(&fd->mu);
Expand Down
131 changes: 92 additions & 39 deletions src/core/lib/slice/slice_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,30 +46,42 @@
#define GROW(x) (3 * (x) / 2)

static void maybe_embiggen(grpc_slice_buffer *sb) {
if (sb->count == sb->capacity) {
if (sb->base_slices != sb->slices) {
memmove(sb->base_slices, sb->slices, sb->count * sizeof(grpc_slice));
sb->slices = sb->base_slices;
}

/* How far away from sb->base_slices is sb->slices pointer */
size_t slice_offset = (size_t)(sb->slices - sb->base_slices);
size_t slice_count = sb->count + slice_offset;

if (slice_count == sb->capacity) {
sb->capacity = GROW(sb->capacity);
GPR_ASSERT(sb->capacity > sb->count);
if (sb->slices == sb->inlined) {
sb->slices = gpr_malloc(sb->capacity * sizeof(grpc_slice));
memcpy(sb->slices, sb->inlined, sb->count * sizeof(grpc_slice));
GPR_ASSERT(sb->capacity > slice_count);
if (sb->base_slices == sb->inlined) {
sb->base_slices = gpr_malloc(sb->capacity * sizeof(grpc_slice));
memcpy(sb->base_slices, sb->inlined, slice_count * sizeof(grpc_slice));
} else {
sb->slices = gpr_realloc(sb->slices, sb->capacity * sizeof(grpc_slice));
sb->base_slices =
gpr_realloc(sb->base_slices, sb->capacity * sizeof(grpc_slice));
}

sb->slices = sb->base_slices + slice_offset;
}
}

void grpc_slice_buffer_init(grpc_slice_buffer *sb) {
sb->count = 0;
sb->length = 0;
sb->capacity = GRPC_SLICE_BUFFER_INLINE_ELEMENTS;
sb->slices = sb->inlined;
sb->base_slices = sb->slices = sb->inlined;
}

void grpc_slice_buffer_destroy_internal(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer *sb) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, sb);
if (sb->slices != sb->inlined) {
gpr_free(sb->slices);
if (sb->base_slices != sb->inlined) {
gpr_free(sb->base_slices);
}
}

Expand Down Expand Up @@ -166,7 +178,6 @@ void grpc_slice_buffer_pop(grpc_slice_buffer *sb) {
void grpc_slice_buffer_reset_and_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer *sb) {
size_t i;

for (i = 0; i < sb->count; i++) {
grpc_slice_unref_internal(exec_ctx, sb->slices[i]);
}
Expand All @@ -182,32 +193,45 @@ void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer *sb) {
}

void grpc_slice_buffer_swap(grpc_slice_buffer *a, grpc_slice_buffer *b) {
GPR_SWAP(size_t, a->count, b->count);
GPR_SWAP(size_t, a->capacity, b->capacity);
GPR_SWAP(size_t, a->length, b->length);
size_t a_offset = (size_t)(a->slices - a->base_slices);
size_t b_offset = (size_t)(b->slices - b->base_slices);

if (a->slices == a->inlined) {
if (b->slices == b->inlined) {
size_t a_count = a->count + a_offset;
size_t b_count = b->count + b_offset;

if (a->base_slices == a->inlined) {
if (b->base_slices == b->inlined) {
/* swap contents of inlined buffer */
grpc_slice temp[GRPC_SLICE_BUFFER_INLINE_ELEMENTS];
memcpy(temp, a->slices, b->count * sizeof(grpc_slice));
memcpy(a->slices, b->slices, a->count * sizeof(grpc_slice));
memcpy(b->slices, temp, b->count * sizeof(grpc_slice));
memcpy(temp, a->base_slices, a_count * sizeof(grpc_slice));
memcpy(a->base_slices, b->base_slices, b_count * sizeof(grpc_slice));
memcpy(b->base_slices, temp, a_count * sizeof(grpc_slice));
} else {
/* a is inlined, b is not - copy a inlined into b, fix pointers */
a->slices = b->slices;
b->slices = b->inlined;
memcpy(b->slices, a->inlined, b->count * sizeof(grpc_slice));
a->base_slices = b->base_slices;
b->base_slices = b->inlined;
memcpy(b->base_slices, a->inlined, a_count * sizeof(grpc_slice));
}
} else if (b->slices == b->inlined) {
} else if (b->base_slices == b->inlined) {
/* b is inlined, a is not - copy b inlined int a, fix pointers */
b->slices = a->slices;
a->slices = a->inlined;
memcpy(a->slices, b->inlined, a->count * sizeof(grpc_slice));
b->base_slices = a->base_slices;
a->base_slices = a->inlined;
memcpy(a->base_slices, b->inlined, b_count * sizeof(grpc_slice));
} else {
/* no inlining: easy swap */
GPR_SWAP(grpc_slice *, a->slices, b->slices);
GPR_SWAP(grpc_slice *, a->base_slices, b->base_slices);
}

/* Update the slices pointers (cannot do a GPR_SWAP on slices fields here).
* Also note that since the base_slices pointers are already swapped we need
* use 'b_offset' for 'a->base_slices' and vice versa */
a->slices = a->base_slices + b_offset;
b->slices = b->base_slices + a_offset;

/* base_slices and slices fields are correctly set. Swap all other fields */
GPR_SWAP(size_t, a->count, b->count);
GPR_SWAP(size_t, a->capacity, b->capacity);
GPR_SWAP(size_t, a->length, b->length);
}

void grpc_slice_buffer_move_into(grpc_slice_buffer *src,
Expand All @@ -229,42 +253,62 @@ void grpc_slice_buffer_move_into(grpc_slice_buffer *src,

void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n,
grpc_slice_buffer *dst) {
size_t src_idx;
size_t output_len = dst->length + n;
size_t new_input_len = src->length - n;
GPR_ASSERT(src->length >= n);
if (src->length == n) {
grpc_slice_buffer_move_into(src, dst);
return;
}
src_idx = 0;
while (src_idx < src->capacity) {
grpc_slice slice = src->slices[src_idx];

while (src->count > 0) {
grpc_slice slice = grpc_slice_buffer_take_first(src);
size_t slice_len = GRPC_SLICE_LENGTH(slice);
if (n > slice_len) {
grpc_slice_buffer_add(dst, slice);
n -= slice_len;
src_idx++;
} else if (n == slice_len) {
grpc_slice_buffer_add(dst, slice);
src_idx++;
break;
} else { /* n < slice_len */
src->slices[src_idx] = grpc_slice_split_tail(&slice, n);
grpc_slice_buffer_undo_take_first(src, grpc_slice_split_tail(&slice, n));
GPR_ASSERT(GRPC_SLICE_LENGTH(slice) == n);
GPR_ASSERT(GRPC_SLICE_LENGTH(src->slices[src_idx]) == slice_len - n);
grpc_slice_buffer_add(dst, slice);
break;
}
}
GPR_ASSERT(dst->length == output_len);
memmove(src->slices, src->slices + src_idx,
sizeof(grpc_slice) * (src->count - src_idx));
src->count -= src_idx;
src->length = new_input_len;
GPR_ASSERT(src->length == new_input_len);
GPR_ASSERT(src->count > 0);
}

void grpc_slice_buffer_move_first_into_buffer(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer *src, size_t n,
void *dst) {
char *dstp = dst;
GPR_ASSERT(src->length >= n);

while (n > 0) {
grpc_slice slice = grpc_slice_buffer_take_first(src);
size_t slice_len = GRPC_SLICE_LENGTH(slice);
if (slice_len > n) {
memcpy(dstp, GRPC_SLICE_START_PTR(slice), n);
grpc_slice_buffer_undo_take_first(
src, grpc_slice_sub_no_ref(slice, n, slice_len));
n = 0;
} else if (slice_len == n) {
memcpy(dstp, GRPC_SLICE_START_PTR(slice), n);
grpc_slice_unref_internal(exec_ctx, slice);
n = 0;
} else {
memcpy(dstp, GRPC_SLICE_START_PTR(slice), slice_len);
dstp += slice_len;
n -= slice_len;
grpc_slice_unref_internal(exec_ctx, slice);
}
}
}

void grpc_slice_buffer_trim_end(grpc_slice_buffer *sb, size_t n,
grpc_slice_buffer *garbage) {
GPR_ASSERT(n <= sb->length);
Expand Down Expand Up @@ -293,8 +337,17 @@ grpc_slice grpc_slice_buffer_take_first(grpc_slice_buffer *sb) {
grpc_slice slice;
GPR_ASSERT(sb->count > 0);
slice = sb->slices[0];
memmove(&sb->slices[0], &sb->slices[1], (sb->count - 1) * sizeof(grpc_slice));
sb->slices++;
sb->count--;
sb->length -= GRPC_SLICE_LENGTH(slice);

return slice;
}

void grpc_slice_buffer_undo_take_first(grpc_slice_buffer *sb,
grpc_slice slice) {
sb->slices--;
sb->slices[0] = slice;
sb->count++;
sb->length += GRPC_SLICE_LENGTH(slice);
}
12 changes: 6 additions & 6 deletions src/node/ext/call_credentials.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include <nan.h>
#include <uv.h>

#include <list>
#include <queue>

#include "grpc/grpc.h"
#include "grpc/grpc_security.h"
Expand Down Expand Up @@ -170,7 +170,7 @@ NAN_METHOD(CallCredentials::CreateFromPlugin) {
grpc_metadata_credentials_plugin plugin;
plugin_state *state = new plugin_state;
state->callback = new Nan::Callback(info[0].As<Function>());
state->pending_callbacks = new std::list<plugin_callback_data*>();
state->pending_callbacks = new std::queue<plugin_callback_data*>();
uv_mutex_init(&state->plugin_mutex);
uv_async_init(uv_default_loop(),
&state->plugin_async,
Expand Down Expand Up @@ -231,13 +231,13 @@ NAN_METHOD(PluginCallback) {
NAUV_WORK_CB(SendPluginCallback) {
Nan::HandleScope scope;
plugin_state *state = reinterpret_cast<plugin_state*>(async->data);
std::list<plugin_callback_data*> callbacks;
std::queue<plugin_callback_data*> callbacks;
uv_mutex_lock(&state->plugin_mutex);
callbacks.splice(callbacks.begin(), *state->pending_callbacks);
state->pending_callbacks->swap(callbacks);
uv_mutex_unlock(&state->plugin_mutex);
while (!callbacks.empty()) {
plugin_callback_data *data = callbacks.front();
callbacks.pop_front();
callbacks.pop();
Local<Object> callback_data = Nan::New<Object>();
Nan::Set(callback_data, Nan::New("cb").ToLocalChecked(),
Nan::New<v8::External>(reinterpret_cast<void*>(data->cb)));
Expand Down Expand Up @@ -266,7 +266,7 @@ void plugin_get_metadata(void *state, grpc_auth_metadata_context context,
data->user_data = user_data;

uv_mutex_lock(&p_state->plugin_mutex);
p_state->pending_callbacks->push_back(data);
p_state->pending_callbacks->push(data);
uv_mutex_unlock(&p_state->plugin_mutex);

uv_async_send(&p_state->plugin_async);
Expand Down
4 changes: 2 additions & 2 deletions src/node/ext/call_credentials.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#ifndef GRPC_NODE_CALL_CREDENTIALS_H_
#define GRPC_NODE_CALL_CREDENTIALS_H_

#include <list>
#include <queue>

#include <node.h>
#include <nan.h>
Expand Down Expand Up @@ -84,7 +84,7 @@ typedef struct plugin_callback_data {

typedef struct plugin_state {
Nan::Callback *callback;
std::list<plugin_callback_data*> *pending_callbacks;
std::queue<plugin_callback_data*> *pending_callbacks;
uv_mutex_t plugin_mutex;
// async.data == this
uv_async_t plugin_async;
Expand Down
14 changes: 7 additions & 7 deletions src/node/ext/node_grpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*
*/

#include <list>
#include <queue>

#include <node.h>
#include <nan.h>
Expand Down Expand Up @@ -77,7 +77,7 @@ typedef struct log_args {

typedef struct logger_state {
Nan::Callback *callback;
std::list<log_args *> *pending_args;
std::queue<log_args *> *pending_args;
uv_mutex_t mutex;
uv_async_t async;
// Indicates that a logger has been set
Expand Down Expand Up @@ -338,14 +338,14 @@ NAN_METHOD(SetDefaultRootsPem) {

NAUV_WORK_CB(LogMessagesCallback) {
Nan::HandleScope scope;
std::list<log_args *> args;
std::queue<log_args *> args;
uv_mutex_lock(&grpc_logger_state.mutex);
args.splice(args.begin(), *grpc_logger_state.pending_args);
grpc_logger_state.pending_args->swap(args);
uv_mutex_unlock(&grpc_logger_state.mutex);
/* Call the callback with each log message */
while (!args.empty()) {
log_args *arg = args.front();
args.pop_front();
args.pop();
Local<Value> file = Nan::New(arg->core_args.file).ToLocalChecked();
Local<Value> line = Nan::New<Uint32, uint32_t>(arg->core_args.line);
Local<Value> severity = Nan::New(
Expand All @@ -372,15 +372,15 @@ void node_log_func(gpr_log_func_args *args) {
args_copy->timestamp = gpr_now(GPR_CLOCK_REALTIME);

uv_mutex_lock(&grpc_logger_state.mutex);
grpc_logger_state.pending_args->push_back(args_copy);
grpc_logger_state.pending_args->push(args_copy);
uv_mutex_unlock(&grpc_logger_state.mutex);

uv_async_send(&grpc_logger_state.async);
}

void init_logger() {
memset(&grpc_logger_state, 0, sizeof(logger_state));
grpc_logger_state.pending_args = new std::list<log_args *>();
grpc_logger_state.pending_args = new std::queue<log_args *>();
uv_mutex_init(&grpc_logger_state.mutex);
uv_async_init(uv_default_loop(),
&grpc_logger_state.async,
Expand Down
Loading

0 comments on commit 0f2e2e4

Please sign in to comment.