From 6c82d0b2c5fce2d1bdea61cbc5d713e10945d44f Mon Sep 17 00:00:00 2001 From: Anton Kalashnikov Date: Tue, 9 Nov 2021 15:05:02 +0100 Subject: [PATCH] [FLINK-24690][runtime] Changed the calculation of reaching buffer debloat threshold into more expected way --- .../runtime/throughput/BufferDebloater.java | 29 ++++++--- .../throughput/BufferDebloaterTest.java | 62 +++++++++++++++++-- 2 files changed, 78 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java index 7896794f1b594..e9c32d7693ace 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throughput/BufferDebloater.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.throughput; +import org.apache.flink.annotation.VisibleForTesting; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +38,7 @@ public class BufferDebloater { private final long targetTotalBufferSize; private final int maxBufferSize; private final int minBufferSize; - private final int bufferDebloatThresholdPercentages; + private final double bufferDebloatThresholdFactor; private final BufferSizeEMA bufferSizeEMA; private Duration lastEstimatedTimeToConsumeBuffers = Duration.ZERO; @@ -53,7 +55,7 @@ public BufferDebloater( this.targetTotalBufferSize = targetTotalBufferSize; this.maxBufferSize = maxBufferSize; this.minBufferSize = minBufferSize; - this.bufferDebloatThresholdPercentages = bufferDebloatThresholdPercentages; + this.bufferDebloatThresholdFactor = bufferDebloatThresholdPercentages / 100.0; this.lastBufferSize = maxBufferSize; bufferSizeEMA = new BufferSizeEMA(maxBufferSize, minBufferSize, numberOfSamples); @@ -84,11 +86,7 @@ public OptionalInt recalculateBufferSize(long currentThroughput, int buffersInUs * MILLIS_IN_SECOND / Math.max(1, currentThroughput)); - boolean skipUpdate = - newSize == lastBufferSize - || (newSize > minBufferSize && newSize < maxBufferSize) - && Math.abs(1 - ((double) lastBufferSize) / newSize) * 100 - < bufferDebloatThresholdPercentages; + boolean skipUpdate = skipUpdate(newSize); LOG.debug( "Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}", @@ -110,6 +108,23 @@ public OptionalInt recalculateBufferSize(long currentThroughput, int buffersInUs return OptionalInt.of(newSize); } + @VisibleForTesting + boolean skipUpdate(int newSize) { + if (newSize == lastBufferSize) { + return true; + } + + // According to logic of this class newSize can not be less than min or greater than max + // buffer size but if considering this method independently the behaviour for the small or + // big value should be the same as for min and max buffer size correspondingly. + if (newSize <= minBufferSize || newSize >= maxBufferSize) { + return false; + } + + int delta = (int) (lastBufferSize * bufferDebloatThresholdFactor); + return Math.abs(newSize - lastBufferSize) < delta; + } + public int getLastBufferSize() { return lastBufferSize; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java index cb22b649b3337..8ca0c9c9d50a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/throughput/BufferDebloaterTest.java @@ -27,6 +27,7 @@ import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -163,6 +164,54 @@ public void testAnnouncedMinBufferSizeEvenDespiteLastDiffLessThanThreshold() { bufferDebloater.recalculateBufferSize(40, numberOfBuffersInUse); } + @Test + public void testSkipUpdate() { + int maxBufferSize = 32768; + int minBufferSize = 256; + double threshold = 0.3; + BufferDebloater bufferDebloater = + testBufferDebloater() + .withDebloatTarget(1000) + .withBufferSize(minBufferSize, maxBufferSize) + // 30 % Threshold. + .withThresholdPercentages((int) (threshold * 100)) + .getBufferDebloater(); + + int currentBufferSize = maxBufferSize / 2; + + OptionalInt optionalInt = bufferDebloater.recalculateBufferSize(currentBufferSize, 1); + assertTrue(optionalInt.isPresent()); + assertEquals(currentBufferSize, optionalInt.getAsInt()); + + // It is true because less than threshold. + assertTrue(bufferDebloater.skipUpdate(currentBufferSize)); + assertTrue(bufferDebloater.skipUpdate(currentBufferSize - 1)); + assertTrue(bufferDebloater.skipUpdate(currentBufferSize + 1)); + + assertTrue( + bufferDebloater.skipUpdate( + currentBufferSize - (int) (currentBufferSize * threshold) + 1)); + assertTrue( + bufferDebloater.skipUpdate( + currentBufferSize + (int) (currentBufferSize * threshold) - 1)); + + // It is false because it reaches threshold. + assertFalse( + bufferDebloater.skipUpdate( + currentBufferSize - (int) (currentBufferSize * threshold))); + assertFalse( + bufferDebloater.skipUpdate( + currentBufferSize + (int) (currentBufferSize * threshold))); + assertFalse(bufferDebloater.skipUpdate(minBufferSize + 1)); + assertFalse(bufferDebloater.skipUpdate(minBufferSize)); + assertFalse(bufferDebloater.skipUpdate(maxBufferSize - 1)); + assertFalse(bufferDebloater.skipUpdate(maxBufferSize)); + + // Beyond the min and max size is always false. + assertFalse(bufferDebloater.skipUpdate(maxBufferSize + 1)); + assertFalse(bufferDebloater.skipUpdate(minBufferSize - 1)); + } + public static BufferDebloaterTestBuilder testBufferDebloater() { return new BufferDebloaterTestBuilder(); } @@ -173,6 +222,7 @@ private static class BufferDebloaterTestBuilder { private int minBufferSize; private int maxBufferSize; private int debloatTarget; + private int thresholdPercentages = BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES.defaultValue(); public BufferDebloaterTestBuilder withNumberOfBuffersInUse(Integer numberOfBuffersInUse) { this.numberOfBuffersInUse = numberOfBuffersInUse; @@ -195,6 +245,11 @@ public BufferDebloaterTestBuilder withDebloatTarget(int debloatTarget) { return this; } + public BufferDebloaterTestBuilder withThresholdPercentages(int thresholdPercentages) { + this.thresholdPercentages = thresholdPercentages; + return this; + } + public void expectNoChangeInBufferSize() { BufferDebloater bufferDebloater = getBufferDebloater(); @@ -219,12 +274,7 @@ public BufferDebloater expectBufferSize(int expectedBufferSize) { private BufferDebloater getBufferDebloater() { return new BufferDebloater( - 0, - debloatTarget, - maxBufferSize, - minBufferSize, - BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES.defaultValue(), - 1); + 0, debloatTarget, maxBufferSize, minBufferSize, thresholdPercentages, 1); } } }