Skip to content

Commit

Permalink
Removed server-wide metadata handler, individual handlers can now sen…
Browse files Browse the repository at this point in the history
…d metadata
  • Loading branch information
murgatroid99 committed Jul 16, 2015
1 parent 366e64d commit 380b90a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/node/interop/interop_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ function getServer(port, tls) {
key_data,
pem_data);
}
var server = new grpc.Server(null, options);
var server = new grpc.Server(options);
server.addProtoService(testProto.TestService.service, {
emptyCall: handleEmpty,
unaryCall: handleUnary,
Expand Down
72 changes: 55 additions & 17 deletions src/node/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ function handleError(call, error) {
status.metadata = error.metadata;
}
var error_batch = {};
if (!call.metadataSent) {
error_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
}
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){});
}
Expand Down Expand Up @@ -115,6 +118,10 @@ function sendUnaryResponse(call, value, serialize, metadata) {
if (metadata) {
status.metadata = metadata;
}
if (!call.metadataSent) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
call.metadataSent = true;
}
end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
Expand All @@ -136,6 +143,10 @@ function setUpWritable(stream, serialize) {
stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() {
var batch = {};
if (!stream.call.metadataSent) {
stream.call.metadataSent = true;
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
}
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
stream.call.startBatch(batch, function(){});
}
Expand Down Expand Up @@ -239,6 +250,10 @@ function ServerWritableStream(call, serialize) {
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
if (!this.call.metadataSent) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
this.call.metadataSent = true;
}
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, value) {
if (err) {
Expand All @@ -251,6 +266,23 @@ function _write(chunk, encoding, callback) {

ServerWritableStream.prototype._write = _write;

function sendMetadata(responseMetadata) {
/* jshint validthis: true */
if (!this.call.metadataSent) {
this.call.metadataSent = true;
var batch = [];
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
this.call.startBatch(batch, function(err) {
if (err) {
this.emit('error', err);
return;
}
});
}
}

ServerWritableStream.prototype.sendMetadata = sendMetadata;

util.inherits(ServerReadableStream, Readable);

/**
Expand Down Expand Up @@ -339,6 +371,7 @@ function ServerDuplexStream(call, serialize, deserialize) {

ServerDuplexStream.prototype._read = _read;
ServerDuplexStream.prototype._write = _write;
ServerDuplexStream.prototype.sendMetadata = sendMetadata;

/**
* Fully handle a unary call
Expand All @@ -348,12 +381,20 @@ ServerDuplexStream.prototype._write = _write;
*/
function handleUnary(call, handler, metadata) {
var emitter = new EventEmitter();
emitter.sendMetadata = function(responseMetadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
call.startBatch(batch, function() {});
}
};
emitter.on('error', function(error) {
handleError(call, error);
});
emitter.metadata = metadata;
waitForCancel(call, emitter);
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
if (err) {
Expand Down Expand Up @@ -392,8 +433,8 @@ function handleUnary(call, handler, metadata) {
function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, handler.serialize);
waitForCancel(call, stream);
stream.metadata = metadata;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
if (err) {
Expand All @@ -419,13 +460,19 @@ function handleServerStreaming(call, handler, metadata) {
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize);
stream.sendMetadata = function(responseMetadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
call.startBatch(batch, function() {});
}
};
stream.on('error', function(error) {
handleError(call, error);
});
waitForCancel(call, stream);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
call.startBatch(metadata_batch, function() {});
stream.metadata = metadata;
handler.func(stream, function(err, value, trailer) {
stream.terminate();
if (err) {
Expand All @@ -449,9 +496,7 @@ function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, handler.serialize,
handler.deserialize);
waitForCancel(call, stream);
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
call.startBatch(metadata_batch, function() {});
stream.metadata = metadata;
handler.func(stream);
}

Expand All @@ -466,13 +511,10 @@ var streamHandlers = {
* Constructs a server object that stores request handlers and delegates
* incoming requests to those handlers
* @constructor
* @param {function(string, Object<string, Array<Buffer>>):
Object<string, Array<Buffer|string>>=} getMetadata Callback that gets
* metatada for a given method
* @param {Object=} options Options that should be passed to the internal server
* implementation
*/
function Server(getMetadata, options) {
function Server(options) {
this.handlers = {};
var handlers = this.handlers;
var server = new grpc.Server(options);
Expand Down Expand Up @@ -525,11 +567,7 @@ function Server(getMetadata, options) {
call.startBatch(batch, function() {});
return;
}
var response_metadata = {};
if (getMetadata) {
response_metadata = getMetadata(method, metadata);
}
streamHandlers[handler.type](call, handler, response_metadata);
streamHandlers[handler.type](call, handler, metadata);
}
server.requestCall(handleNewCall);
};
Expand Down

0 comments on commit 380b90a

Please sign in to comment.