Skip to content

Commit

Permalink
[FLINK-25575][streaming] Scope batch exchanges in PartitionTransforma…
Browse files Browse the repository at this point in the history
…tion to batch mode.

This change allows the DataStream API to internally insert fail-over regions into the operator graph that are only used in batch mode. In streaming mode, these exchanges are reset to UNDEFINED, and the actual exchange mode is then determined during StreamGraph construction.

Co-authored-by: Arvid Heise <[email protected]>
  • Loading branch information
fapaul and Arvid Heise committed Feb 2, 2022
1 parent 107390f commit ee8a795
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -45,17 +46,19 @@ public class PartitionTransformationTranslator<OUT>
@Override
protected Collection<Integer> translateForBatchInternal(
final PartitionTransformation<OUT> transformation, final Context context) {
return translateInternal(transformation, context);
return translateInternal(transformation, context, true);
}

@Override
protected Collection<Integer> translateForStreamingInternal(
final PartitionTransformation<OUT> transformation, final Context context) {
return translateInternal(transformation, context);
return translateInternal(transformation, context, false);
}

private Collection<Integer> translateInternal(
final PartitionTransformation<OUT> transformation, final Context context) {
final PartitionTransformation<OUT> transformation,
final Context context,
boolean supportsBatchExchange) {
checkNotNull(transformation);
checkNotNull(context);

Expand All @@ -70,13 +73,17 @@ private Collection<Integer> translateInternal(

List<Integer> resultIds = new ArrayList<>();

StreamExchangeMode exchangeMode = transformation.getExchangeMode();
// StreamExchangeMode#BATCH has no effect in streaming mode so we can safely reset it to
// UNDEFINED and let Flink decide on the best exchange mode.
if (!supportsBatchExchange && exchangeMode == StreamExchangeMode.BATCH) {
exchangeMode = StreamExchangeMode.UNDEFINED;
}

for (Integer inputId : context.getStreamNodeIds(input)) {
final int virtualId = Transformation.getNewNodeId();
streamGraph.addVirtualPartitionNode(
inputId,
virtualId,
transformation.getPartitioner(),
transformation.getExchangeMode());
inputId, virtualId, transformation.getPartitioner(), exchangeMode);
resultIds.add(virtualId);
}
return resultIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
Expand All @@ -69,6 +71,7 @@
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

import org.assertj.core.api.Assertions;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
Expand Down Expand Up @@ -825,6 +828,28 @@ public void testTrackTransformationsByIdentity() {
.hasMessageContaining("Unknown transformation: FailingTransformation");
}

/**
 * Verifies that a {@link StreamExchangeMode#BATCH} exchange requested explicitly on a {@link
 * PartitionTransformation} shows up as {@link StreamExchangeMode#UNDEFINED} on the resulting
 * edge of the generated {@link StreamGraph}.
 *
 * <p>NOTE(review): no runtime mode is set on the environment in this test; presumably the graph
 * is therefore built for streaming execution, as the test name suggests - confirm.
 */
@Test
public void testResetBatchExchangeModeInStreamingExecution() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
// Build the partition transformation by hand (instead of calling e.g. rebalance()) so that
// the BATCH exchange mode can be requested explicitly.
PartitionTransformation<Integer> transformation =
new PartitionTransformation<>(
sourceDataStream.getTransformation(),
new RebalancePartitioner<>(),
StreamExchangeMode.BATCH);
DataStream<Integer> partitionStream = new DataStream<>(env, transformation);
partitionStream.map(value -> value).print();

final StreamGraph streamGraph = env.getStreamGraph();
// NOTE(review): ids 1 and 3 presumably identify the source and the map node; they look
// dependent on the order in which the transformations above are created - confirm before
// reordering anything in this test.
// Exactly one edge is expected between the two nodes, and its exchange mode must no longer
// be BATCH but UNDEFINED.
Assertions.assertThat(streamGraph.getStreamEdges(1, 3))
.hasSize(1)
.satisfies(
e ->
Assertions.assertThat(e.get(0).getExchangeMode())
.isEqualTo(StreamExchangeMode.UNDEFINED));
}

private static class FailingTransformation extends Transformation<String> {
private final int hashCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ public void testExchangeModePipelined() {
@Test
public void testExchangeModeBatch() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setBufferTimeout(-1);
// fromElements -> Map -> Print
DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
Expand Down Expand Up @@ -784,11 +785,6 @@ protected ResultPartitionType featureValueOf(JobVertex actual) {
};
}

@Test(expected = UnsupportedOperationException.class)
public void testConflictExchangeModeWithBufferTimeout() {
testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode.BATCH);
}

@Test
public void testNormalExchangeModeWithBufferTimeout() {
testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode.PIPELINED);
Expand Down Expand Up @@ -1247,11 +1243,8 @@ public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled(
final JobVertex map1Vertex = verticesMatched.get(2);
final JobVertex map2Vertex = verticesMatched.get(3);

// vertices in the same region should be in the same slot sharing group
assertSameSlotSharingGroup(source1Vertex, map1Vertex);

// vertices in different regions should be in different slot sharing groups
assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex);
assertDistinctSharingGroups(source1Vertex, source2Vertex, map2Vertex, map1Vertex);
}

@Test
Expand Down Expand Up @@ -1509,6 +1502,7 @@ private static List<JobVertex> getExpectedVerticesList(List<JobVertex> vertices)
private StreamGraph createStreamGraphForSlotSharingTest() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(-1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

final DataStream<Integer> source1 = env.fromElements(1, 2, 3).name("source1");
source1.rebalance().map(v -> v).name("map1");
Expand Down

0 comments on commit ee8a795

Please sign in to comment.