Skip to content

Commit

Permalink
Convert client channel connectivity watch code to C++. (grpc#25987)
Browse files Browse the repository at this point in the history
* Convert client channel connectivity watch code to C++.

* clang-format
  • Loading branch information
markdroth authored Apr 21, 2021
1 parent 61b2398 commit efe3c99
Showing 1 changed file with 169 additions and 178 deletions.
347 changes: 169 additions & 178 deletions src/core/ext/filters/client_channel/channel_connectivity.cc
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/surface/channel.h"

#include <inttypes.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
Expand All @@ -49,157 +44,182 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
return client_channel->CheckConnectivityState(try_to_connect);
}

typedef enum {
WAITING,
READY_TO_CALL_BACK,
CALLING_BACK_AND_FINISHED,
} callback_phase;

namespace {
struct state_watcher {
gpr_mu mu;
callback_phase phase;
grpc_closure on_complete;
grpc_closure on_timeout;
grpc_closure watcher_timer_init;
grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue* cq;
grpc_cq_completion completion_storage;
grpc_channel* channel;
grpc_error* error;
void* tag;
};
} // namespace
int grpc_channel_num_external_connectivity_watchers(grpc_channel* channel) {
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
if (client_channel == nullptr) {
gpr_log(GPR_ERROR,
"grpc_channel_num_external_connectivity_watchers called on "
"something that is not a client channel");
return 0;
}
return client_channel->NumExternalConnectivityWatchers();
}

static void delete_state_watcher(state_watcher* w) {
GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_channel_connectivity");
gpr_mu_destroy(&w->mu);
gpr_free(w);
int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
return grpc_core::ClientChannel::GetFromChannel(channel) != nullptr;
}

