Skip to content

Commit

Permalink
[FLINK-33668][runtime] Decouple the network memory and job topology o…
Browse files Browse the repository at this point in the history
…n input gate side
  • Loading branch information
jiangxin369 authored and reswqa committed Mar 8, 2024
1 parent 012b893 commit 2e25789
Show file tree
Hide file tree
Showing 33 changed files with 464 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.runtime.io.network.metrics;

import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -28,27 +31,35 @@
*/
public class CreditBasedInputBuffersUsageGauge extends AbstractBuffersUsageGauge {

private final FloatingBuffersUsageGauge floatingBuffersUsageGauge;
private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;

public CreditBasedInputBuffersUsageGauge(
FloatingBuffersUsageGauge floatingBuffersUsageGauge,
ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
SingleInputGate[] inputGates) {
public CreditBasedInputBuffersUsageGauge(SingleInputGate[] inputGates) {
super(checkNotNull(inputGates));
this.floatingBuffersUsageGauge = checkNotNull(floatingBuffersUsageGauge);
this.exclusiveBuffersUsageGauge = checkNotNull(exclusiveBuffersUsageGauge);
}

@Override
public int calculateUsedBuffers(SingleInputGate inputGate) {
return floatingBuffersUsageGauge.calculateUsedBuffers(inputGate)
+ exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate);
BufferPool bufferPool = inputGate.getBufferPool();
if (bufferPool != null) {
int numBuffers = bufferPool.bestEffortGetNumOfUsedBuffers();
for (InputChannel ic : inputGate.inputChannels()) {
if (ic instanceof RemoteInputChannel) {
RemoteInputChannel remoteInputChannel = (RemoteInputChannel) ic;
numBuffers -= remoteInputChannel.unsynchronizedGetFloatingBuffersAvailable();
numBuffers -=
remoteInputChannel.getNumExclusiveBuffers()
- remoteInputChannel.unsynchronizedGetExclusiveBuffersUsed();
}
}
return Math.max(0, numBuffers);
}
return 0;
}

@Override
public int calculateTotalBuffers(SingleInputGate inputGate) {
return floatingBuffersUsageGauge.calculateTotalBuffers(inputGate)
+ exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate);
BufferPool bufferPool = inputGate.getBufferPool();
if (bufferPool != null) {
return inputGate.getBufferPool().getNumBuffers();
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public int calculateTotalBuffers(SingleInputGate inputGate) {
int totalExclusiveBuffers = 0;
for (InputChannel ic : inputGate.inputChannels()) {
if (ic instanceof RemoteInputChannel) {
totalExclusiveBuffers += ((RemoteInputChannel) ic).getInitialCredit();
totalExclusiveBuffers += ((RemoteInputChannel) ic).getNumExclusiveBuffers();
}
}
return totalExclusiveBuffers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,39 @@

package org.apache.flink.runtime.io.network.metrics;

import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Gauge metric measuring the floating buffers usage gauge for {@link SingleInputGate}s. */
public class FloatingBuffersUsageGauge extends AbstractBuffersUsageGauge {

public FloatingBuffersUsageGauge(SingleInputGate[] inputGates) {
private final CreditBasedInputBuffersUsageGauge totalBuffersUsageGauge;
private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;

public FloatingBuffersUsageGauge(
SingleInputGate[] inputGates,
CreditBasedInputBuffersUsageGauge totalBuffersUsageGauge,
ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge) {
super(checkNotNull(inputGates));

this.totalBuffersUsageGauge = checkNotNull(totalBuffersUsageGauge);
this.exclusiveBuffersUsageGauge = checkNotNull(exclusiveBuffersUsageGauge);
}

@Override
public int calculateUsedBuffers(SingleInputGate inputGate) {
int availableFloatingBuffers = 0;
BufferPool bufferPool = inputGate.getBufferPool();
if (bufferPool != null) {
int requestedFloatingBuffers = bufferPool.bestEffortGetNumOfUsedBuffers();
for (InputChannel ic : inputGate.inputChannels()) {
if (ic instanceof RemoteInputChannel) {
availableFloatingBuffers +=
((RemoteInputChannel) ic).unsynchronizedGetFloatingBuffersAvailable();
}
}
return Math.max(0, requestedFloatingBuffers - availableFloatingBuffers);
}
return 0;
return Math.max(
0,
totalBuffersUsageGauge.calculateUsedBuffers(inputGate)
- exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
}

@Override
public int calculateTotalBuffers(SingleInputGate inputGate) {
BufferPool bufferPool = inputGate.getBufferPool();
if (bufferPool != null) {
return inputGate.getBufferPool().getNumBuffers();
}
return 0;
return Math.max(
0,
totalBuffersUsageGauge.calculateTotalBuffers(inputGate)
- exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,13 @@ private static void registerInputMetrics(
buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
buffersGroup.gauge(METRIC_INPUT_QUEUE_SIZE, new InputBuffersSizeGauge(inputGates));

FloatingBuffersUsageGauge floatingBuffersUsageGauge =
new FloatingBuffersUsageGauge(inputGates);
CreditBasedInputBuffersUsageGauge creditBasedInputBuffersUsageGauge =
new CreditBasedInputBuffersUsageGauge(inputGates);
ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge =
new ExclusiveBuffersUsageGauge(inputGates);
CreditBasedInputBuffersUsageGauge creditBasedInputBuffersUsageGauge =
new CreditBasedInputBuffersUsageGauge(
floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates);
FloatingBuffersUsageGauge floatingBuffersUsageGauge =
new FloatingBuffersUsageGauge(
inputGates, creditBasedInputBuffersUsageGauge, exclusiveBuffersUsageGauge);
buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, exclusiveBuffersUsageGauge);
buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, floatingBuffersUsageGauge);
buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, creditBasedInputBuffersUsageGauge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void requestSubpartition(
partitionId,
subpartitionIndexSet,
inputChannel.getInputChannelId(),
inputChannel.getInitialCredit());
inputChannel.getNumExclusiveBuffers());

final ChannelFutureListener listener =
future -> {
Expand Down
Loading

0 comments on commit 2e25789

Please sign in to comment.