Merge pull request grpc#10845 from markdroth/client_channel_cleanup
Code clean-up in client_channel.
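This commit replaces the two-valued subchannel_creation_phase enum with a single bool pick_pending, hoists the cancellation path out of pick_subchannel_locked into a new cancel_pick_locked helper, and drops the now-unneeded grpc_error parameter (along with the NULL initial_metadata sentinel that used to signal cancellation). A minimal sketch of the enum-to-bool half of the cleanup, using hypothetical names rather than the grpc sources:

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Before: a two-state enum where one state only ever meant
   "no pick in flight". */
typedef enum {
  PHASE_NOT_PICKING = 0, /* zero so it can be default-initialized */
  PHASE_PICKING
} pick_phase;

/* After: the same information as a default-false bool. */
typedef struct {
  bool pick_pending;
} call_state;

static void start_pick(call_state *st) {
  assert(!st->pick_pending); /* was: phase == PHASE_NOT_PICKING */
  st->pick_pending = true;   /* was: phase = PHASE_PICKING */
}

static void pick_done(call_state *st) {
  assert(st->pick_pending); /* was: phase == PHASE_PICKING */
  st->pick_pending = false; /* was: phase = PHASE_NOT_PICKING */
}

int main(void) {
  call_state st = {0}; /* zero-initialized: no pick pending */
  start_pick(&st);
  pick_done(&st);
  printf("pick_pending = %d\n", st.pick_pending);
  return 0;
}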
markdroth authored May 4, 2017
2 parents 98e8d59 + 9221083 commit f0f147a
Showing 1 changed file with 46 additions and 59 deletions.
105 changes: 46 additions & 59 deletions src/core/ext/filters/client_channel/client_channel.c
@@ -760,12 +760,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,

#define CANCELLED_CALL ((grpc_subchannel_call *)1)

-typedef enum {
-  /* zero so that it can be default-initialized */
-  GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0,
-  GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
-} subchannel_creation_phase;

/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
Handles queueing of stream ops until a call object is ready, waiting
@@ -793,7 +787,7 @@ typedef struct client_channel_call_data {
gpr_atm subchannel_call;
gpr_arena *arena;

-  subchannel_creation_phase creation_phase;
+  bool pick_pending;
grpc_connected_subchannel *connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
grpc_polling_entity *pollent;
@@ -915,11 +909,10 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_call_element *elem = arg;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
-  GPR_ASSERT(calld->creation_phase ==
-             GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
+  GPR_ASSERT(calld->pick_pending);
+  calld->pick_pending = false;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
-  calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (calld->connected_subchannel == NULL) {
gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
fail_locked(exec_ctx, calld,
@@ -988,8 +981,7 @@ static bool pick_subchannel_locked(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
-    grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready,
-    grpc_error *error);
+    grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready);

static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
@@ -1002,49 +994,48 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (pick_subchannel_locked(
exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags, cpa->connected_subchannel,
-            cpa->subchannel_call_context, cpa->on_ready, GRPC_ERROR_NONE)) {
+            cpa->subchannel_call_context, cpa->on_ready)) {
grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
}
}
gpr_free(cpa);
}

+static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                               grpc_error *error) {
+  channel_data *chand = elem->channel_data;
+  call_data *calld = elem->call_data;
+  if (chand->lb_policy != NULL) {
+    grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
+                                      &calld->connected_subchannel,
+                                      GRPC_ERROR_REF(error));
+  }
+  for (grpc_closure *closure = chand->waiting_for_config_closures.head;
+       closure != NULL; closure = closure->next_data.next) {
+    continue_picking_args *cpa = closure->cb_arg;
+    if (cpa->connected_subchannel == &calld->connected_subchannel) {
+      cpa->connected_subchannel = NULL;
+      grpc_closure_sched(exec_ctx, cpa->on_ready,
+                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                             "Pick cancelled", &error, 1));
+    }
+  }
+  GRPC_ERROR_UNREF(error);
+}

static bool pick_subchannel_locked(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
-    grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready,
-    grpc_error *error) {
+    grpc_call_context_element *subchannel_call_context,
+    grpc_closure *on_ready) {
GPR_TIMER_BEGIN("pick_subchannel", 0);

channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
-  continue_picking_args *cpa;
-  grpc_closure *closure;
-
-  GPR_ASSERT(connected_subchannel);
-
-  if (initial_metadata == NULL) {
-    if (chand->lb_policy != NULL) {
-      grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy,
-                                        connected_subchannel,
-                                        GRPC_ERROR_REF(error));
-    }
-    for (closure = chand->waiting_for_config_closures.head; closure != NULL;
-         closure = closure->next_data.next) {
-      cpa = closure->cb_arg;
-      if (cpa->connected_subchannel == connected_subchannel) {
-        cpa->connected_subchannel = NULL;
-        grpc_closure_sched(exec_ctx, cpa->on_ready,
-                           GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                               "Pick cancelled", &error, 1));
-      }
-    }
-    GPR_TIMER_END("pick_subchannel", 0);
-    GRPC_ERROR_UNREF(error);
-    return true;
-  }
-  GPR_ASSERT(error == GRPC_ERROR_NONE);
if (chand->lb_policy != NULL) {
apply_final_configuration_locked(exec_ctx, elem);
grpc_lb_policy *lb_policy = chand->lb_policy;
@@ -1101,7 +1092,7 @@ static bool pick_subchannel_locked(
&chand->on_resolver_result_changed);
}
if (chand->resolver != NULL) {
-    cpa = gpr_malloc(sizeof(*cpa));
+    continue_picking_args *cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
cpa->initial_metadata_flags = initial_metadata_flags;
cpa->connected_subchannel = connected_subchannel;
@@ -1157,16 +1148,13 @@ static void start_transport_stream_op_batch_locked_inner(
error to the caller when the first op does get passed down. */
calld->cancel_error =
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
-      switch (calld->creation_phase) {
-        case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
-          fail_locked(exec_ctx, calld,
-                      GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
-          break;
-        case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
-          pick_subchannel_locked(
-              exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, NULL,
-              GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
-          break;
-      }
+      if (calld->pick_pending) {
+        cancel_pick_locked(
+            exec_ctx, elem,
+            GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
+      } else {
+        fail_locked(exec_ctx, calld,
+                    GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
+      }
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op,
@@ -1176,9 +1164,9 @@ static void start_transport_stream_op_batch_locked_inner(
}
}
/* if we don't have a subchannel, try to get one */
-  if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
-      calld->connected_subchannel == NULL && op->send_initial_metadata) {
-    calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
+  if (!calld->pick_pending && calld->connected_subchannel == NULL &&
+      op->send_initial_metadata) {
+    calld->pick_pending = true;
grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
grpc_combiner_scheduler(chand->combiner, true));
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
@@ -1190,17 +1178,16 @@ static void start_transport_stream_op_batch_locked_inner(
op->payload->send_initial_metadata.send_initial_metadata,
op->payload->send_initial_metadata.send_initial_metadata_flags,
&calld->connected_subchannel, calld->subchannel_call_context,
-            &calld->next_step, GRPC_ERROR_NONE)) {
-      calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
+            &calld->next_step)) {
+      calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
} else {
grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
}
}
/* if we've got a subchannel, then let's ask it to create a call */
-  if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
-      calld->connected_subchannel != NULL) {
+  if (!calld->pick_pending && calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
const grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
@@ -1357,7 +1344,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
then_schedule_closure = NULL;
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
}
-  GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
+  GPR_ASSERT(!calld->pick_pending);
GPR_ASSERT(calld->waiting_ops_count == 0);
if (calld->connected_subchannel != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel,
@@ -1464,12 +1451,12 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,

void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
-    grpc_connectivity_state *state, grpc_closure *on_complete) {
+    grpc_connectivity_state *state, grpc_closure *closure) {
channel_data *chand = elem->channel_data;
external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
w->chand = chand;
w->pollset = pollset;
-  w->on_complete = on_complete;
+  w->on_complete = closure;
w->state = state;
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
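The cancellation half of the cleanup trades an in-band mode switch (calling pick_subchannel_locked with initial_metadata == NULL plus an error to mean "cancel") for a dedicated entry point. A minimal sketch of that API reshaping, with hypothetical names and a stand-in error type rather than the grpc signatures:

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for grpc_error. */
typedef struct { const char *msg; } error_t;

static bool pick_pending = false;

/* After: picking and cancelling are separate functions, so the pick
   path needs no error parameter and no sentinel argument. */
static bool pick_subchannel(const char *initial_metadata) {
  (void)initial_metadata; /* a real pick would inspect the metadata */
  pick_pending = true;    /* pretend the pick completes asynchronously */
  return false;           /* false: the result arrives via a callback */
}

static void cancel_pick(error_t err) {
  if (pick_pending) {
    pick_pending = false;
    printf("pick cancelled: %s\n", err.msg);
  }
}

int main(void) {
  pick_subchannel("initial metadata");
  cancel_pick((error_t){"Pick cancelled"});
  return 0;
}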
