[FLINK-27902][network] Introduce isBlockingOrBlockingPersistentResultPartition and replace all remaining isBlocking calls.
reswqa authored and xintongsong committed Jun 8, 2022
1 parent 24a7504 commit ac870f9
Showing 12 changed files with 34 additions and 15 deletions.
@@ -203,11 +203,11 @@ public void testUnionClosedBranchingTest() throws Exception {
         if (!executionMode.equals(ExecutionMode.PIPELINED_FORCED)) {
             assertTrue(
                     "Expected batch exchange, but result type is " + dsType + ".",
-                    dsType.isBlocking());
+                    dsType.isBlockingOrBlockingPersistentResultPartition());
         } else {
             assertFalse(
                     "Expected non-batch exchange, but result type is " + dsType + ".",
-                    dsType.isBlocking());
+                    dsType.isBlockingOrBlockingPersistentResultPartition());
         }
     }
 }
@@ -473,7 +473,7 @@ public List<IntermediateResultPartition> finishAllBlockingPartitions() {
     List<IntermediateResultPartition> finishedBlockingPartitions = null;

     for (IntermediateResultPartition partition : resultPartitions.values()) {
-        if (partition.getResultType().isBlocking()) {
+        if (!partition.getResultType().canBePipelinedConsumed()) {

             partition.markFinished();
@@ -156,7 +156,7 @@ public boolean isConsumable() {
     }

     void resetForNewExecution() {
-        if (getResultType().isBlocking() && hasDataProduced) {
+        if (!getResultType().canBePipelinedConsumed() && hasDataProduced) {
             // A BLOCKING result partition with data produced means it is finished
             // Need to add the running producer count of the result on resetting it
             for (ConsumedPartitionGroup consumedPartitionGroup : getConsumedPartitionGroups()) {
@@ -179,7 +179,7 @@ private EdgeManager getEdgeManager() {

     void markFinished() {
         // Sanity check that this is only called on blocking partitions.
-        if (!getResultType().isBlocking()) {
+        if (getResultType().canBePipelinedConsumed()) {
             throw new IllegalStateException(
                     "Tried to mark a non-blocking result partition as finished");
         }
@@ -133,7 +133,8 @@ public ResultPartition create(
             int maxParallelism,
             SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
         BufferCompressor bufferCompressor = null;
-        if (type.isBlocking() && blockingShuffleCompressionEnabled) {
+        if (type.isBlockingOrBlockingPersistentResultPartition()
+                && blockingShuffleCompressionEnabled) {
             bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
         }
@@ -176,6 +176,21 @@ public boolean isReleaseByUpstream() {
         return releaseBy == ReleaseBy.UPSTREAM;
     }

+    /**
+     * {@link #isBlockingOrBlockingPersistentResultPartition()} returns whether this is the
+     * {@link #BLOCKING} or {@link #BLOCKING_PERSISTENT} resultPartitionType.
+     *
+     * <p>This method is suitable for judgment conditions tied to the specific implementation
+     * of {@link ResultPartitionType}.
+     *
+     * <p>This method is not related to data consumption or partition release. For release
+     * logic, use {@link #isReleaseByScheduler()} instead; for the consumption type, use
+     * {@link #mustBePipelinedConsumed()} or {@link #canBePipelinedConsumed()} instead.
+     */
+    public boolean isBlockingOrBlockingPersistentResultPartition() {
+        return this == BLOCKING || this == BLOCKING_PERSISTENT;
+    }
+
     /**
      * {@link #isPipelinedOrPipelinedBoundedResultPartition()} is used to judge whether it is the
      * specified {@link #PIPELINED} or {@link #PIPELINED_BOUNDED} resultPartitionType.
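Aside (editor's illustration, not part of the commit): a minimal sketch of how a call site is expected to pick among the predicates the new javadoc distinguishes. The class name is hypothetical; the enum, its values, and the three predicates are the ones shown in this diff.

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

public class PredicateChoiceExample {
    public static void main(String[] args) {
        ResultPartitionType type = ResultPartitionType.BLOCKING;

        // Implementation-specific decisions (e.g. enabling blocking-shuffle
        // compression) key off the concrete BLOCKING/BLOCKING_PERSISTENT types.
        boolean compressible = type.isBlockingOrBlockingPersistentResultPartition();

        // Consumption-related decisions key off the consumption predicates.
        boolean pipelinedConsumable = type.canBePipelinedConsumed();

        // Release-related decisions key off who releases the partition.
        boolean releasedByScheduler = type.isReleaseByScheduler();

        System.out.printf(
                "compressible=%s, pipelinedConsumable=%s, releasedByScheduler=%s%n",
                compressible, pipelinedConsumable, releasedByScheduler);
    }
}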
@@ -122,7 +122,8 @@ public SingleInputGate create(
                 createBufferPoolFactory(networkBufferPool, floatingNetworkBuffersPerGate);

         BufferDecompressor bufferDecompressor = null;
-        if (igdd.getConsumedPartitionType().isBlocking() && blockingShuffleCompressionEnabled) {
+        if (igdd.getConsumedPartitionType().isBlockingOrBlockingPersistentResultPartition()
+                && blockingShuffleCompressionEnabled) {
             bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);
         }
@@ -87,7 +87,7 @@ private void initializeConsumedPartitionGroups() {
             SchedulingResultPartition consumedPartition =
                     resultPartitionRetriever.apply(consumedPartitionGroup.getFirst());

-            if (consumedPartition.getResultType().isBlocking()) {
+            if (!consumedPartition.getResultType().canBePipelinedConsumed()) {
                 blockingConsumedPartitionGroupSet.add(consumedPartitionGroup);
             }
             if (consumedPartition.getResultType().isReleaseByScheduler()) {
@@ -179,7 +179,7 @@ private void checkAllExchangesBlocking(final JobGraph jobGraph) {
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
                 checkState(
-                        dataSet.getResultType().isBlocking(),
+                        dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition(),
                         String.format(
                                 "At the moment, adaptive batch scheduler requires batch workloads "
                                         + "to be executed with types of all edges being BLOCKING. "
@@ -64,7 +64,9 @@ public static Pair<Integer, Integer> getMinMaxNetworkBuffersPerResultPartition(
             final int sortShuffleMinBuffers,
             final int numSubpartitions,
             final ResultPartitionType type) {
-        boolean isSortShuffle = type.isBlocking() && numSubpartitions >= sortShuffleMinParallelism;
+        boolean isSortShuffle =
+                type.isBlockingOrBlockingPersistentResultPartition()
+                        && numSubpartitions >= sortShuffleMinParallelism;
         int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
         int max =
                 type.isBounded()
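For intuition (editor's illustration, not part of the commit): a worked example of the min side of this computation. The numeric values are made up for the example, not Flink defaults, and the boolean stands in for the type predicate.

public class MinBuffersExample {
    public static void main(String[] args) {
        int sortShuffleMinParallelism = 128; // illustrative value
        int sortShuffleMinBuffers = 64;      // illustrative value
        int numSubpartitions = 200;
        boolean isBlockingType = true; // isBlockingOrBlockingPersistentResultPartition()

        // Same decision as the diff above: a blocking result with enough
        // subpartitions uses sort-based shuffle and needs far fewer buffers.
        boolean isSortShuffle = isBlockingType && numSubpartitions >= sortShuffleMinParallelism;
        int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
        System.out.println(min); // 64 here; a non-sort-shuffle result would need 201
    }
}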
@@ -270,7 +270,7 @@ private static JobGraph createJobGraph(
         map.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, resultPartitionType);
         sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);

-        if (resultPartitionType.isPipelined()) {
+        if (!resultPartitionType.isBlockingOrBlockingPersistentResultPartition()) {
             return JobGraphTestUtils.streamingJobGraph(source, map, sink);

         } else {
@@ -206,7 +206,7 @@ private int calculateBuffersConsumption(SingleInputGate inputGate) throws Exception
     }

     private int calculateBuffersConsumption(ResultPartition partition) {
-        if (partition.getPartitionType().isBlocking()) {
+        if (!partition.getPartitionType().canBePipelinedConsumed()) {
             return partition.getBufferPool().getNumberOfRequiredMemorySegments();
         } else {
             return partition.getBufferPool().getMaxNumberOfMemorySegments();
@@ -1014,14 +1014,14 @@ private void connect(Integer headOfChain, StreamEdge edge) {

     private void checkBufferTimeout(ResultPartitionType type, StreamEdge edge) {
         long bufferTimeout = edge.getBufferTimeout();
-        if (type.isBlocking()
+        if (!type.canBePipelinedConsumed()
                 && bufferTimeout != ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) {
             throw new UnsupportedOperationException(
-                    "Blocking partition does not support buffer timeout "
+                    "Only partition types that can be consumed in a pipelined manner support a buffer timeout "
                             + bufferTimeout
                             + " for src operator in edge "
                             + edge
-                            + ". \nPlease either disable buffer timeout (via -1) or use the non-blocking partition.");
+                            + ". \nPlease either disable the buffer timeout (via -1) or use a partition type that can be consumed in a pipelined manner.");
         }
     }

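A condensed, hypothetical restatement of the guard above (editor's illustration; identifiers simplified, and the -1 sentinel mirrors the "disable buffer timeout (via -1)" hint in the exception message):

public class BufferTimeoutGuardExample {
    // Simplified stand-in for ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT.
    static final long DISABLED_NETWORK_BUFFER_TIMEOUT = -1L;

    static void checkBufferTimeout(boolean canBePipelinedConsumed, long bufferTimeout) {
        // Same shape as the diff above: an edge whose partition type cannot be
        // consumed in a pipelined manner must not set an explicit buffer timeout.
        if (!canBePipelinedConsumed && bufferTimeout != DISABLED_NETWORK_BUFFER_TIMEOUT) {
            throw new UnsupportedOperationException(
                    "Buffer timeout " + bufferTimeout + " is only supported for partition "
                            + "types that can be consumed in a pipelined manner.");
        }
    }

    public static void main(String[] args) {
        checkBufferTimeout(true, 100);  // pipelined-consumable edge: allowed
        checkBufferTimeout(false, -1);  // blocking edge, timeout disabled: allowed
        checkBufferTimeout(false, 100); // blocking edge with timeout: throws
    }
}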