From 4bf76ae69e3f22e25c2dad3e802be094554b5d43 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 20 Feb 2018 18:04:12 +0100 Subject: [PATCH] [FLINK-8733][network] fix SpillableSubpartition#spillFinishedBufferConsumers() not counting spilled bytes This closes #5549. --- .../partition/SpillableSubpartition.java | 5 ++++- .../partition/SpillableSubpartitionTest.java | 21 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 8758b34ef552c..6ac493e7e2752 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -240,13 +241,15 @@ public int releaseMemory() throws IOException { return 0; } - private long spillFinishedBufferConsumers() throws IOException { + @VisibleForTesting + protected long spillFinishedBufferConsumers() throws IOException { long spilledBytes = 0; while (!buffers.isEmpty()) { BufferConsumer bufferConsumer = buffers.peek(); Buffer buffer = bufferConsumer.build(); updateStatistics(buffer); + spilledBytes += buffer.getSize(); spillWriter.writeBlock(buffer); if (bufferConsumer.isFinished()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index 9dc7bed21ec2f..65d98e69de25f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; @@ -40,12 +41,14 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -700,6 +703,24 @@ private void testCleanupReleasedPartition(boolean spilled, boolean createView) t // assertEquals((createView ? 4 : 0) + 2 * BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); } + /** + * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers()} spilled bytes counting. + */ + @Test + public void testSpillFinishedBufferConsumers() throws Exception { + SpillableSubpartition partition = createSubpartition(); + BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE); + + try (BufferConsumer buffer = bufferBuilder.createBufferConsumer()) { + partition.add(buffer); + assertEquals(0, partition.releaseMemory()); + // finally fill the buffer with some bytes + bufferBuilder.appendAndCommit(ByteBuffer.allocate(BUFFER_DATA_SIZE)); + bufferBuilder.finish(); // so that this buffer can be removed from the queue + assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers()); + } + } + /** * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its * {@link #createBufferFileWriter(FileIOChannel.ID)} method.