Skip to content

Commit

Permalink
[FLINK-34371][runtime] Apply batch settings on EOF operators
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored and xintongsong committed Feb 28, 2024
1 parent d4e0084 commit 89b8c5c
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ public List<Transformation<?>> getTransitivePredecessors() {
public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}

public boolean isOutputOnlyAfterEndOfStream() {
return operatorFactory.getOperatorAttributes().isOutputOnlyAfterEndOfStream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,8 @@ public List<Transformation<?>> getInputs() {
public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}

public boolean isOutputOnlyAfterEndOfStream() {
return operatorFactory.getOperatorAttributes().isOutputOnlyAfterEndOfStream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,8 @@ public List<Transformation<?>> getTransitivePredecessors() {
public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}

public boolean isOutputOnlyAfterEndOfStream() {
return operatorFactory.getOperatorAttributes().isOutputOnlyAfterEndOfStream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,34 +54,22 @@ public class MultiInputTransformationTranslator<OUT>
protected Collection<Integer> translateForBatchInternal(
final AbstractMultipleInputTransformation<OUT> transformation, final Context context) {
Collection<Integer> ids = translateInternal(transformation, context);
if (transformation instanceof KeyedMultipleInputTransformation) {
KeyedMultipleInputTransformation<OUT> keyedTransformation =
(KeyedMultipleInputTransformation<OUT>) transformation;
List<Transformation<?>> inputs = transformation.getInputs();
List<KeySelector<?, ?>> keySelectors = keyedTransformation.getStateKeySelectors();

StreamConfig.InputRequirement[] inputRequirements =
IntStream.range(0, inputs.size())
.mapToObj(
idx -> {
if (keySelectors.get(idx) != null) {
return StreamConfig.InputRequirement.SORTED;
} else {
return StreamConfig.InputRequirement.PASS_THROUGH;
}
})
.toArray(StreamConfig.InputRequirement[]::new);
maybeApplyBatchExecutionSettings(transformation, context);

BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, inputRequirements);
}
return ids;
}

@Override
protected Collection<Integer> translateForStreamingInternal(
final AbstractMultipleInputTransformation<OUT> transformation, final Context context) {
return translateInternal(transformation, context);
Collection<Integer> ids = translateInternal(transformation, context);

if (transformation.isOutputOnlyAfterEndOfStream()) {
maybeApplyBatchExecutionSettings(transformation, context);
}

return ids;
}

private Collection<Integer> translateInternal(
Expand Down Expand Up @@ -141,4 +129,27 @@ private Collection<Integer> translateInternal(

return Collections.singleton(transformationId);
}

private void maybeApplyBatchExecutionSettings(
final AbstractMultipleInputTransformation<OUT> transformation, final Context context) {
if (transformation instanceof KeyedMultipleInputTransformation) {
KeyedMultipleInputTransformation<OUT> keyedTransformation =
(KeyedMultipleInputTransformation<OUT>) transformation;
List<Transformation<?>> inputs = transformation.getInputs();
List<KeySelector<?, ?>> keySelectors = keyedTransformation.getStateKeySelectors();
StreamConfig.InputRequirement[] inputRequirements =
IntStream.range(0, inputs.size())
.mapToObj(
idx -> {
if (keySelectors.get(idx) != null) {
return StreamConfig.InputRequirement.SORTED;
} else {
return StreamConfig.InputRequirement.PASS_THROUGH;
}
})
.toArray(StreamConfig.InputRequirement[]::new);
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, inputRequirements);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,45 @@ public final class OneInputTransformationTranslator<IN, OUT>
@Override
public Collection<Integer> translateForBatchInternal(
final OneInputTransformation<IN, OUT> transformation, final Context context) {
KeySelector<IN, ?> keySelector = transformation.getStateKeySelector();
Collection<Integer> ids =
translateInternal(
transformation,
transformation.getOperatorFactory(),
transformation.getInputType(),
keySelector,
transformation.getStateKeySelector(),
transformation.getStateKeyType(),
context);
boolean isKeyed = keySelector != null;
if (isKeyed) {
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
}

maybeApplyBatchExecutionSettings(transformation, context);

return ids;
}

@Override
public Collection<Integer> translateForStreamingInternal(
final OneInputTransformation<IN, OUT> transformation, final Context context) {
return translateInternal(
transformation,
transformation.getOperatorFactory(),
transformation.getInputType(),
transformation.getStateKeySelector(),
transformation.getStateKeyType(),
context);
Collection<Integer> ids =
translateInternal(
transformation,
transformation.getOperatorFactory(),
transformation.getInputType(),
transformation.getStateKeySelector(),
transformation.getStateKeyType(),
context);

if (transformation.isOutputOnlyAfterEndOfStream()) {
maybeApplyBatchExecutionSettings(transformation, context);
}

return ids;
}

private void maybeApplyBatchExecutionSettings(
final OneInputTransformation<IN, OUT> transformation, final Context context) {
KeySelector<IN, ?> keySelector = transformation.getStateKeySelector();
if (keySelector != null) {
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,21 @@ protected Collection<Integer> translateForBatchInternal(
final TwoInputTransformation<IN1, IN2, OUT> transformation, final Context context) {
Collection<Integer> ids = translateInternal(transformation, context);

StreamConfig.InputRequirement input1Requirement =
transformation.getStateKeySelector1() != null
? StreamConfig.InputRequirement.SORTED
: StreamConfig.InputRequirement.PASS_THROUGH;

StreamConfig.InputRequirement input2Requirement =
transformation.getStateKeySelector2() != null
? StreamConfig.InputRequirement.SORTED
: StreamConfig.InputRequirement.PASS_THROUGH;
maybeApplyBatchExecutionSettings(transformation, context);

if (input1Requirement == StreamConfig.InputRequirement.SORTED
|| input2Requirement == StreamConfig.InputRequirement.SORTED) {
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, input1Requirement, input2Requirement);
}
return ids;
}

@Override
protected Collection<Integer> translateForStreamingInternal(
final TwoInputTransformation<IN1, IN2, OUT> transformation, final Context context) {
return translateInternal(transformation, context);
Collection<Integer> ids = translateInternal(transformation, context);

if (transformation.isOutputOnlyAfterEndOfStream()) {
maybeApplyBatchExecutionSettings(transformation, context);
}

return ids;
}

private Collection<Integer> translateInternal(
Expand All @@ -83,4 +76,23 @@ private Collection<Integer> translateInternal(
transformation.getStateKeySelector2(),
context);
}

private void maybeApplyBatchExecutionSettings(
final TwoInputTransformation<IN1, IN2, OUT> transformation, final Context context) {
StreamConfig.InputRequirement input1Requirement =
transformation.getStateKeySelector1() != null
? StreamConfig.InputRequirement.SORTED
: StreamConfig.InputRequirement.PASS_THROUGH;

StreamConfig.InputRequirement input2Requirement =
transformation.getStateKeySelector2() != null
? StreamConfig.InputRequirement.SORTED
: StreamConfig.InputRequirement.PASS_THROUGH;

if (input1Requirement == StreamConfig.InputRequirement.SORTED
|| input2Requirement == StreamConfig.InputRequirement.SORTED) {
BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(), context, input1Requirement, input2Requirement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
Expand All @@ -112,6 +113,7 @@
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
Expand Down Expand Up @@ -2311,7 +2313,8 @@ void testOutputOnlyAfterEndOfStream() {
StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

final DataStream<Integer> source = env.fromData(1, 2, 3).name("source");
source.transform(
source.keyBy(x -> x)
.transform(
"transform",
Types.INT,
new StreamOperatorWithConfigurableOperatorAttributes<>(
Expand All @@ -2334,6 +2337,10 @@ void testOutputOnlyAfterEndOfStream() {
assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue();
assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse();
assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse();
assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0);
assertManagedMemoryWeightsSize(nodeMap.get("transform"), 1);
assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0);
assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);

JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
Map<String, JobVertex> vertexMap = new HashMap<>();
Expand All @@ -2345,7 +2352,6 @@ void testOutputOnlyAfterEndOfStream() {
vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);
assertHasOutputPartitionType(
vertexMap.get("transform -> Map"), ResultPartitionType.BLOCKING);

assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse();
assertThat(vertexMap.get("transform -> Map").isAnyOutputBlocking()).isTrue();
assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse();
Expand All @@ -2361,13 +2367,47 @@ void testOutputOnlyAfterEndOfStream() {
vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);
assertHasOutputPartitionType(vertexMap.get("transform"), ResultPartitionType.BLOCKING);
assertHasOutputPartitionType(vertexMap.get("Map"), ResultPartitionType.PIPELINED_BOUNDED);

assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse();
assertThat(vertexMap.get("transform").isAnyOutputBlocking()).isTrue();
assertThat(vertexMap.get("Map").isAnyOutputBlocking()).isFalse();
assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse();
}

@Test
void testApplyBatchExecutionSettingsOnTwoInputOperator() {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

final DataStream<Integer> source1 = env.fromData(1, 2, 3).name("source1");
final DataStream<Integer> source2 = env.fromData(1, 2, 3).name("source2");
source1.keyBy(x -> x)
.connect(source2.keyBy(x -> x))
.transform(
"transform",
Types.INT,
new TwoInputStreamOperatorWithConfigurableOperatorAttributes<>(
new OperatorAttributesBuilder()
.setOutputOnlyAfterEndOfStream(true)
.build()))
.sinkTo(new DiscardingSink<>())
.name("sink");

final StreamGraph streamGraph = env.getStreamGraph(false);
Map<String, StreamNode> nodeMap = new HashMap<>();
for (StreamNode node : streamGraph.getStreamNodes()) {
nodeMap.put(node.getOperatorName(), node);
}
assertThat(nodeMap).hasSize(4);
assertManagedMemoryWeightsSize(nodeMap.get("Source: source1"), 0);
assertManagedMemoryWeightsSize(nodeMap.get("Source: source2"), 0);
assertManagedMemoryWeightsSize(nodeMap.get("transform"), 1);
assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);
}

private void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) {
assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize);
}

private static void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(
OutputFormat<Integer> outputFormat, boolean isSupported) {
final StreamExecutionEnvironment env =
Expand Down Expand Up @@ -3026,4 +3066,31 @@ public OperatorAttributes getOperatorAttributes() {
return attributes;
}
}

private static class TwoInputStreamOperatorWithConfigurableOperatorAttributes<IN1, IN2, OUT>
extends CoProcessOperator<IN1, IN2, OUT> {
private final OperatorAttributes attributes;

public TwoInputStreamOperatorWithConfigurableOperatorAttributes(
OperatorAttributes attributes) {
super(new NoOpCoProcessFunction<>());
this.attributes = attributes;
}

@Override
public OperatorAttributes getOperatorAttributes() {
return attributes;
}
}

private static class NoOpCoProcessFunction<IN1, IN2, OUT>
extends CoProcessFunction<IN1, IN2, OUT> {
@Override
public void processElement1(
IN1 value, CoProcessFunction<IN1, IN2, OUT>.Context ctx, Collector<OUT> out) {}

@Override
public void processElement2(
IN2 value, CoProcessFunction<IN1, IN2, OUT>.Context ctx, Collector<OUT> out) {}
}
}

0 comments on commit 89b8c5c

Please sign in to comment.