Skip to content

Commit

Permalink
[FLINK-24690][runtime] Changed the calculation of reaching buffer deb…
Browse files Browse the repository at this point in the history
…loat threshold into more expected way
  • Loading branch information
akalash authored and dawidwys committed Nov 15, 2021
1 parent 07d2585 commit 6c82d0b
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.throughput;

import org.apache.flink.annotation.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,7 +38,7 @@ public class BufferDebloater {
private final long targetTotalBufferSize;
private final int maxBufferSize;
private final int minBufferSize;
private final int bufferDebloatThresholdPercentages;
private final double bufferDebloatThresholdFactor;
private final BufferSizeEMA bufferSizeEMA;

private Duration lastEstimatedTimeToConsumeBuffers = Duration.ZERO;
Expand All @@ -53,7 +55,7 @@ public BufferDebloater(
this.targetTotalBufferSize = targetTotalBufferSize;
this.maxBufferSize = maxBufferSize;
this.minBufferSize = minBufferSize;
this.bufferDebloatThresholdPercentages = bufferDebloatThresholdPercentages;
this.bufferDebloatThresholdFactor = bufferDebloatThresholdPercentages / 100.0;

this.lastBufferSize = maxBufferSize;
bufferSizeEMA = new BufferSizeEMA(maxBufferSize, minBufferSize, numberOfSamples);
Expand Down Expand Up @@ -84,11 +86,7 @@ public OptionalInt recalculateBufferSize(long currentThroughput, int buffersInUs
* MILLIS_IN_SECOND
/ Math.max(1, currentThroughput));

boolean skipUpdate =
newSize == lastBufferSize
|| (newSize > minBufferSize && newSize < maxBufferSize)
&& Math.abs(1 - ((double) lastBufferSize) / newSize) * 100
< bufferDebloatThresholdPercentages;
boolean skipUpdate = skipUpdate(newSize);

LOG.debug(
"Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}",
Expand All @@ -110,6 +108,23 @@ public OptionalInt recalculateBufferSize(long currentThroughput, int buffersInUs
return OptionalInt.of(newSize);
}

@VisibleForTesting
boolean skipUpdate(int newSize) {
if (newSize == lastBufferSize) {
return true;
}

// According to logic of this class newSize can not be less than min or greater than max
// buffer size but if considering this method independently the behaviour for the small or
// big value should be the same as for min and max buffer size correspondingly.
if (newSize <= minBufferSize || newSize >= maxBufferSize) {
return false;
}

int delta = (int) (lastBufferSize * bufferDebloatThresholdFactor);
return Math.abs(newSize - lastBufferSize) < delta;
}

public int getLastBufferSize() {
return lastBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -163,6 +164,54 @@ public void testAnnouncedMinBufferSizeEvenDespiteLastDiffLessThanThreshold() {
bufferDebloater.recalculateBufferSize(40, numberOfBuffersInUse);
}

@Test
public void testSkipUpdate() {
int maxBufferSize = 32768;
int minBufferSize = 256;
double threshold = 0.3;
BufferDebloater bufferDebloater =
testBufferDebloater()
.withDebloatTarget(1000)
.withBufferSize(minBufferSize, maxBufferSize)
// 30 % Threshold.
.withThresholdPercentages((int) (threshold * 100))
.getBufferDebloater();

int currentBufferSize = maxBufferSize / 2;

OptionalInt optionalInt = bufferDebloater.recalculateBufferSize(currentBufferSize, 1);
assertTrue(optionalInt.isPresent());
assertEquals(currentBufferSize, optionalInt.getAsInt());

// It is true because less than threshold.
assertTrue(bufferDebloater.skipUpdate(currentBufferSize));
assertTrue(bufferDebloater.skipUpdate(currentBufferSize - 1));
assertTrue(bufferDebloater.skipUpdate(currentBufferSize + 1));

assertTrue(
bufferDebloater.skipUpdate(
currentBufferSize - (int) (currentBufferSize * threshold) + 1));
assertTrue(
bufferDebloater.skipUpdate(
currentBufferSize + (int) (currentBufferSize * threshold) - 1));

// It is false because it reaches threshold.
assertFalse(
bufferDebloater.skipUpdate(
currentBufferSize - (int) (currentBufferSize * threshold)));
assertFalse(
bufferDebloater.skipUpdate(
currentBufferSize + (int) (currentBufferSize * threshold)));
assertFalse(bufferDebloater.skipUpdate(minBufferSize + 1));
assertFalse(bufferDebloater.skipUpdate(minBufferSize));
assertFalse(bufferDebloater.skipUpdate(maxBufferSize - 1));
assertFalse(bufferDebloater.skipUpdate(maxBufferSize));

// Beyond the min and max size is always false.
assertFalse(bufferDebloater.skipUpdate(maxBufferSize + 1));
assertFalse(bufferDebloater.skipUpdate(minBufferSize - 1));
}

public static BufferDebloaterTestBuilder testBufferDebloater() {
return new BufferDebloaterTestBuilder();
}
Expand All @@ -173,6 +222,7 @@ private static class BufferDebloaterTestBuilder {
private int minBufferSize;
private int maxBufferSize;
private int debloatTarget;
private int thresholdPercentages = BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES.defaultValue();

public BufferDebloaterTestBuilder withNumberOfBuffersInUse(Integer numberOfBuffersInUse) {
this.numberOfBuffersInUse = numberOfBuffersInUse;
Expand All @@ -195,6 +245,11 @@ public BufferDebloaterTestBuilder withDebloatTarget(int debloatTarget) {
return this;
}

public BufferDebloaterTestBuilder withThresholdPercentages(int thresholdPercentages) {
this.thresholdPercentages = thresholdPercentages;
return this;
}

public void expectNoChangeInBufferSize() {
BufferDebloater bufferDebloater = getBufferDebloater();

Expand All @@ -219,12 +274,7 @@ public BufferDebloater expectBufferSize(int expectedBufferSize) {

private BufferDebloater getBufferDebloater() {
return new BufferDebloater(
0,
debloatTarget,
maxBufferSize,
minBufferSize,
BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES.defaultValue(),
1);
0, debloatTarget, maxBufferSize, minBufferSize, thresholdPercentages, 1);
}
}
}

0 comments on commit 6c82d0b

Please sign in to comment.