static void finished_completion(void* pw, grpc_cq_completion* /*ignored*/) {
bool should_delete = false;
state_watcher* w = static_cast<state_watcher*>(pw);
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return );
case CALLING_BACK_AND_FINISHED:
should_delete = true;
break;
}
gpr_mu_unlock(&w->mu);
namespace grpc_core {
namespace {

if (should_delete) {
delete_state_watcher(w);
class StateWatcher {
public:
StateWatcher(grpc_channel* channel, grpc_completion_queue* cq, void* tag,
grpc_connectivity_state last_observed_state,
gpr_timespec deadline)
: channel_(channel), cq_(cq), tag_(tag), state_(last_observed_state) {
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
GRPC_CLOSURE_INIT(&on_timeout_, TimeoutComplete, this, nullptr);
auto* watcher_timer_init_state = new WatcherTimerInitState(
this, grpc_timespec_to_millis_round_up(deadline));
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel);
GPR_ASSERT(client_channel != nullptr);
client_channel->AddExternalConnectivityWatcher(
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &state_,
&on_complete_, watcher_timer_init_state->closure());
}
}

static void partly_done(state_watcher* w, bool due_to_completion,
grpc_error* error) {
bool end_op = false;
void* end_op_tag = nullptr;
grpc_error* end_op_error = nullptr;
grpc_completion_queue* end_op_cq = nullptr;
grpc_cq_completion* end_op_completion_storage = nullptr;

if (due_to_completion) {
grpc_timer_cancel(&w->alarm);
} else {
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(w->channel);
GPR_ASSERT(client_channel != nullptr);
client_channel->CancelExternalConnectivityWatcher(&w->on_complete);
~StateWatcher() {
GRPC_CHANNEL_INTERNAL_UNREF(channel_, "watch_channel_connectivity");
}

gpr_mu_lock(&w->mu);
private:
// A fire-and-forget object used to delay starting the timer until the
// ClientChannel actually starts the watch.
class WatcherTimerInitState {
public:
WatcherTimerInitState(StateWatcher* state_watcher, grpc_millis deadline)
: state_watcher_(state_watcher), deadline_(deadline) {
GRPC_CLOSURE_INIT(&closure_, WatcherTimerInit, this, nullptr);
}

grpc_closure* closure() { return &closure_; }

if (due_to_completion) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
private:
static void WatcherTimerInit(void* arg, grpc_error* /*error*/) {
auto* self = static_cast<WatcherTimerInitState*>(arg);
grpc_timer_init(&self->state_watcher_->timer_, self->deadline_,
&self->state_watcher_->on_timeout_);
delete self;
}
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
} else {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Timed out waiting for connection state change");
} else if (error == GRPC_ERROR_CANCELLED) {
error = GRPC_ERROR_NONE;

StateWatcher* state_watcher_;
grpc_millis deadline_;
grpc_closure closure_;
};

enum CallbackPhase { kWaiting, kReadyToCallBack, kCallingBackAndFinished };

// Called when the completion is returned to the CQ.
static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
auto* self = static_cast<StateWatcher*>(arg);
bool should_delete = false;
{
MutexLock lock(&self->mu_);
switch (self->phase_) {
case kWaiting:
case kReadyToCallBack:
GPR_UNREACHABLE_CODE(return );
case kCallingBackAndFinished:
should_delete = true;
}
}
if (should_delete) delete self;
}
switch (w->phase) {
case WAITING:
GRPC_ERROR_REF(error);
w->error = error;
w->phase = READY_TO_CALL_BACK;
break;
case READY_TO_CALL_BACK:
if (error != GRPC_ERROR_NONE) {
GPR_ASSERT(!due_to_completion);
GRPC_ERROR_UNREF(w->error);
GRPC_ERROR_REF(error);
w->error = error;

void PartlyDone(bool due_to_completion, grpc_error* error) {
bool end_op = false;
void* end_op_tag = nullptr;
grpc_error* end_op_error = nullptr;
grpc_completion_queue* end_op_cq = nullptr;
grpc_cq_completion* end_op_completion_storage = nullptr;
if (due_to_completion) {
grpc_timer_cancel(&timer_);
} else {
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel_);
GPR_ASSERT(client_channel != nullptr);
client_channel->CancelExternalConnectivityWatcher(&on_complete_);
}
{
MutexLock lock(&mu_);
if (due_to_completion) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
} else {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Timed out waiting for connection state change");
} else if (error == GRPC_ERROR_CANCELLED) {
error = GRPC_ERROR_NONE;
}
}
w->phase = CALLING_BACK_AND_FINISHED;
end_op = true;
end_op_cq = w->cq;
end_op_tag = w->tag;
end_op_error = w->error;
end_op_completion_storage = &w->completion_storage;
break;
case CALLING_BACK_AND_FINISHED:
GPR_UNREACHABLE_CODE(return );
break;
switch (phase_) {
case kWaiting:
GRPC_ERROR_REF(error);
error_ = error;
phase_ = kReadyToCallBack;
break;
case kReadyToCallBack:
if (error != GRPC_ERROR_NONE) {
GPR_ASSERT(!due_to_completion);
GRPC_ERROR_UNREF(error_);
GRPC_ERROR_REF(error);
error_ = error;
}
phase_ = kCallingBackAndFinished;
end_op = true;
end_op_cq = cq_;
end_op_tag = tag_;
end_op_error = error_;
end_op_completion_storage = &completion_storage_;
break;
case kCallingBackAndFinished:
GPR_UNREACHABLE_CODE(return );
}
}
if (end_op) {
grpc_cq_end_op(end_op_cq, end_op_tag, end_op_error, FinishedCompletion,
this, end_op_completion_storage);
}
GRPC_ERROR_UNREF(error);
}
gpr_mu_unlock(&w->mu);

if (end_op) {
grpc_cq_end_op(end_op_cq, end_op_tag, end_op_error, finished_completion, w,
end_op_completion_storage);
static void WatchComplete(void* arg, grpc_error* error) {
auto* self = static_cast<StateWatcher*>(arg);
self->PartlyDone(/*due_to_completion=*/true, GRPC_ERROR_REF(error));
}

GRPC_ERROR_UNREF(error);
}
static void TimeoutComplete(void* arg, grpc_error* error) {
auto* self = static_cast<StateWatcher*>(arg);
self->PartlyDone(/*due_to_completion=*/false, GRPC_ERROR_REF(error));
}

static void watch_complete(void* pw, grpc_error* error) {
partly_done(static_cast<state_watcher*>(pw), true, GRPC_ERROR_REF(error));
}
grpc_channel* channel_;
grpc_completion_queue* cq_;
void* tag_;

static void timeout_complete(void* pw, grpc_error* error) {
partly_done(static_cast<state_watcher*>(pw), false, GRPC_ERROR_REF(error));
}
grpc_connectivity_state state_;

int grpc_channel_num_external_connectivity_watchers(grpc_channel* channel) {
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
if (client_channel == nullptr) {
gpr_log(GPR_ERROR,
"grpc_channel_num_external_connectivity_watchers called on "
"something that is not a client channel");
return 0;
}
return client_channel->NumExternalConnectivityWatchers();
}
grpc_cq_completion completion_storage_;

typedef struct watcher_timer_init_arg {
state_watcher* w;
gpr_timespec deadline;
} watcher_timer_init_arg;
grpc_closure on_complete_;
grpc_timer timer_;
grpc_closure on_timeout_;

static void watcher_timer_init(void* arg, grpc_error* /*error_ignored*/) {
watcher_timer_init_arg* wa = static_cast<watcher_timer_init_arg*>(arg);

grpc_timer_init(&wa->w->alarm, grpc_timespec_to_millis_round_up(wa->deadline),
&wa->w->on_timeout);
gpr_free(wa);
}
Mutex mu_;
CallbackPhase phase_ ABSL_GUARDED_BY(mu_) = kWaiting;
grpc_error* error_ ABSL_GUARDED_BY(mu_) = GRPC_ERROR_NONE;
};

int grpc_channel_support_connectivity_watcher(grpc_channel* channel) {
return grpc_core::ClientChannel::GetFromChannel(channel) != nullptr;
}
} // namespace
} // namespace grpc_core

void grpc_channel_watch_connectivity_state(
grpc_channel* channel, grpc_connectivity_state last_observed_state,
Expand All @@ -215,34 +235,5 @@ void grpc_channel_watch_connectivity_state(
7,
(channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, cq, tag));

GPR_ASSERT(grpc_cq_begin_op(cq, tag));

state_watcher* w = static_cast<state_watcher*>(gpr_malloc(sizeof(*w)));
gpr_mu_init(&w->mu);
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w,
grpc_schedule_on_exec_ctx);
w->phase = WAITING;
w->state = last_observed_state;
w->cq = cq;
w->tag = tag;
w->channel = channel;
w->error = nullptr;

watcher_timer_init_arg* wa = static_cast<watcher_timer_init_arg*>(
gpr_malloc(sizeof(watcher_timer_init_arg)));
wa->w = w;
wa->deadline = deadline;
GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa,
grpc_schedule_on_exec_ctx);

GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
grpc_core::ClientChannel* client_channel =
grpc_core::ClientChannel::GetFromChannel(channel);
GPR_ASSERT(client_channel != nullptr);
client_channel->AddExternalConnectivityWatcher(
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state,
&w->on_complete, &w->watcher_timer_init);
new grpc_core::StateWatcher(channel, cq, tag, last_observed_state, deadline);
}

0 comments on commit efe3c99

Please sign in to comment.