Skip to content

Commit

Permalink
Merge pull request grpc#9252 from ctiller/write_buffering
Browse files Browse the repository at this point in the history
Fix write buffering, add tests
  • Loading branch information
ctiller authored Jan 17, 2017
2 parents 1e27758 + 0a5a318 commit c2079f3
Show file tree
Hide file tree
Showing 16 changed files with 2,160 additions and 63 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7303,6 +7303,8 @@ LIBEND2END_TESTS_SRC = \
test/core/end2end/tests/simple_request.c \
test/core/end2end/tests/streaming_error_response.c \
test/core/end2end/tests/trailing_metadata.c \
test/core/end2end/tests/write_buffering.c \
test/core/end2end/tests/write_buffering_at_end.c \

PUBLIC_HEADERS_C += \

Expand Down Expand Up @@ -7389,6 +7391,8 @@ LIBEND2END_NOSEC_TESTS_SRC = \
test/core/end2end/tests/simple_request.c \
test/core/end2end/tests/streaming_error_response.c \
test/core/end2end/tests/trailing_metadata.c \
test/core/end2end/tests/write_buffering.c \
test/core/end2end/tests/write_buffering_at_end.c \

PUBLIC_HEADERS_C += \

Expand Down
3 changes: 3 additions & 0 deletions include/grpc/impl/codegen/grpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ typedef struct {
Larger values give lower CPU usage for large messages, but more head of line
blocking for small messages. */
#define GRPC_ARG_HTTP2_MAX_FRAME_SIZE "grpc.http2.max_frame_size"
/** How much data are we willing to queue up per stream if
GRPC_WRITE_BUFFER_HINT is set? This is an upper bound */
#define GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE "grpc.http2.write_buffer_size"
/** Default authority to pass if none specified on call construction. A string.
* */
#define GRPC_ARG_DEFAULT_AUTHORITY "grpc.default_authority"
Expand Down
33 changes: 23 additions & 10 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu

#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)

#define MAX_CLIENT_STREAM_ID 0x7fffffffu
Expand Down Expand Up @@ -271,6 +271,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
t->write_buffer_size = DEFAULT_WINDOW;

if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
Expand Down Expand Up @@ -321,6 +322,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_set_max_usable_size(&t->hpack_compressor,
(uint32_t)value);
}
} else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
&channel_args->args[i],
(grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
} else {
static const struct {
const char *channel_arg_name;
Expand Down Expand Up @@ -899,15 +905,22 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) {
return false;
}

static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
}
}

static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
s->fetched_send_message_length +=
(uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice);
grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
if (s->id != 0) {
grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
}
maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}

static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
Expand Down Expand Up @@ -1100,14 +1113,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
(int64_t)len;
s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
/* allow up to 64kb to be buffered */
/* TODO(ctiller): make this configurable */
s->next_message_end_offset -= 65536;
s->next_message_end_offset -= t->write_buffer_size;
s->write_buffering = true;
} else {
s->write_buffering = false;
}
continue_fetching_send_locked(exec_ctx, t, s);
if (s->id != 0) {
grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
}
maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
}

