Skip to content

Commit

Permalink
[subprocess] KUDU-3489: Support reading large messages through pipes
Browse files Browse the repository at this point in the history
This patch enables the subprocess server to be able to read messages
larger than 1MB which was otherwise flaky by reading the input stream
messages until we encounter EOF. This issue is noticed when large sized
requests are made to the subprocess server and it fails in receiving
the complete messages.

In addition made a small log change to MessageIO.java to display the
exception message correctly.

Change-Id: I6523fdaaca19ee089dbac52a7dedec8847926a6c
Reviewed-on: http://gerrit.cloudera.org:8080/20180
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Abhishek Chennaka <[email protected]>
  • Loading branch information
achennaka committed Jul 13, 2023
1 parent 35aa458 commit 2e953e4
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@ public MessageIO(int maxMessageBytes,
* from multiple threads concurrently.
*
* @return the message in a byte array.
* @throws EOFException if the end of the stream has been reached
* @throws IOException if this input stream has been closed, an I/O
* error occurs, or fail to read the message
* properly
* @throws KuduSubprocessException if there was an oversized message
* in the stream
*/
@VisibleForTesting
byte[] readBytes() throws EOFException, IOException {
byte[] readBytes() throws IOException {
Preconditions.checkNotNull(in);
// Read four bytes of the message to get the size of the body.
byte[] sizeBytes = new byte[Integer.BYTES];
Expand All @@ -85,48 +84,58 @@ byte[] readBytes() throws EOFException, IOException {
/**
* Reads <code>size</code> bytes of data from the underlying buffered input
* stream into the specified byte array, starting at the offset <code>0</code>.
* The reads are performed until we reach EOF of the stream (when the return
* value of the underlying read method is -1) or when we read more than or
* equal to the <code>size</code> bytes.
* If it fails to read the specified size, <code>IOException</code> is thrown.
*
* @throws EOFException if the end of the stream has been reached
* @throws IOException if this input stream has been closed, an I/O
* error occurs, or fail to read the specified size
*/
private void doRead(byte[] bytes, int size) throws EOFException, IOException {
private void doRead(byte[] bytes, int size) throws IOException {
Preconditions.checkNotNull(bytes);
int read = in.read(bytes, 0, size);
if (read == -1) {
throw new EOFException("the end of the stream has been reached");
} else if (read != size) {
int totalRead = in.read(bytes, 0, size);
do {
int read = in.read(bytes, totalRead, size - totalRead);
if (read == -1) {
break;
}
totalRead += read;
} while (totalRead < size);
if (totalRead != size) {
throw new IOException(
String.format("unable to receive message, expected (%d) bytes " +
"but read (%d) bytes", size, read));
String.format("unable to receive message, expected (%d) bytes " +
"but read (%d) bytes.", size, totalRead));
}
}

/**
* Reads <code>size</code> bytes of data from the underlying buffered input
* stream and discards all the bytes read.
* stream and discards all the bytes read. The reads are performed until we
* reach EOF of the stream (when the return value of the underlying read
* method is -1) or when we read more than or equal to the
* <code>size</code> bytes.
* If it fails to read the specified size, <code>IOException</code> is thrown.
*
* @throws EOFException if the end of the stream has been reached
* @throws IOException if this input stream has been closed, an I/O
* error occurs, or fail to read the specified size
*/
private void doReadAndDiscard(int size) throws EOFException, IOException {
private void doReadAndDiscard(int size) throws IOException {
byte[] buf = new byte[4096];
int rem = size;
while (rem > 0) {
int toRead = Math.min(4096, rem);
int toRead = Math.min(4096, rem);
do {
int read = in.read(buf, 0, toRead);
if (read == -1) {
throw new EOFException(String.format("the end of the stream " +
"has been reached while reading out oversized message (%d bytes)", size));
} else if (read != toRead) {
throw new IOException(
String.format("unable to read next chunk of oversized message (%d bytes), " +
"expected %d bytes but read %d bytes", size, toRead, read));
break;
}
rem -= read;
toRead = Math.min(4096, rem);
} while (rem > 0);
if (rem > 0) {
throw new IOException(
String.format("unable to read next chunk of oversized message (%d bytes), " +
"expected %d bytes but read %d bytes", size, size, size - rem));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ public void run() {
try {
data = messageIO.readBytes();
} catch (KuduSubprocessException e) {
LOG.error("%s: continuing", e.getMessage());
LOG.error("{}: continuing", e.getMessage());
continue;
} catch (EOFException e) {
LOG.info("Reaching the end of the input stream, exiting.");
// Break the loop if the end of the stream has been reached.
break;
} catch (IOException e) {
throw new KuduSubprocessException("Unable to read the protobuf message", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ public void testMsgWithEmptyMessage() throws Exception {
public void testMalformedPB() throws Exception {
SubprocessExecutor executor = setUpExecutorIO(NO_ERR, /*injectIOError*/false);
requestSenderPipe.write("malformed".getBytes(StandardCharsets.UTF_8));
// We need to close the pipe for the read() in InputStream.java to not block
requestSenderPipe.close();
Throwable thrown = Assert.assertThrows(ExecutionException.class,
() -> executor.run(new SubprocessConfiguration(NO_ARGS),
new EchoProtocolHandler(), TIMEOUT_MS));
Expand Down
51 changes: 48 additions & 3 deletions src/kudu/subprocess/subprocess_server-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class SubprocessServerTest : public KuduTest {

Status InitSubprocessServer(int java_queue_size,
int java_parser_threads,
int java_max_msg_bytes,
shared_ptr<SubprocessServer>* out) {
// Set up a subprocess server pointing at the kudu-subprocess.jar that
// contains an echo handler and call EchoSubprocessMain.
Expand All @@ -130,15 +131,20 @@ class SubprocessServerTest : public KuduTest {
argv.emplace_back("-p");
argv.emplace_back(std::to_string(java_parser_threads));
}
if (java_max_msg_bytes > 0) {
argv.emplace_back("-m");
argv.emplace_back(std::to_string(java_max_msg_bytes));
}
*out = make_shared<SubprocessServer>(env_, pipe_path, std::move(argv),
EchoSubprocessMetrics(metric_entity_));
return (*out)->Init();
}

// Resets the subprocess server to account for any new configuration.
Status ResetSubprocessServer(int java_queue_size = 0,
int java_parser_threads = 0) {
return InitSubprocessServer(java_queue_size, java_parser_threads, &server_);
int java_parser_threads = 0,
int java_max_msg_bytes = 0) {
return InitSubprocessServer(java_queue_size, java_parser_threads, java_max_msg_bytes, &server_);
}

protected:
Expand Down Expand Up @@ -318,10 +324,11 @@ TEST_F(SubprocessServerTest, TestRunFromMultipleThreads) {
} \
} while (0);

threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; i++) {
threads.emplace_back([&, i] {
shared_ptr<SubprocessServer> server;
EXIT_NOT_OK(InitSubprocessServer(0, 0, &server), i);
EXIT_NOT_OK(InitSubprocessServer(0, 0, 0, &server), i);
const string msg = Substitute("$0 bottles of tea on the wall", i);
SubprocessRequestPB req = CreateEchoSubprocessRequestPB(msg);
SubprocessResponsePB resp;
Expand Down Expand Up @@ -421,6 +428,44 @@ TEST_F(SubprocessServerTest, UnlimitedPayloadSize) {
}
}

// Check cases where the message is large enough to not fit in the pipe to be
// transferred in one single pipe message transmission.
TEST_F(SubprocessServerTest, LargePayloadSize) {
// Set a short timeout to speed up testing.
FLAGS_subprocess_timeout_secs = 5;
// Set the max message to 24MB (3x the default size)
FLAGS_subprocess_max_message_size_bytes = 24 * 1024 * 1024;
ASSERT_OK(ResetSubprocessServer(0, 0, 24 * 1024 * 1024));

// Send in a large message that isn't oversized as per the current limit.
{
auto req = CreateEchoSubprocessRequestPB(string(23 * 1024 * 1024, 'x'));
SubprocessResponsePB res;
ASSERT_OK(server_->Execute(&req, &res));
}

// Send a large oversized message.
{
auto req = CreateEchoSubprocessRequestPB(string(24 * 1024 * 1024, 'x'));
SubprocessResponsePB res;
const auto s = server_->Execute(&req, &res);

// The request will timeout because the oversized response is read and
// discarded, and there isn't any application-level data to be sent back.
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "timed out while in flight");
}

// Non-oversized follow-up messages should be received without any issues:
// the communication channel should be cleared of any oversized requests
// sent earlier.
{
auto req = CreateEchoSubprocessRequestPB(string(23 * 1024 * 1024, 'x'));
SubprocessResponsePB res;
ASSERT_OK(server_->Execute(&req, &res));
}
}

} // namespace subprocess
} // namespace kudu

0 comments on commit 2e953e4

Please sign in to comment.