Skip to content

Commit

Permalink
gRPC CLI batch mode
Browse files Browse the repository at this point in the history
  • Loading branch information
y-zeng committed Sep 29, 2017
1 parent 9c15cb9 commit b5eaf77
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 5 deletions.
125 changes: 120 additions & 5 deletions test/cpp/util/grpc_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ DEFINE_string(protofiles, "", "Name of the proto file.");
DEFINE_bool(binary_input, false, "Input in binary format");
DEFINE_bool(binary_output, false, "Output in binary format");
DEFINE_string(infile, "", "Input file (default is stdin)");
DEFINE_bool(batch, false,
"Input contains multiple requests. Please do not use this to send "
"more than a few RPCs. gRPC CLI has very different performance "
"characteristics compared with normal RPC calls which make it "
"unsuitable for loadtesting or significant production traffic.");

namespace {

Expand Down Expand Up @@ -460,12 +465,17 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
return false;
}

if (argc == 3) {
request_text = argv[2];
}

if (parser->IsStreaming(method_name, true /* is_request */)) {
std::istream* input_stream;
std::ifstream input_file;

if (argc == 3) {
request_text = argv[2];
if (FLAGS_batch) {
fprintf(stderr, "Batch mode for streaming RPC is not supported.\n");
return false;
}

std::multimap<grpc::string, grpc::string> client_metadata;
Expand Down Expand Up @@ -549,8 +559,115 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
}

} else { // parser->IsStreaming(method_name, true /* is_request */)
if (FLAGS_batch) {
if (parser->IsStreaming(method_name, false /* is_request */)) {
fprintf(stderr, "Batch mode for streaming RPC is not supported.\n");
return false;
}

std::istream* input_stream;
std::ifstream input_file;

if (FLAGS_infile.empty()) {
if (isatty(STDIN_FILENO)) {
print_mode = true;
fprintf(stderr, "reading request messages from stdin...\n");
}
input_stream = &std::cin;
} else {
input_file.open(FLAGS_infile, std::ios::in | std::ios::binary);
input_stream = &input_file;
}

std::multimap<grpc::string, grpc::string> client_metadata;
ParseMetadataFlag(&client_metadata);
if (print_mode) {
PrintMetadata(client_metadata, "Sending client initial metadata:");
}

std::stringstream request_ss;
grpc::string line;
while (!request_text.empty() ||
(!input_stream->eof() && getline(*input_stream, line))) {
if (!request_text.empty()) {
if (FLAGS_binary_input) {
serialized_request_proto = request_text;
request_text.clear();
} else {
serialized_request_proto = parser->GetSerializedProtoFromMethod(
method_name, request_text, true /* is_request */);
request_text.clear();
if (parser->HasError()) {
if (print_mode) {
fprintf(stderr, "Failed to parse request.\n");
}
continue;
}
}

grpc::string serialized_response_proto;
std::multimap<grpc::string_ref, grpc::string_ref>
server_initial_metadata, server_trailing_metadata;
CliCall call(channel, formatted_method_name, client_metadata);
call.Write(serialized_request_proto);
call.WritesDone();
if (!call.Read(&serialized_response_proto,
&server_initial_metadata)) {
fprintf(stderr, "Failed to read response.\n");
}
Status status = call.Finish(&server_trailing_metadata);

if (status.ok()) {
if (print_mode) {
fprintf(stderr, "Rpc succeeded with OK status.\n");
PrintMetadata(server_initial_metadata,
"Received initial metadata from server:");
PrintMetadata(server_trailing_metadata,
"Received trailing metadata from server:");
}

if (FLAGS_binary_output) {
if (!callback(serialized_response_proto)) {
break;
}
} else {
grpc::string response_text = parser->GetTextFormatFromMethod(
method_name, serialized_response_proto,
false /* is_request */);
if (parser->HasError() && print_mode) {
fprintf(stderr, "Failed to parse response.\n");
} else {
if (!callback(response_text)) {
break;
}
}
}
} else {
if (print_mode) {
fprintf(stderr,
"Rpc failed with status code %d, error message: %s\n",
status.error_code(), status.error_message().c_str());
}
}
} else {
if (line.length() == 0) {
request_text = request_ss.str();
request_ss.str(grpc::string());
request_ss.clear();
} else {
request_ss << line << ' ';
}
}
}

if (input_file.is_open()) {
input_file.close();
}

return true;
}

if (argc == 3) {
request_text = argv[2];
if (!FLAGS_infile.empty()) {
fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
}
Expand All @@ -571,9 +688,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv,

if (FLAGS_binary_input) {
serialized_request_proto = request_text;
// formatted_method_name = method_name;
} else {
// formatted_method_name = parser->GetFormattedMethodName(method_name);
serialized_request_proto = parser->GetSerializedProtoFromMethod(
method_name, request_text, true /* is_request */);
if (parser->HasError()) {
Expand Down
55 changes: 55 additions & 0 deletions test/cpp/util/grpc_tool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ namespace testing {
DECLARE_bool(binary_input);
DECLARE_bool(binary_output);
DECLARE_bool(l);
DECLARE_bool(batch);

namespace {

Expand Down Expand Up @@ -399,6 +400,60 @@ TEST_F(GrpcToolTest, CallCommand) {
ShutdownServer();
}

TEST_F(GrpcToolTest, CallCommandBatch) {
// Test input "grpc_cli call Echo"
std::stringstream output_stream;

const grpc::string server_address = SetUpServer();
const char* argv[] = {"grpc_cli", "call", server_address.c_str(), "Echo",
"message: 'Hello0'"};

// Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
std::streambuf* orig = std::cin.rdbuf();
std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n");
std::cin.rdbuf(ss.rdbuf());

FLAGS_batch = true;
EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
std::bind(PrintStream, &output_stream,
std::placeholders::_1)));
FLAGS_batch = false;

// Expected output: "message: "Hello0"\nmessage: "Hello1"\nmessage:
// "Hello2"\n"
EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
"message: \"Hello0\"\nmessage: "
"\"Hello1\"\nmessage: \"Hello2\"\n"));
std::cin.rdbuf(orig);
ShutdownServer();
}

TEST_F(GrpcToolTest, CallCommandBatchWithBadRequest) {
// Test input "grpc_cli call Echo"
std::stringstream output_stream;

const grpc::string server_address = SetUpServer();
const char* argv[] = {"grpc_cli", "call", server_address.c_str(), "Echo",
"message: 'Hello0'"};

// Mock std::cin input "message: 1\n\n message: 'Hello2'\n\n"
std::streambuf* orig = std::cin.rdbuf();
std::istringstream ss("message: 1\n\n message: 'Hello2'\n\n");
std::cin.rdbuf(ss.rdbuf());

FLAGS_batch = true;
EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
std::bind(PrintStream, &output_stream,
std::placeholders::_1)));
FLAGS_batch = false;

// Expected output: "message: "Hello0"\nmessage: "Hello2"\n"
EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
"message: \"Hello0\"\nmessage: \"Hello2\"\n"));
std::cin.rdbuf(orig);
ShutdownServer();
}

TEST_F(GrpcToolTest, CallCommandRequestStream) {
// Test input: grpc_cli call localhost:<port> RequestStream "message:
// 'Hello0'"
Expand Down

0 comments on commit b5eaf77

Please sign in to comment.