Expand All @@ -1116,6 +1128,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata = op->send_trailing_metadata;
s->write_buffering = false;
const size_t metadata_size =
grpc_metadata_batch_size(op->send_trailing_metadata);
const size_t metadata_peer_limit =
Expand Down
6 changes: 6 additions & 0 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ struct grpc_chttp2_transport {
int64_t announce_incoming_window;
/** how much window would we like to have for incoming_window */
uint32_t connection_window_target;
/** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
*/
uint32_t write_buffer_size;

/** have we seen a goaway */
uint8_t seen_goaway;
Expand Down Expand Up @@ -403,6 +406,9 @@ struct grpc_chttp2_stream {
/** Has this stream seen an error.
If true, then pending incoming frames can be thrown away. */
bool seen_error;
/** Are we buffering writes on this stream? If yes, we won't become writable
until there's enough queued up in the flow_controlled_buffer */
bool write_buffering;

/** the error that resulted in this stream being read-closed */
grpc_error *read_closed_error;
Expand Down
16 changes: 16 additions & 0 deletions test/core/end2end/end2end_nosec_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ extern void streaming_error_response(grpc_end2end_test_config config);
extern void streaming_error_response_pre_init(void);
extern void trailing_metadata(grpc_end2end_test_config config);
extern void trailing_metadata_pre_init(void);
extern void write_buffering(grpc_end2end_test_config config);
extern void write_buffering_pre_init(void);
extern void write_buffering_at_end(grpc_end2end_test_config config);
extern void write_buffering_at_end_pre_init(void);

void grpc_end2end_tests_pre_init(void) {
GPR_ASSERT(!g_pre_init_called);
Expand Down Expand Up @@ -185,6 +189,8 @@ void grpc_end2end_tests_pre_init(void) {
simple_request_pre_init();
streaming_error_response_pre_init();
trailing_metadata_pre_init();
write_buffering_pre_init();
write_buffering_at_end_pre_init();
}

void grpc_end2end_tests(int argc, char **argv,
Expand Down Expand Up @@ -240,6 +246,8 @@ void grpc_end2end_tests(int argc, char **argv,
simple_request(config);
streaming_error_response(config);
trailing_metadata(config);
write_buffering(config);
write_buffering_at_end(config);
return;
}

Expand Down Expand Up @@ -428,6 +436,14 @@ void grpc_end2end_tests(int argc, char **argv,
trailing_metadata(config);
continue;
}
if (0 == strcmp("write_buffering", argv[i])) {
write_buffering(config);
continue;
}
if (0 == strcmp("write_buffering_at_end", argv[i])) {
write_buffering_at_end(config);
continue;
}
gpr_log(GPR_DEBUG, "not a test: '%s'", argv[i]);
abort();
}
Expand Down
16 changes: 16 additions & 0 deletions test/core/end2end/end2end_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ extern void streaming_error_response(grpc_end2end_test_config config);
extern void streaming_error_response_pre_init(void);
extern void trailing_metadata(grpc_end2end_test_config config);
extern void trailing_metadata_pre_init(void);
extern void write_buffering(grpc_end2end_test_config config);
extern void write_buffering_pre_init(void);
extern void write_buffering_at_end(grpc_end2end_test_config config);
extern void write_buffering_at_end_pre_init(void);

void grpc_end2end_tests_pre_init(void) {
GPR_ASSERT(!g_pre_init_called);
Expand Down Expand Up @@ -188,6 +192,8 @@ void grpc_end2end_tests_pre_init(void) {
simple_request_pre_init();
streaming_error_response_pre_init();
trailing_metadata_pre_init();
write_buffering_pre_init();
write_buffering_at_end_pre_init();
}

void grpc_end2end_tests(int argc, char **argv,
Expand Down Expand Up @@ -244,6 +250,8 @@ void grpc_end2end_tests(int argc, char **argv,
simple_request(config);
streaming_error_response(config);
trailing_metadata(config);
write_buffering(config);
write_buffering_at_end(config);
return;
}

Expand Down Expand Up @@ -436,6 +444,14 @@ void grpc_end2end_tests(int argc, char **argv,
trailing_metadata(config);
continue;
}
if (0 == strcmp("write_buffering", argv[i])) {
write_buffering(config);
continue;
}
if (0 == strcmp("write_buffering_at_end", argv[i])) {
write_buffering_at_end(config);
continue;
}
gpr_log(GPR_DEBUG, "not a test: '%s'", argv[i]);
abort();
}
Expand Down
2 changes: 2 additions & 0 deletions test/core/end2end/gen_build_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@
'streaming_error_response': default_test_options,
'trailing_metadata': default_test_options,
'authority_not_supported': default_test_options,
'write_buffering': default_test_options,
'write_buffering_at_end': default_test_options,
}


Expand Down
2 changes: 2 additions & 0 deletions test/core/end2end/generate_tests.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ END2END_TESTS = {
'trailing_metadata': test_options(),
'authority_not_supported': test_options(),
'filter_latency': test_options(),
'write_buffering': test_options(),
'write_buffering_at_end': test_options(),
}


Expand Down
Loading

0 comments on commit c2079f3

Please sign in to comment.