Skip to content

Commit

Permalink
Update with the new ping implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
y-zeng committed Feb 27, 2017
1 parent 990d9fe commit efdf5a3
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 79 deletions.
170 changes: 95 additions & 75 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,12 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3

/** keepalive-relevant functions */
static void keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void keepalive_ping_ack_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);

Expand Down Expand Up @@ -224,73 +226,6 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
#endif

static void keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
t->keepalive_ping_id = send_ping_locked(
exec_ctx, t,
grpc_closure_create(keepalive_ping_ack_locked, t,
grpc_combiner_scheduler(t->combiner, false)));
if (t->keepalive_permit_without_calls || t->stream_map.count > 0)
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
grpc_timer_init(
exec_ctx, &t->keepalive_watchdog_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout),
grpc_closure_create(keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner, false)),
gpr_now(GPR_CLOCK_MONOTONIC));
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping");
}

static void keepalive_ping_ack_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
t->keepalive_ping_id = NULL;
grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping");
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
grpc_closure_create(keepalive_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false)),
gpr_now(GPR_CLOCK_MONOTONIC));
}
} else {
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_DYING);
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end");
}

static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
if (t->keepalive_ping_id != NULL) {
grpc_chttp2_ack_ping(exec_ctx, t, t->keepalive_ping_id);
}
close_transport_locked(exec_ctx, t,
GRPC_ERROR_CREATE("keepalive watchdog timeout"));
}
} else {
// GRPC_LOG_IF_ERROR("keepalive_watchdog", error);
if (error != GRPC_ERROR_CANCELLED) {
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog");
}

static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
const grpc_channel_args *channel_args,
grpc_endpoint *ep, bool is_client) {
Expand Down Expand Up @@ -337,6 +272,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_combiner_scheduler(t->combiner, false));
grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false));
grpc_closure_init(&t->start_keepalive_ping_locked,
start_keepalive_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false));
grpc_closure_init(&t->finish_keepalive_ping_locked,
finish_keepalive_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false));

grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
Expand Down Expand Up @@ -400,9 +341,14 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,

/* client-side keepalive setting */
t->keepalive_time =
gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN);
DEFAULT_KEEPALIVE_TIME_SECOND == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN);
t->keepalive_timeout =
gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND, GPR_TIMESPAN);
DEFAULT_KEEPALIVE_TIMEOUT_SECOND == INT_MAX
? gpr_inf_future(GPR_TIMESPAN)
: gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND,
GPR_TIMESPAN);
t->keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;

if (channel_args) {
Expand Down Expand Up @@ -529,11 +475,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
// Start client-side keepalive pings
if (t->is_client) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping");
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
grpc_closure_create(keepalive_ping_locked, t,
grpc_closure_create(init_keepalive_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false)),
gpr_now(GPR_CLOCK_MONOTONIC));
}
Expand Down Expand Up @@ -2127,6 +2073,80 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
}

static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) {
if (t->keepalive_permit_without_calls || t->stream_map.count > 0) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE,
&t->start_keepalive_ping_locked,
&t->finish_keepalive_ping_locked);
} else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
grpc_closure_create(init_keepalive_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false)),
gpr_now(GPR_CLOCK_MONOTONIC));
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping");
}

static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
grpc_timer_init(
exec_ctx, &t->keepalive_watchdog_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout),
grpc_closure_create(keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner, false)),
gpr_now(GPR_CLOCK_MONOTONIC));
}

static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer);
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(
exec_ctx, &t->keepalive_ping_timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time),
grpc_closure_create(init_keepalive_ping_locked, t,
grpc_combiner_scheduler(t->combiner, false)),
gpr_now(GPR_CLOCK_MONOTONIC));
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end");
}

static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
if (error == GRPC_ERROR_NONE) {
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
close_transport_locked(exec_ctx, t,
GRPC_ERROR_CREATE("keepalive watchdog timeout"));
}
} else {
// The watchdog timer should have been cancelled.
if (error != GRPC_ERROR_CANCELLED) {
gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)",
t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING);
}
}
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog");
}

/*******************************************************************************
* CALLBACK LOOP
*/
Expand Down
2 changes: 2 additions & 0 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ struct grpc_chttp2_transport {
grpc_closure destructive_reclaimer_locked;

/* keep-alive ping support */
grpc_closure start_keepalive_ping_locked;
grpc_closure finish_keepalive_ping_locked;
/** timer to initiate ping events */
grpc_timer keepalive_ping_timer;
/** watchdog to kill the transport when waiting for the keepalive ping */
Expand Down
8 changes: 4 additions & 4 deletions test/core/end2end/tests/keepalive_timeout.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
}

static gpr_timespec n_seconds_time(int n) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
return grpc_timeout_seconds_to_deadline(n);
}

static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
Expand All @@ -76,9 +76,9 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(
f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
.type == GRPC_OP_COMPLETE);
GPR_ASSERT(
grpc_completion_queue_pluck(f->cq, tag(1000), five_seconds_time(), NULL)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}
Expand Down

0 comments on commit efdf5a3

Please sign in to comment.