Skip to content

Commit

Permalink
[FLINK-23453][streaming] Created the buffer debloater for the ability…
Browse files Browse the repository at this point in the history
… to automatically change the buffer size based on the throughput.
  • Loading branch information
akalash authored and pnowojski committed Aug 5, 2021
1 parent b92fa30 commit 9158fc7
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class ThroughputCalculator {
private long currentAccumulatedDataSize;
private long currentMeasurementTime;
private long measurementStartTime = NOT_TRACKED;
private long lastThroughput;

public ThroughputCalculator(Clock clock, int numberOfSamples) {
this.clock = clock;
Expand Down Expand Up @@ -62,13 +61,13 @@ public long calculateThroughput() {
currentMeasurementTime += clock.relativeTimeMillis() - measurementStartTime;
}

lastThroughput =
long throughput =
throughputEMA.calculateThroughput(
currentAccumulatedDataSize, currentMeasurementTime);

measurementStartTime = clock.relativeTimeMillis();
currentAccumulatedDataSize = currentMeasurementTime = 0;

return lastThroughput;
return throughput;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.tasks.bufferdebloat;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;

import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES;
import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * Automatically derives the buffer size from the current throughput and the configuration, and
 * announces it to every input gate.
 *
 * <p>The desired total amount of buffered data is {@code throughput * targetBufferSizeFactor}. It
 * is divided by the number of buffers currently in use over all gates (every gate counts for at
 * least one buffer) and clamped to {@code [minBufferSize, maxBufferSize]}. The result is only
 * announced when it differs from the previously announced size by at least the configured
 * threshold.
 */
public class BufferDebloater {
    private static final double MILLIS_IN_SECOND = 1000.0;

    /**
     * Ratio between the desired total buffer size and the throughput (when it is 1.0 the total
     * buffer size equals the throughput). It is the configured debloat target expressed in
     * seconds.
     */
    private final double targetBufferSizeFactor;

    private final IndexedInputGate[] inputGates;
    private final long maxBufferSize;
    private final long minBufferSize;

    /** Minimal relative change, in percent, that triggers a new announcement. */
    private final int bufferDebloatThresholdPercentages;

    /** The buffer size that was announced last (initially the maximum buffer size). */
    private int lastBufferSize;

    /**
     * Creates the debloater for the given gates.
     *
     * @param taskConfig configuration providing the debloat target, the threshold and the
     *     min/max memory segment sizes.
     * @param inputGates gates which receive the newly calculated buffer size.
     * @throws IllegalArgumentException if the configured sizes are not positive, the maximum is
     *     smaller than the minimum or exceeds {@link Integer#MAX_VALUE}, or the debloat target is
     *     not positive.
     */
    public BufferDebloater(Configuration taskConfig, IndexedInputGate[] inputGates) {
        this.inputGates = inputGates;
        this.targetBufferSizeFactor =
                taskConfig.get(BUFFER_DEBLOAT_TARGET).toMillis() / MILLIS_IN_SECOND;
        this.maxBufferSize = taskConfig.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE).getBytes();
        this.minBufferSize = taskConfig.get(TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE).getBytes();

        this.bufferDebloatThresholdPercentages =
                taskConfig.getInteger(BUFFER_DEBLOAT_THRESHOLD_PERCENTAGES);

        // Right now the buffer size can not be greater than integer max value according to
        // MemorySegment and buffer implementation.
        checkArgument(
                maxBufferSize <= Integer.MAX_VALUE,
                "Max buffer size %s exceeds the integer max value.",
                maxBufferSize);
        checkArgument(
                maxBufferSize > 0, "Max buffer size must be positive but was %s.", maxBufferSize);
        checkArgument(
                minBufferSize > 0, "Min buffer size must be positive but was %s.", minBufferSize);
        checkArgument(
                maxBufferSize >= minBufferSize,
                "Max buffer size %s must not be less than min buffer size %s.",
                maxBufferSize,
                minBufferSize);
        checkArgument(
                targetBufferSizeFactor > 0.0,
                "Buffer debloat target must be positive but the resulting factor was %s.",
                targetBufferSizeFactor);

        // Narrow only after the range has been validated, so the cast can never truncate.
        this.lastBufferSize = (int) maxBufferSize;
    }

    /**
     * Recalculates the buffer size for the given throughput and announces it to all gates unless
     * the change compared to the previously announced size is below the threshold.
     *
     * @param currentThroughput the currently measured throughput (presumably in bytes per second
     *     - confirm against the caller).
     */
    public void recalculateBufferSize(long currentThroughput) {
        if (inputGates.length == 0) {
            // Nobody to announce to; this also avoids a division by zero below.
            return;
        }

        long desiredTotalBufferSizeInBytes = (long) (currentThroughput * targetBufferSizeFactor);

        // Accumulate in a long so that many gates with large counts can not overflow the sum.
        long totalBuffersInUse = 0;
        for (IndexedInputGate inputGate : inputGates) {
            // A gate without buffers in use still counts as one buffer.
            totalBuffersInUse += Math.max(1, inputGate.getBuffersInUseCount());
        }

        int newSize =
                (int)
                        Math.max(
                                minBufferSize,
                                Math.min(
                                        desiredTotalBufferSizeInBytes / totalBuffersInUse,
                                        maxBufferSize));

        // An unchanged size never needs to be announced, even if the threshold is zero.
        if (newSize == lastBufferSize) {
            return;
        }

        // The deviation is measured relative to the new size (newSize >= minBufferSize > 0).
        boolean skipUpdate =
                Math.abs(1 - ((double) lastBufferSize) / newSize) * 100
                        < bufferDebloatThresholdPercentages;

        // Skip update if the new value is pretty close to the old one.
        if (skipUpdate) {
            return;
        }

        lastBufferSize = newSize;
        for (IndexedInputGate inputGate : inputGates) {
            inputGate.announceBufferSize(newSize);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.tasks.bufferdebloat;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

import static java.util.Arrays.asList;
import static org.apache.flink.configuration.MemorySize.MemoryUnit.BYTES;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_TARGET;
import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_SEGMENT_SIZE;
import static org.apache.flink.configuration.TaskManagerOptions.MIN_MEMORY_SEGMENT_SIZE;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/** Tests the buffer size calculation and the argument validation of {@link BufferDebloater}. */
public class BufferDebloaterTest extends TestLogger {

    @Test
    public void testZeroBuffersInUse() {
        // A gate which reports zero buffers in use should be counted as having one buffer.
        testBufferSizeCalculation(3, asList(0, 1, 0), 3333, 50, 2400, 1000, 1111);
    }

    @Test
    public void testCorrectBufferSizeCalculation() {
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, 1200, 249);
    }

    @Test
    public void testCalculatedBufferSizeLessThanMin() {
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 250, 1100, 1200, 250);
    }

    @Test
    public void testCalculatedBufferSizeForThroughputZero() {
        // With zero throughput the min buffer size is expected.
        testBufferSizeCalculation(3, asList(3, 5, 8), 0, 50, 1100, 1200, 50);
    }

    @Test
    public void testConfiguredConsumptionTimeIsTooLow() {
        // With a very low consumption time the min buffer size is expected.
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, 7, 50);
    }

    @Test
    public void testCalculatedBufferSizeGreaterThanMax() {
        // The calculated size exceeds the max value, so the max value is taken. Since it equals
        // the old value no update should happen (-1 is the initial value of the test gate).
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 248, 1200, -1);
    }

    @Test
    public void testCalculatedBufferSlightlyDifferentFromCurrentOne() {
        // The calculated size is only a little less than the current value (which is the max
        // value), so no update should happen (-1 is the initial value of the test gate).
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 250, 1200, -1);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testNegativeMinBufferSize() {
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, -1, 248, 1200, 248);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testNegativeMaxBufferSize() {
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, -1, 1200, 248);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testMinGreaterThanMaxBufferSize() {
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 49, 1200, 248);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testNegativeConsumptionTime() {
        testBufferSizeCalculation(3, asList(3, 5, 8), 3333, 50, 1100, -1, 248);
    }

    /**
     * Creates the gates and the debloater from the given parameters, triggers one recalculation
     * and verifies the buffer size which every gate has seen last.
     */
    private void testBufferSizeCalculation(
            int numberOfGates,
            List<Integer> numberOfBuffersInUse,
            long throughput,
            long minBufferSize,
            long maxBufferSize,
            int consumptionTime,
            long expectedBufferSize) {
        // given: Gates reporting the requested number of buffers in use.
        final TestBufferSizeInputGate[] gates = new TestBufferSizeInputGate[numberOfGates];
        int gateIndex = 0;
        while (gateIndex < numberOfGates) {
            gates[gateIndex] = new TestBufferSizeInputGate(numberOfBuffersInUse.get(gateIndex));
            gateIndex++;
        }

        // and: The debloater configured with the requested target and size limits.
        final Configuration debloatConfig = new Configuration();
        debloatConfig.set(BUFFER_DEBLOAT_ENABLED, true);
        debloatConfig.set(BUFFER_DEBLOAT_TARGET, Duration.ofMillis(consumptionTime));
        debloatConfig.set(MEMORY_SEGMENT_SIZE, MemorySize.parse("" + maxBufferSize, BYTES));
        debloatConfig.set(MIN_MEMORY_SEGMENT_SIZE, MemorySize.parse("" + minBufferSize, BYTES));

        final BufferDebloater debloater = new BufferDebloater(debloatConfig, gates);

        // when: Buffer size is calculated.
        debloater.recalculateBufferSize(throughput);

        // then: Every gate should have received the expected buffer size.
        for (TestBufferSizeInputGate gate : gates) {
            assertThat(gate.lastBufferSize, is(expectedBufferSize));
        }
    }

    /** Gate with a fixed buffers-in-use count which remembers the last announced buffer size. */
    private static class TestBufferSizeInputGate extends MockInputGate {
        private final int bufferInUseCount;

        /** Stays -1 until a buffer size is announced. */
        private long lastBufferSize = -1;

        public TestBufferSizeInputGate(int bufferInUseCount) {
            // The number of channels is not relevant here; these tests only use the
            // buffers-in-use count and the announced buffer size.
            super(1, Collections.emptyList());
            this.bufferInUseCount = bufferInUseCount;
        }

        @Override
        public int getBuffersInUseCount() {
            return this.bufferInUseCount;
        }

        @Override
        public void announceBufferSize(int bufferSize) {
            this.lastBufferSize = bufferSize;
        }
    }
}

0 comments on commit 9158fc7

Please sign in to comment.