Skip to content

Commit

Permalink
[FLINK-17440][network] Resolve potential buffer leak in output unspil…
Browse files Browse the repository at this point in the history
…ling for unaligned checkpoints

If any exception is thrown while interacting with ChannelStateReader#readOutputData, we should close the
respective BufferConsumer to avoid a buffer leak.
  • Loading branch information
zhijiangW committed Apr 29, 2020
1 parent 59ae237 commit 72f528d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,23 @@ public class PipelinedSubpartition extends ResultSubpartition {

@Override
public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
boolean recycleBuffer = true;
for (ReadResult readResult = ReadResult.HAS_MORE_DATA; readResult == ReadResult.HAS_MORE_DATA;) {
BufferBuilder bufferBuilder = parent.getBufferPool().requestBufferBuilderBlocking(subpartitionInfo.getSubPartitionIdx());
BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
readResult = stateReader.readOutputData(subpartitionInfo, bufferBuilder);

// check whether there are some states data filled in this time
if (bufferConsumer.isDataAvailable()) {
add(bufferConsumer, false, false);
bufferBuilder.finish();
} else {
bufferConsumer.close();
try {
readResult = stateReader.readOutputData(subpartitionInfo, bufferBuilder);

// check whether there are some states data filled in this time
if (bufferConsumer.isDataAvailable()) {
add(bufferConsumer, false, false);
recycleBuffer = false;
bufferBuilder.finish();
}
} finally {
if (recycleBuffer) {
bufferConsumer.close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,32 @@ public void testInitializeMoreStateThanBuffer() throws Exception {
}
}

/**
 * Tests that the buffer is recycled correctly if an exception is thrown during
 * {@link ChannelStateReader#readOutputData(ResultSubpartitionInfo, BufferBuilder)}.
 *
 * <p>The reader used here always throws an {@link IOException} from {@code readOutputData}, so
 * the test fails explicitly if {@code readRecoveredState} returns normally instead of passing
 * without having exercised the failure path.
 */
@Test
public void testReadRecoveredStateWithException() throws Exception {
    final int totalBuffers = 2;
    final NetworkBufferPool globalPool = new NetworkBufferPool(totalBuffers, 1, 1);
    final ResultPartition partition = new ResultPartitionBuilder()
        .setNetworkBufferPool(globalPool)
        .build();
    final ChannelStateReader stateReader = new ChannelStateReaderWithException();

    try {
        partition.setup();
        partition.readRecoveredState(stateReader);
        // Reaching this line means the expected exception was swallowed somewhere.
        throw new AssertionError("readRecoveredState should propagate the IOException thrown by the state reader");
    } catch (IOException e) {
        assertThat("should throw custom exception message", e.getMessage().contains("test"));
    } finally {
        globalPool.destroyAllBufferPools();
        // verify that no buffer leaked: every segment must be back in the global pool
        assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments());
        globalPool.destroy();
    }
}

/**
* The {@link ChannelStateReader} instance for restoring the specific number of states.
*/
Expand Down Expand Up @@ -562,4 +588,25 @@ public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder buff
public void close() {
    // No-op: the body is empty, so closing this reader does nothing.
}
}

/**
 * A {@link ChannelStateReader} used to exercise the failure path: every call to
 * {@link #readOutputData(ResultSubpartitionInfo, BufferBuilder)} throws an {@link IOException}.
 * Input reads report {@code ReadResult.NO_MORE_DATA} and {@link #close()} does nothing.
 */
private static final class ChannelStateReaderWithException implements ChannelStateReader {

    @Override
    public void close() {
    }

    @Override
    public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder bufferBuilder) throws IOException {
        // Always fail so callers' cleanup of the passed buffer can be verified.
        throw new IOException("test");
    }

    @Override
    public ReadResult readInputData(InputChannelInfo info, Buffer buffer) {
        return ReadResult.NO_MORE_DATA;
    }
}
}

0 comments on commit 72f528d

Please sign in to comment.