Skip to content

Commit

Permalink
Merge pull request grpc#7319 from yang-g/wait_async
Browse files Browse the repository at this point in the history
Make Server::Wait work for async only server.
  • Loading branch information
kpayson64 authored Jul 15, 2016
2 parents 7f6c779 + 6ec11f2 commit 27a02b5
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 2 deletions.
3 changes: 3 additions & 0 deletions include/grpc++/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,13 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibraryCodegen {
grpc::mutex mu_;
bool started_;
bool shutdown_;
bool shutdown_notified_;
// The number of threads which are running callbacks.
int num_running_cb_;
grpc::condition_variable callback_cv_;

grpc::condition_variable shutdown_cv_;

std::shared_ptr<GlobalCallbacks> global_callbacks_;

std::list<SyncRequest>* sync_methods_;
Expand Down
8 changes: 6 additions & 2 deletions src/cpp/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
: max_message_size_(max_message_size),
started_(false),
shutdown_(false),
shutdown_notified_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
Expand Down Expand Up @@ -462,13 +463,16 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
while (num_running_cb_ != 0) {
callback_cv_.wait(lock);
}

shutdown_notified_ = true;
shutdown_cv_.notify_all();
}
}

void Server::Wait() {
grpc::unique_lock<grpc::mutex> lock(mu_);
while (num_running_cb_ != 0) {
callback_cv_.wait(lock);
while (started_ && !shutdown_notified_) {
shutdown_cv_.wait(lock);
}
}

Expand Down
25 changes: 25 additions & 0 deletions test/cpp/end2end/async_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,31 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}

// We do not need to protect notify because the use is synchronized.
void ServerWait(Server* server, int* notify) {
server->Wait();
*notify = 1;
}
TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
int notify = 0;
std::thread* wait_thread =
new std::thread(&ServerWait, server_.get(), &notify);
ResetStub();
SendRpc(1);
EXPECT_EQ(0, notify);
server_->Shutdown();
wait_thread->join();
EXPECT_EQ(1, notify);
delete wait_thread;
}

TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
ResetStub();
SendRpc(1);
server_->Shutdown();
server_->Wait();
}

// Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ResetStub();
Expand Down

0 comments on commit 27a02b5

Please sign in to comment.