Skip to content

Commit

Permalink
Merge pull request grpc#7024 from markdroth/filter_call_init_failure
Browse files Browse the repository at this point in the history
Extend filter API to allow call initialization to return an error.
  • Loading branch information
kpayson64 authored Aug 2, 2016
2 parents 52f3b67 + cca4a19 commit 212a03e
Show file tree
Hide file tree
Showing 31 changed files with 1,079 additions and 83 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6518,6 +6518,7 @@ LIBEND2END_TESTS_SRC = \
test/core/end2end/tests/default_host.c \
test/core/end2end/tests/disappearing_server.c \
test/core/end2end/tests/empty_batch.c \
test/core/end2end/tests/filter_call_init_fails.c \
test/core/end2end/tests/filter_causes_close.c \
test/core/end2end/tests/graceful_server_shutdown.c \
test/core/end2end/tests/high_initial_seqno.c \
Expand Down Expand Up @@ -6597,6 +6598,7 @@ LIBEND2END_NOSEC_TESTS_SRC = \
test/core/end2end/tests/default_host.c \
test/core/end2end/tests/disappearing_server.c \
test/core/end2end/tests/empty_batch.c \
test/core/end2end/tests/filter_call_init_fails.c \
test/core/end2end/tests/filter_causes_close.c \
test/core/end2end/tests/graceful_server_shutdown.c \
test/core/end2end/tests/high_initial_seqno.c \
Expand Down
14 changes: 8 additions & 6 deletions src/core/ext/census/grpc_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,14 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, op);
}

static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
return GRPC_ERROR_NONE;
}

static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
Expand All @@ -145,15 +146,16 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
}

static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
/* TODO(hongyu): call census_tracing_start_op here. */
grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
return GRPC_ERROR_NONE;
}

static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
Expand Down
6 changes: 4 additions & 2 deletions src/core/ext/client_config/client_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}

/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem,
args->call_stack);
return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
Expand Down
25 changes: 16 additions & 9 deletions src/core/ext/client_config/subchannel.c
Original file line number Diff line number Diff line change
Expand Up @@ -702,19 +702,26 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
return GET_CONNECTED_SUBCHANNEL(c, acq);
}

grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_polling_entity *pollent) {
grpc_polling_entity *pollent, grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
NULL, NULL, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
grpc_error_free_string(error_string);
gpr_free(*call);
return error;
}
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
NULL, NULL, callstk);
grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
return call;
return GRPC_ERROR_NONE;
}

grpc_call_stack *grpc_subchannel_call_get_call_stack(
Expand Down
4 changes: 2 additions & 2 deletions src/core/ext/client_config/subchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);

/** construct a subchannel call */
grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
grpc_polling_entity *pollent);
grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call);

/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
Expand Down
38 changes: 29 additions & 9 deletions src/core/ext/client_config/subchannel_call_holder.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
gpr_free(holder->waiting_ops);
}

// The logic here is fairly complicated, due to (a) the fact that we
// need to handle the case where we receive the send op before the
// initial metadata op, and (b) the need for efficiency, especially in
// the streaming case.
// TODO(ctiller): Explain this more thoroughly.
void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op) {
Expand Down Expand Up @@ -121,7 +126,8 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_error != GRPC_ERROR_NONE) {
if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
if (!gpr_atm_rel_cas(&holder->subchannel_call, 0,
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
goto retry;
} else {
switch (holder->creation_phase) {
Expand Down Expand Up @@ -158,10 +164,17 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
/* if we've got a subchannel, then let's ask it to create a call */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
holder->connected_subchannel != NULL) {
gpr_atm_rel_store(
&holder->subchannel_call,
(gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
exec_ctx, holder->connected_subchannel, holder->pollent));
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, holder->connected_subchannel, holder->pollent,
&subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, holder, GRPC_ERROR_REF(error));
grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
}
gpr_atm_rel_store(&holder->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, holder);
goto retry;
}
Expand Down Expand Up @@ -189,10 +202,17 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_REFERENCING(
"Cancelled before creating subchannel", &error, 1));
} else {
gpr_atm_rel_store(
&holder->subchannel_call,
(gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
exec_ctx, holder->connected_subchannel, holder->pollent));
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, holder->connected_subchannel, holder->pollent,
&subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, holder, new_error);
}
gpr_atm_rel_store(&holder->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
Expand Down
7 changes: 5 additions & 2 deletions src/core/ext/load_reporting/load_reporting_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
}

/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));

Expand All @@ -125,6 +126,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
NULL,
NULL};
*/

return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
Expand Down
25 changes: 18 additions & 7 deletions src/core/lib/channel/channel_stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,13 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}

void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack) {
grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
Expand All @@ -178,17 +179,27 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));

/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
for (i = 0; i < count; i++) {
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
grpc_error *error =
call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
if (error != GRPC_ERROR_NONE) {
if (first_error == GRPC_ERROR_NONE) {
first_error = error;
} else {
GRPC_ERROR_UNREF(error);
}
}
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
return first_error;
}

void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
Expand Down
18 changes: 10 additions & 8 deletions src/core/lib/channel/channel_stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ typedef struct {
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
argument. */
void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args);
grpc_error *(*init_call_elem)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args);
void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
Expand Down Expand Up @@ -215,12 +216,13 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack, int initial_refs,
grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack);
grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
Expand Down
7 changes: 5 additions & 2 deletions src/core/lib/channel/compress_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,9 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}

/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;

Expand All @@ -266,6 +267,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
calld->has_compression_algorithm = 0;
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);

return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
Expand Down
12 changes: 6 additions & 6 deletions src/core/lib/channel/connected_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,16 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
}

/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;

r = grpc_transport_init_stream(
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data);
GPR_ASSERT(r == 0);
return r == 0 ? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("transport stream initialization failed");
}

static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
Expand Down
6 changes: 4 additions & 2 deletions src/core/lib/channel/http_client_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,13 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
}

/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
calld->on_done_recv = NULL;
grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
Expand Down
6 changes: 4 additions & 2 deletions src/core/lib/channel/http_server_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,15 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
}

/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
Expand Down
2 changes: 2 additions & 0 deletions src/core/lib/iomgr/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
gpr_timespec value) GRPC_MUST_USE_RESULT;
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) GRPC_MUST_USE_RESULT;
/// Returns NULL if the specified string is not set.
/// Caller does NOT own return value.
const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which);
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
Expand Down
6 changes: 4 additions & 2 deletions src/core/lib/security/transport/client_auth_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,12 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}

/* Constructor for call_data */
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(*calld));
return GRPC_ERROR_NONE;
}

static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
Expand Down
Loading

0 comments on commit 212a03e

Please sign in to comment.