Skip to content

Commit

Permalink
Merge pull request grpc#2993 from murgatroid99/node_server_graceful_s…
Browse files Browse the repository at this point in the history
…hutdown

Prevent the Node server from locking up when shutting down
  • Loading branch information
tbetbetbe committed Aug 26, 2015
2 parents 495c0d3 + da96957 commit da22c15
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 42 deletions.
51 changes: 27 additions & 24 deletions src/node/ext/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Server::Server(grpc_server *server) : wrapped_server(server) {
Server::~Server() {
this->ShutdownServer();
grpc_completion_queue_shutdown(this->shutdown_queue);
grpc_server_destroy(wrapped_server);
grpc_server_destroy(this->wrapped_server);
grpc_completion_queue_destroy(this->shutdown_queue);
}

Expand All @@ -139,8 +139,11 @@ void Server::Init(Handle<Object> exports) {
NanSetPrototypeTemplate(tpl, "start",
NanNew<FunctionTemplate>(Start)->GetFunction());

NanSetPrototypeTemplate(tpl, "shutdown",
NanNew<FunctionTemplate>(Shutdown)->GetFunction());
NanSetPrototypeTemplate(tpl, "tryShutdown",
NanNew<FunctionTemplate>(TryShutdown)->GetFunction());
NanSetPrototypeTemplate(
tpl, "forceShutdown",
NanNew<FunctionTemplate>(ForceShutdown)->GetFunction());

NanAssignPersistent(fun_tpl, tpl);
Handle<Function> ctr = tpl->GetFunction();
Expand All @@ -153,14 +156,12 @@ bool Server::HasInstance(Handle<Value> val) {
}

void Server::ShutdownServer() {
if (this->wrapped_server != NULL) {
grpc_server_shutdown_and_notify(this->wrapped_server,
this->shutdown_queue,
NULL);
grpc_completion_queue_pluck(this->shutdown_queue, NULL,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
this->wrapped_server = NULL;
}
grpc_server_shutdown_and_notify(this->wrapped_server,
this->shutdown_queue,
NULL);
grpc_server_cancel_all_calls(this->wrapped_server);
grpc_completion_queue_pluck(this->shutdown_queue, NULL,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
}

NAN_METHOD(Server::New) {
Expand Down Expand Up @@ -222,9 +223,6 @@ NAN_METHOD(Server::RequestCall) {
return NanThrowTypeError("requestCall can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
if (server->wrapped_server == NULL) {
return NanThrowError("requestCall cannot be called on a shut down Server");
}
NewCallOp *op = new NewCallOp();
unique_ptr<OpVec> ops(new OpVec());
ops->push_back(unique_ptr<Op>(op));
Expand Down Expand Up @@ -256,10 +254,6 @@ NAN_METHOD(Server::AddHttp2Port) {
"addHttp2Port's second argument must be ServerCredentials");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
if (server->wrapped_server == NULL) {
return NanThrowError(
"addHttp2Port cannot be called on a shut down Server");
}
ServerCredentials *creds_object = ObjectWrap::Unwrap<ServerCredentials>(
args[1]->ToObject());
grpc_server_credentials *creds = creds_object->GetWrappedServerCredentials();
Expand All @@ -281,21 +275,30 @@ NAN_METHOD(Server::Start) {
return NanThrowTypeError("start can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
if (server->wrapped_server == NULL) {
return NanThrowError("start cannot be called on a shut down Server");
}
grpc_server_start(server->wrapped_server);
NanReturnUndefined();
}

NAN_METHOD(ShutdownCallback) {
NAN_METHOD(Server::TryShutdown) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("tryShutdown can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
unique_ptr<OpVec> ops(new OpVec());
grpc_server_shutdown_and_notify(
server->wrapped_server,
CompletionQueueAsyncWorker::GetQueue(),
new struct tag(new NanCallback(args[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr)));
CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
}

NAN_METHOD(Server::Shutdown) {
NAN_METHOD(Server::ForceShutdown) {
NanScope();
if (!HasInstance(args.This())) {
return NanThrowTypeError("shutdown can only be called on a Server");
return NanThrowTypeError("forceShutdown can only be called on a Server");
}
Server *server = ObjectWrap::Unwrap<Server>(args.This());
server->ShutdownServer();
Expand Down
3 changes: 2 additions & 1 deletion src/node/ext/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class Server : public ::node::ObjectWrap {
static NAN_METHOD(RequestCall);
static NAN_METHOD(AddHttp2Port);
static NAN_METHOD(Start);
static NAN_METHOD(Shutdown);
static NAN_METHOD(TryShutdown);
static NAN_METHOD(ForceShutdown);
static NanCallback *constructor;
static v8::Persistent<v8::FunctionTemplate> fun_tpl;

Expand Down
21 changes: 18 additions & 3 deletions src/node/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -623,11 +623,26 @@ function Server(options) {
}
server.requestCall(handleNewCall);
};

/**
* Gracefully shuts down the server. The server will stop receiving new calls,
* and any pending calls will complete. The callback will be called when all
* pending calls have completed and the server is fully shut down. This method
* is idempotent with itself and forceShutdown.
* @param {function()} callback The shutdown complete callback
*/
this.tryShutdown = function(callback) {
server.tryShutdown(callback);
};

/**
* Shuts down the server.
* Forcibly shuts down the server. The server will stop receiving new calls
* and cancel all pending calls. When it returns, the server has shut down.
* This method is idempotent with itself and tryShutdown, and it will trigger
* any outstanding tryShutdown callbacks.
*/
this.shutdown = function() {
server.shutdown();
this.forceShutdown = function() {
server.forceShutdown();
};
}

Expand Down
2 changes: 1 addition & 1 deletion src/node/test/call_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ describe('call', function() {
channel = new grpc.Channel('localhost:' + port, insecureCreds);
});
after(function() {
server.shutdown();
server.forceShutdown();
});
describe('constructor', function() {
it('should reject anything less than 3 arguments', function() {
Expand Down
2 changes: 1 addition & 1 deletion src/node/test/end_to_end_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ describe('end-to-end', function() {
channel = new grpc.Channel('localhost:' + port_num, insecureCreds);
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('should start and end a request without error', function(complete) {
var done = multiDone(complete, 2);
Expand Down
2 changes: 1 addition & 1 deletion src/node/test/health_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('Health Checking', function() {
grpc.Credentials.createInsecure());
});
after(function() {
healthServer.shutdown();
healthServer.forceShutdown();
});
it('should say an enabled service is SERVING', function(done) {
healthClient.check({service: ''}, function(err, response) {
Expand Down
2 changes: 1 addition & 1 deletion src/node/test/interop_sanity_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('Interop tests', function() {
done();
});
after(function() {
server.shutdown();
server.forceShutdown();
});
// This depends on not using a binary stream
it('should pass empty_unary', function(done) {
Expand Down
2 changes: 1 addition & 1 deletion src/node/test/math_client_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('Math client', function() {
done();
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('should handle a single request', function(done) {
var arg = {dividend: 7, divisor: 4};
Expand Down
31 changes: 30 additions & 1 deletion src/node/test/server_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,41 @@ describe('server', function() {
server.addHttp2Port('0.0.0.0:0', grpc.ServerCredentials.createInsecure());
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('should start without error', function() {
assert.doesNotThrow(function() {
server.start();
});
});
});
describe('shutdown', function() {
var server;
beforeEach(function() {
server = new grpc.Server();
server.addHttp2Port('0.0.0.0:0', grpc.ServerCredentials.createInsecure());
server.start();
});
afterEach(function() {
server.forceShutdown();
});
it('tryShutdown should shutdown successfully', function(done) {
server.tryShutdown(done);
});
it('forceShutdown should shutdown successfully', function() {
server.forceShutdown();
});
it('tryShutdown should be idempotent', function(done) {
server.tryShutdown(done);
server.tryShutdown(function() {});
});
it('forceShutdown should be idempotent', function() {
server.forceShutdown();
server.forceShutdown();
});
it('forceShutdown should trigger tryShutdown', function(done) {
server.tryShutdown(done);
server.forceShutdown();
});
});
});
16 changes: 8 additions & 8 deletions src/node/test/surface_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ describe('Server.prototype.addProtoService', function() {
server = new grpc.Server();
});
afterEach(function() {
server.shutdown();
server.forceShutdown();
});
it('Should succeed with a single service', function() {
assert.doesNotThrow(function() {
Expand Down Expand Up @@ -148,7 +148,7 @@ describe('Client#$waitForReady', function() {
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('should complete when called alone', function(done) {
client.$waitForReady(Infinity, function(error) {
Expand Down Expand Up @@ -203,7 +203,7 @@ describe('Echo service', function() {
server.start();
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('should echo the recieved message directly', function(done) {
client.echo({value: 'test value', value2: 3}, function(error, response) {
Expand Down Expand Up @@ -248,7 +248,7 @@ describe('Generic client and server', function() {
grpc.Credentials.createInsecure());
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('Should respond with a capitalized string', function(done) {
client.capitalize('abc', function(err, response) {
Expand Down Expand Up @@ -296,7 +296,7 @@ describe('Echo metadata', function() {
server.start();
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('with unary call', function(done) {
var call = client.unary({}, function(err, data) {
Expand Down Expand Up @@ -419,7 +419,7 @@ describe('Other conditions', function() {
server.start();
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('channel.getTarget should be available', function() {
assert.strictEqual(typeof client.channel.getTarget(), 'string');
Expand Down Expand Up @@ -681,7 +681,7 @@ describe('Other conditions', function() {
});
afterEach(function() {
console.log('Shutting down server');
proxy.shutdown();
proxy.forceShutdown();
});
describe('Cancellation', function() {
it('With a unary call', function(done) {
Expand Down Expand Up @@ -847,7 +847,7 @@ describe('Cancelling surface client', function() {
server.start();
});
after(function() {
server.shutdown();
server.forceShutdown();
});
it('Should correctly cancel a unary call', function(done) {
var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
Expand Down

0 comments on commit da22c15

Please sign in to comment.