Skip to content

Commit

Permalink
fix(bigtable): crashes during client shutdown (googleapis#5701)
Browse files Browse the repository at this point in the history
  • Loading branch information
coryan authored Jan 14, 2021
1 parent 94fb2c5 commit f0b71d1
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 46 deletions.
9 changes: 9 additions & 0 deletions google/cloud/completion_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/completion_queue.h"
#include "google/cloud/internal/async_connection_ready.h"
#include "google/cloud/internal/default_completion_queue_impl.h"

namespace google {
Expand All @@ -22,6 +23,14 @@ inline namespace GOOGLE_CLOUD_CPP_NS {
CompletionQueue::CompletionQueue()
: impl_(new internal::DefaultCompletionQueueImpl) {}

future<Status> CompletionQueue::AsyncWaitConnectionReady(
std::shared_ptr<grpc::Channel> channel,
std::chrono::system_clock::time_point deadline) {
auto op = std::make_shared<internal::AsyncConnectionReadyFuture>(
impl_, std::move(channel), deadline);
return op->Start();
}

} // namespace GOOGLE_CLOUD_CPP_NS
} // namespace cloud
} // namespace google
8 changes: 1 addition & 7 deletions google/cloud/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H

#include "google/cloud/future.h"
#include "google/cloud/internal/async_connection_ready.h"
#include "google/cloud/internal/async_read_stream_impl.h"
#include "google/cloud/internal/async_rpc_details.h"
#include "google/cloud/internal/completion_queue_impl.h"
Expand Down Expand Up @@ -246,12 +245,7 @@ class CompletionQueue {
*/
future<Status> AsyncWaitConnectionReady(
std::shared_ptr<grpc::Channel> channel,
std::chrono::system_clock::time_point deadline) {
auto op = std::make_shared<internal::AsyncConnectionReadyFuture>(
std::move(channel), deadline);
impl_->StartOperation(op, [&](void* tag) { op->Start(impl_->cq(), tag); });
return op->GetFuture();
}
std::chrono::system_clock::time_point deadline);

private:
friend std::shared_ptr<internal::CompletionQueueImpl>
Expand Down
46 changes: 31 additions & 15 deletions google/cloud/internal/async_connection_ready.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,59 @@ inline namespace GOOGLE_CLOUD_CPP_NS {
namespace internal {

AsyncConnectionReadyFuture::AsyncConnectionReadyFuture(
std::shared_ptr<google::cloud::internal::CompletionQueueImpl> cq,
std::shared_ptr<grpc::Channel> channel,
std::chrono::system_clock::time_point deadline)
: channel_(std::move(channel)), deadline_(deadline) {}
: cq_(std::move(cq)), channel_(std::move(channel)), deadline_(deadline) {}

void AsyncConnectionReadyFuture::Start(grpc::CompletionQueue& cq, void* tag) {
tag_ = tag;
cq_ = &cq;
HandleSingleStateChange();
future<Status> AsyncConnectionReadyFuture::Start() {
RunIteration(channel_->GetState(true));
return promise_.get_future();
}

bool AsyncConnectionReadyFuture::Notify(bool ok) {
void AsyncConnectionReadyFuture::Notify(bool ok) {
if (!ok) {
promise_.set_value(
Status(StatusCode::kDeadlineExceeded,
"Connection couldn't connect before requested deadline"));
return true;
return;
}
return HandleSingleStateChange();
}

bool AsyncConnectionReadyFuture::HandleSingleStateChange() {
auto state = channel_->GetState(true);
if (state == GRPC_CHANNEL_READY) {
promise_.set_value(Status{});
return true;
return;
}
if (state == GRPC_CHANNEL_SHUTDOWN) {
promise_.set_value(
Status(StatusCode::kCancelled,
"Connection will never succeed because it's shut down."));
return true;
return;
}
// If connection was idle, GetState(true) triggered an attempt to connect.
// Otherwise it is either in state CONNECTING or TRANSIENT_FAILURE, so let's
// register for a state change.
channel_->NotifyOnStateChange(state, deadline_, cq_, tag_);
return false;
RunIteration(state);
}

void AsyncConnectionReadyFuture::RunIteration(ChannelStateType state) {
class OnStateChange : public AsyncGrpcOperation {
public:
explicit OnStateChange(std::shared_ptr<AsyncConnectionReadyFuture> s)
: self_(std::move(s)) {}
bool Notify(bool ok) override {
self_->Notify(ok);
return true;
}
void Cancel() override {}

private:
std::shared_ptr<AsyncConnectionReadyFuture> const self_;
};

auto op = std::make_shared<OnStateChange>(shared_from_this());
cq_->StartOperation(op, [this, state](void* tag) {
channel_->NotifyOnStateChange(state, deadline_, &cq_->cq(), tag);
});
}

} // namespace internal
Expand Down
40 changes: 16 additions & 24 deletions google/cloud/internal/async_connection_ready.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_CONNECTION_READY_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_ASYNC_CONNECTION_READY_H

#include "google/cloud/async_operation.h"
#include "google/cloud/future.h"
#include "google/cloud/internal/completion_queue_impl.h"
#include "google/cloud/status.h"
#include "google/cloud/version.h"
#include <grpcpp/channel.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/generic/generic_stub.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <chrono>
#include <memory>

Expand All @@ -40,30 +35,27 @@ namespace internal {
* well be hidden away from the header, but is useful in
* `FakeCompletionQueueImpl`.
*/
class AsyncConnectionReadyFuture : public internal::AsyncGrpcOperation {
class AsyncConnectionReadyFuture
: public std::enable_shared_from_this<AsyncConnectionReadyFuture> {
public:
AsyncConnectionReadyFuture(std::shared_ptr<grpc::Channel> channel,
std::chrono::system_clock::time_point deadline);
AsyncConnectionReadyFuture(
std::shared_ptr<google::cloud::internal::CompletionQueueImpl> cq,
std::shared_ptr<grpc::Channel> channel,
std::chrono::system_clock::time_point deadline);

void Start(grpc::CompletionQueue& cq, void* tag);
// There doesn't seem to be a way to cancel this operation:
// https://github.com/grpc/grpc/issues/3064
void Cancel() override {}
/// The future will be set to whether the state changed (false means timeout).
future<Status> GetFuture() { return promise_.get_future(); }
future<Status> Start();

private:
bool Notify(bool ok) override;
// Returns whether to register for state change.
bool HandleSingleStateChange();
void Notify(bool ok);

std::shared_ptr<grpc::Channel> channel_;
std::chrono::system_clock::time_point deadline_;
// gRPC uses an anonymous type for the gRPC channel state enum :shrug:.
using ChannelStateType = decltype(GRPC_CHANNEL_READY);
void RunIteration(ChannelStateType state);

std::shared_ptr<google::cloud::internal::CompletionQueueImpl> const cq_;
std::shared_ptr<grpc::Channel> const channel_;
std::chrono::system_clock::time_point const deadline_;
promise<Status> promise_;
// CompletionQueue and tag are memorized on start of the operation so that it
// can be re-registered in the completion queue for another state change.
grpc::CompletionQueue* cq_;
void* tag_;
};

} // namespace internal
Expand Down
1 change: 1 addition & 0 deletions google/cloud/internal/async_connection_ready_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "google/cloud/completion_queue.h"
#include "google/cloud/testing_util/assert_ok.h"
#include <gmock/gmock.h>
#include <grpcpp/generic/async_generic_service.h>

namespace google {
namespace cloud {
Expand Down

0 comments on commit f0b71d1

Please sign in to comment.