Skip to content

Commit

Permalink
Merge pull request grpc#9268 from soltanmm-google/diddy-kong-racing
Browse files Browse the repository at this point in the history
Use `grpc_closure`s in `grpc_timer`s
  • Loading branch information
soltanmm-google authored Jan 6, 2017
2 parents aeddd38 + b5b4372 commit 071562d
Show file tree
Hide file tree
Showing 18 changed files with 104 additions and 60 deletions.
5 changes: 4 additions & 1 deletion src/core/ext/client_channel/channel_connectivity.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
grpc_closure on_complete;
grpc_closure on_timeout;
grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
Expand Down Expand Up @@ -200,6 +201,8 @@ void grpc_channel_watch_connectivity_state(
gpr_mu_init(&w->mu);
grpc_closure_init(&w->on_complete, watch_complete, w,
grpc_schedule_on_exec_ctx);
grpc_closure_init(&w->on_timeout, timeout_complete, w,
grpc_schedule_on_exec_ctx);
w->phase = WAITING;
w->state = last_observed_state;
w->cq = cq;
Expand All @@ -208,7 +211,7 @@ void grpc_channel_watch_connectivity_state(

grpc_timer_init(&exec_ctx, &w->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
&w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));

if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
Expand Down
6 changes: 5 additions & 1 deletion src/core/ext/client_channel/subchannel.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ struct grpc_subchannel {
/** callback for connection finishing */
grpc_closure connected;

/** callback for our alarm */
grpc_closure on_alarm;

/** pollset_set tracking who's interested in a connection
being setup */
grpc_pollset_set *pollset_set;
Expand Down Expand Up @@ -483,7 +486,8 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds",
time_til_next.tv_sec, time_til_next.tv_nsec);
}
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
grpc_closure_init(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm, now);
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/core/ext/lb_policy/grpclb/grpclb.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ typedef struct glb_lb_policy {
/* A response from the LB server has been received. Process it */
grpc_closure lb_on_response_received;

/* LB call retry timer callback. */
grpc_closure lb_on_call_retry;

grpc_call *lb_call; /* streaming call to the LB server, */

grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
Expand Down Expand Up @@ -1364,8 +1367,10 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
}
}
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer,
glb_policy, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
lb_call_on_retry_timer, glb_policy, now);
&glb_policy->lb_on_call_retry, now);
}
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
Expand Down
6 changes: 4 additions & 2 deletions src/core/ext/resolver/dns/native/dns_resolver.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ typedef struct {
/** retry timer */
bool have_retry_timer;
grpc_timer retry_timer;
grpc_closure on_retry;
/** retry backoff state */
gpr_backoff backoff_state;

Expand Down Expand Up @@ -199,8 +200,9 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
} else {
gpr_log(GPR_DEBUG, "retrying immediately");
}
grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r,
now);
grpc_closure_init(&r->on_retry, dns_on_retry_timer, r,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now);
}
if (r->resolved_result != NULL) {
grpc_channel_args_destroy(exec_ctx, r->resolved_result);
Expand Down
7 changes: 5 additions & 2 deletions src/core/lib/channel/deadline_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
// Take a reference to the call stack, to be owned by the timer.
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
deadline_state->timer_pending = true;
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
elem, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline,
&deadline_state->timer_callback,
gpr_now(GPR_CLOCK_MONOTONIC));
}
}
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
Expand Down
1 change: 1 addition & 0 deletions src/core/lib/channel/deadline_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ typedef struct grpc_deadline_state {
bool timer_pending;
// The deadline timer.
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when the call is complete.
// We use this to cancel the timer.
grpc_closure on_complete;
Expand Down
5 changes: 4 additions & 1 deletion src/core/lib/channel/handshaker.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ struct grpc_handshake_manager {
grpc_tcp_server_acceptor* acceptor;
// Deadline timer across all handshakers.
grpc_timer deadline_timer;
grpc_closure on_timeout;
// The final callback and user_data to invoke after the last handshaker.
grpc_closure on_handshake_done;
void* user_data;
Expand Down Expand Up @@ -224,9 +225,11 @@ void grpc_handshake_manager_do_handshake(
grpc_schedule_on_exec_ctx);
// Start deadline timer, which owns a ref.
gpr_ref(&mgr->refs);
grpc_closure_init(&mgr->on_timeout, on_timeout, mgr,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &mgr->deadline_timer,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
on_timeout, mgr, gpr_now(GPR_CLOCK_MONOTONIC));
&mgr->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
// Start first handshaker, which also owns a ref.
gpr_ref(&mgr->refs);
bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE);
Expand Down
4 changes: 3 additions & 1 deletion src/core/lib/iomgr/tcp_client_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ typedef struct {
grpc_fd *fd;
gpr_timespec deadline;
grpc_timer alarm;
grpc_closure on_alarm;
int refs;
grpc_closure write_closure;
grpc_pollset_set *interested_parties;
Expand Down Expand Up @@ -352,9 +353,10 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}

gpr_mu_lock(&ac->mu);
grpc_closure_init(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC));
&ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);

Expand Down
5 changes: 4 additions & 1 deletion src/core/lib/iomgr/tcp_client_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
typedef struct grpc_uv_tcp_connect {
uv_connect_t connect_req;
grpc_timer alarm;
grpc_closure on_alarm;
uv_tcp_t *tcp_handle;
grpc_closure *closure;
grpc_endpoint **endpoint;
Expand Down Expand Up @@ -148,9 +149,11 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
uv_tcp_connect(&connect->connect_req, connect->tcp_handle,
(const struct sockaddr *)resolved_addr->addr,
uv_tc_on_connect);
grpc_closure_init(&connect->on_alarm, uv_tc_on_alarm, connect,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &connect->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
uv_tc_on_alarm, connect, gpr_now(GPR_CLOCK_MONOTONIC));
&connect->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
}

