diff --git a/src/node/performance/benchmark_client.js b/src/node/performance/benchmark_client.js index 620aecde97b22..80bec0b73e008 100644 --- a/src/node/performance/benchmark_client.js +++ b/src/node/performance/benchmark_client.js @@ -45,6 +45,9 @@ var EventEmitter = require('events'); var _ = require('lodash'); var PoissonProcess = require('poisson-process'); var Histogram = require('./histogram'); + +var genericService = require('./generic_service'); + var grpc = require('../../../'); var serviceProto = grpc.load({ root: __dirname + '/../../..', @@ -104,10 +107,14 @@ function BenchmarkClient(server_targets, channels, histogram_params, } this.clients = []; + var GenericClient = grpc.makeGenericClientConstructor(genericService); + this.genericClients = []; for (var i = 0; i < channels; i++) { this.clients[i] = new serviceProto.BenchmarkService( server_targets[i % server_targets.length], creds, options); + this.genericClients[i] = new GenericClient( + server_targets[i % server_targets.length], creds, options); } this.histogram = new Histogram(histogram_params.resolution, @@ -130,9 +137,11 @@ util.inherits(BenchmarkClient, EventEmitter); * 'STREAMING' * @param {number} req_size The size of the payload to send with each request * @param {number} resp_size The size of payload to request be sent in responses + * @param {boolean} generic Indicates that the generic (non-proto) clients + * should be used */ BenchmarkClient.prototype.startClosedLoop = function( - outstanding_rpcs_per_channel, rpc_type, req_size, resp_size) { + outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) { var self = this; self.running = true; @@ -141,12 +150,20 @@ BenchmarkClient.prototype.startClosedLoop = function( var makeCall; - var argument = { - response_size: resp_size, - payload: { - body: zeroBuffer(req_size) - } - }; + var argument; + var client_list; + if (generic) { + argument = zeroBuffer(req_size); + client_list = self.genericClients; + } else { + argument = { + response_size: resp_size, + payload: { + body: zeroBuffer(req_size) + } + }; + client_list = self.clients; + } if (rpc_type == 'UNARY') { makeCall = function(client) { @@ -195,7 +212,7 @@ BenchmarkClient.prototype.startClosedLoop = function( }; } - _.each(self.clients, function(client) { + _.each(client_list, function(client) { _.times(outstanding_rpcs_per_channel, function() { makeCall(client); }); @@ -213,9 +230,12 @@ BenchmarkClient.prototype.startClosedLoop = function( * @param {number} req_size The size of the payload to send with each request * @param {number} resp_size The size of payload to request be sent in responses * @param {number} offered_load The load parameter for the Poisson process + * @param {boolean} generic Indicates that the generic (non-proto) clients + * should be used */ BenchmarkClient.prototype.startPoisson = function( - outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load) { + outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load, + generic) { var self = this; self.running = true; @@ -224,12 +244,20 @@ BenchmarkClient.prototype.startPoisson = function( var makeCall; - var argument = { - response_size: resp_size, - payload: { - body: zeroBuffer(req_size) - } - }; + var argument; + var client_list; + if (generic) { + argument = zeroBuffer(req_size); + client_list = self.genericClients; + } else { + argument = { + response_size: resp_size, + payload: { + body: zeroBuffer(req_size) + } + }; + client_list = self.clients; + } if (rpc_type == 'UNARY') { makeCall = function(client, poisson) { @@ -282,7 +310,7 @@ BenchmarkClient.prototype.startPoisson = function( var averageIntervalMs = (1 / offered_load) * 1000; - _.each(self.clients, function(client) { + _.each(client_list, function(client) { _.times(outstanding_rpcs_per_channel, function() { var p = PoissonProcess.create(averageIntervalMs, function() { makeCall(client, p); diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js index e48acd48f5d0f..b1b0bd12abb0b 100644 --- a/src/node/performance/benchmark_server.js +++ b/src/node/performance/benchmark_server.js @@ -41,6 +41,8 @@ var fs = require('fs'); var path = require('path'); +var genericService = require('./generic_service'); + var grpc = require('../../../'); var serviceProto = grpc.load({ root: __dirname + '/../../..', @@ -84,14 +86,28 @@ function streamingCall(call) { }); } +function makeStreamingGenericCall(response_size) { + var response = zeroBuffer(response_size); + return function streamingGenericCall(call) { + call.on('data', function(value) { + call.write(response); + }); + call.on('end', function() { + call.end(); + }); + }; +} + /** * BenchmarkServer class. Constructed based on parameters from the driver and * stores statistics. * @param {string} host The host to serve on * @param {number} port The port to listen to - * @param {tls} Indicates whether TLS should be used + * @param {boolean} tls Indicates whether TLS should be used + * @param {boolean} generic Indicates whether to use the generic service + * @param {number=} response_size The response size for the generic service */ -function BenchmarkServer(host, port, tls) { +function BenchmarkServer(host, port, tls, generic, response_size) { var server_creds; var host_override; if (tls) { @@ -109,10 +125,16 @@ function BenchmarkServer(host, port, tls) { var server = new grpc.Server(); this.port = server.bind(host + ':' + port, server_creds); - server.addProtoService(serviceProto.BenchmarkService.service, { - unaryCall: unaryCall, - streamingCall: streamingCall - }); + if (generic) { + server.addService(genericService, { + streamingCall: makeStreamingGenericCall(response_size) + }); + } else { + server.addProtoService(serviceProto.BenchmarkService.service, { + unaryCall: unaryCall, + streamingCall: streamingCall + }); + } this.server = server; } diff --git a/src/node/performance/generic_service.js b/src/node/performance/generic_service.js new file mode 100644 index 0000000000000..ce09cc4336cd6 --- /dev/null +++ b/src/node/performance/generic_service.js @@ -0,0 +1,46 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +var _ = require('lodash'); + +module.exports = { + 'streamingCall' : { + path: '/grpc.testing/BenchmarkService', + requestStream: true, + responseStream: true, + requestSerialize: _.identity, + requestDeserialize: _.identity, + responseSerialize: _.identity, + responseDeserialize: _.identity + } +}; diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js index 1439249878405..2c4651370f29f 100644 --- a/src/node/performance/worker_service_impl.js +++ b/src/node/performance/worker_service_impl.js @@ -56,18 +56,31 @@ exports.runClient = function runClient(call) { client.on('error', function(error) { call.emit('error', error); }); + var req_size, resp_size, generic; + switch (setup.payload_config.payload) { + case 'bytebuf_params': + req_size = setup.payload_config.bytebuf_params.req_size; + resp_size = setup.payload_config.bytebuf_params.resp_size; + generic = true; + break; + case 'simple_params': + req_size = setup.payload_config.simple_params.req_size; + resp_size = setup.payload_config.simple_params.resp_size; + generic = false; + break; + default: + call.emit('error', new Error('Unsupported PayloadConfig type' + + setup.payload_config.payload)); + } switch (setup.load_params.load) { case 'closed_loop': client.startClosedLoop(setup.outstanding_rpcs_per_channel, - setup.rpc_type, - setup.payload_config.simple_params.req_size, - setup.payload_config.simple_params.resp_size); + setup.rpc_type, req_size, resp_size, generic); break; case 'poisson': client.startPoisson(setup.outstanding_rpcs_per_channel, - setup.rpc_type, setup.payload_config.req_size, - setup.payload_config.resp_size, - setup.load_params.poisson.offered_load); + setup.rpc_type, req_size, resp_size, + setup.load_params.poisson.offered_load, generic); break; default: call.emit('error', new Error('Unsupported LoadParams type' + diff --git a/test/core/end2end/invalid_call_argument_test.c b/test/core/end2end/invalid_call_argument_test.c index ab90c4cf74a41..642b044ed8e2b 100644 --- a/test/core/end2end/invalid_call_argument_test.c +++ b/test/core/end2end/invalid_call_argument_test.c @@ -63,7 +63,7 @@ struct test_state { static struct test_state g_state; static void prepare_test(int is_client) { - int port; + int port = grpc_pick_unused_port_or_die(); char *server_hostport; grpc_op *op; g_state.is_client = is_client; @@ -85,7 +85,6 @@ static void prepare_test(int is_client) { } else { g_state.server = grpc_server_create(NULL, NULL); grpc_server_register_completion_queue(g_state.server, g_state.cq, NULL); - port = grpc_pick_unused_port_or_die(); gpr_join_host_port(&server_hostport, "0.0.0.0", port); grpc_server_add_insecure_http2_port(g_state.server, server_hostport); grpc_server_start(g_state.server);