// overridden by api_fuzzer.c
Expand Down
4 changes: 3 additions & 1 deletion src/core/lib/iomgr/tcp_client_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ typedef struct {
grpc_winsocket *socket;
gpr_timespec deadline;
grpc_timer alarm;
grpc_closure on_alarm;
char *addr_name;
int refs;
grpc_closure on_connect;
Expand Down Expand Up @@ -229,7 +230,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
ac->resource_quota = resource_quota;
grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);

grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac,
grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm,
gpr_now(GPR_CLOCK_MONOTONIC));
grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect);
return;
Expand Down
16 changes: 8 additions & 8 deletions src/core/lib/iomgr/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@

typedef struct grpc_timer grpc_timer;

/* Initialize *timer. When expired or canceled, timer_cb will be called with
*timer_cb_arg and error set to indicate if it expired (GRPC_ERROR_NONE) or
was canceled (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called
exactly once, and application code should check the error to determine
how it was invoked. The application callback is also responsible for
maintaining information about when to free up any user-level state. */
/* Initialize *timer. When expired or canceled, closure will be called with
error set to indicate if it expired (GRPC_ERROR_NONE) or was canceled
(GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called exactly once, and
application code should check the error to determine how it was invoked. The
application callback is also responsible for maintaining information about
when to free up any user-level state. */
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
void *timer_cb_arg, gpr_timespec now);
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now);

/* Note that there is no timer destroy function. This is because the
timer is a one-time occurrence with a guarantee that the callback will
Expand Down
15 changes: 7 additions & 8 deletions src/core/lib/iomgr/timer_generic.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,28 +178,27 @@ static void note_deadline_change(shard_type *shard) {
}

void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
void *timer_cb_arg, gpr_timespec now) {
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
int is_first_timer = 0;
shard_type *shard = &g_shards[shard_idx(timer)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg,
grpc_schedule_on_exec_ctx);
timer->closure = closure;
timer->deadline = deadline;
timer->triggered = 0;

if (!g_initialized) {
timer->triggered = 1;
grpc_closure_sched(
exec_ctx, &timer->closure,
exec_ctx, timer->closure,
GRPC_ERROR_CREATE("Attempt to create timer before initialization"));
return;
}

if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE);
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
return;
}

Expand Down Expand Up @@ -251,7 +250,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[shard_idx(timer)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED);
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
timer->triggered = 1;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
Expand Down Expand Up @@ -317,7 +316,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error));
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_REF(error));
n++;
}
*new_min_deadline = compute_min_deadline(shard);
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/iomgr/timer_generic.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct grpc_timer {
int triggered;
struct grpc_timer *next;
struct grpc_timer *prev;
grpc_closure closure;
grpc_closure *closure;
};

#endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */
13 changes: 6 additions & 7 deletions src/core/lib/iomgr/timer_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,20 @@ void run_expired_timer(uv_timer_t *handle) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(!timer->triggered);
timer->triggered = 1;
grpc_closure_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE);
grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
stop_uv_timer(handle);
grpc_exec_ctx_finish(&exec_ctx);
}

void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
void *timer_cb_arg, gpr_timespec now) {
gpr_timespec deadline, grpc_closure *closure,
gpr_timespec now) {
uint64_t timeout;
uv_timer_t *uv_timer;
grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg,
grpc_schedule_on_exec_ctx);
timer->closure = closure;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE);
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
return;
}
timer->triggered = 0;
Expand All @@ -84,7 +83,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (!timer->triggered) {
timer->triggered = 1;
grpc_closure_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED);
grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/iomgr/timer_uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"

struct grpc_timer {
grpc_closure closure;
grpc_closure *closure;
/* This is actually a uv_timer_t*, but we want to keep platform-specific
types out of headers */
void *uv_timer;
Expand Down
5 changes: 4 additions & 1 deletion src/core/lib/surface/alarm.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

struct grpc_alarm {
grpc_timer alarm;
grpc_closure on_alarm;
grpc_cq_completion completion;
/** completion queue where events about this alarm will be posted */
grpc_completion_queue *cq;
Expand All @@ -64,9 +65,11 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->tag = tag;

grpc_cq_begin_op(cq, tag);
grpc_closure_init(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
alarm_cb, alarm, gpr_now(GPR_CLOCK_MONOTONIC));
&alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_exec_ctx_finish(&exec_ctx);
return alarm;
}
Expand Down
18 changes: 10 additions & 8 deletions test/core/end2end/fuzzers/api_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,11 @@ void my_resolve_address(grpc_exec_ctx *exec_ctx, const char *addr,
r->addr = gpr_strdup(addr);
r->on_done = on_done;
r->addrs = addresses;
grpc_timer_init(exec_ctx, &r->timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(1, GPR_TIMESPAN)),
finish_resolve, r, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(
exec_ctx, &r->timer, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(1, GPR_TIMESPAN)),
grpc_closure_create(finish_resolve, r, grpc_schedule_on_exec_ctx),
gpr_now(GPR_CLOCK_MONOTONIC));
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -430,10 +431,11 @@ static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
fc->closure = closure;
fc->ep = ep;
fc->deadline = deadline;
grpc_timer_init(exec_ctx, &fc->timer,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(1, GPR_TIMESPAN)),
do_connect, fc, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_timer_init(
exec_ctx, &fc->timer, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(1, GPR_TIMESPAN)),
grpc_closure_create(do_connect, fc, grpc_schedule_on_exec_ctx),
gpr_now(GPR_CLOCK_MONOTONIC));
}

static void my_tcp_client_connect(grpc_exec_ctx *exec_ctx,
Expand Down
Loading

0 comments on commit 071562d

Please sign in to comment.