From 40eb2c2b245a818f8217fabb62fad27dfdde1e1d Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Fri, 2 Feb 2024 12:09:06 +0800 Subject: [PATCH] [FLINK-34371][runtime] Expose OperatorAttributes to Operator and StreamNodes --- .../flink/streaming/api/graph/StreamNode.java | 7 +++ .../api/operators/SimpleOperatorFactory.java | 5 ++ .../api/operators/StreamOperator.java | 12 +++++ .../api/operators/StreamOperatorFactory.java | 12 +++++ .../graph/StreamingJobGraphGeneratorTest.java | 48 +++++++++++++++++++ ...bstractUdfStreamOperatorLifecycleTest.java | 1 + 6 files changed, 85 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index c3fcae19a02e9..d86cec1059e3c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -443,4 +443,11 @@ public void setSupportsConcurrentExecutionAttempts( boolean supportsConcurrentExecutionAttempts) { this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; } + + public boolean isOutputOnlyAfterEndOfStream() { + if (operatorFactory == null) { + return false; + } + return operatorFactory.getOperatorAttributes().isOutputOnlyAfterEndOfStream(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java index 95329baaef478..7c22e4af97cf2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java @@ -132,4 +132,9 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi public Class getStreamOperatorClass(ClassLoader classLoader) { return operator.getClass(); } + + @Override + public OperatorAttributes getOperatorAttributes() { + return operator.getOperatorAttributes(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 134b9129a284f..bff512eb4a39d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.metrics.groups.OperatorMetricGroup; @@ -148,4 +149,15 @@ OperatorSnapshotFutures snapshotState( OperatorMetricGroup getMetricGroup(); OperatorID getOperatorID(); + + /** + * Called to get the OperatorAttributes of the operator. If there is no defined attribute, a + * default OperatorAttributes is built. + * + * @return OperatorAttributes of the operator. + */ + @Experimental + default OperatorAttributes getOperatorAttributes() { + return new OperatorAttributesBuilder().build(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java index 410028fb0d552..4b9abf6af4a06 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -87,4 +88,15 @@ default void setInputType(TypeInformation type, ExecutionConfig executionConf /** Returns the runtime class of the stream operator. */ Class getStreamOperatorClass(ClassLoader classLoader); + + /** + * Is called to get the OperatorAttributes of the operator. OperatorAttributes can inform the + * frame to optimize the job performance. + * + * @return OperatorAttributes of the operator. + */ + @Experimental + default OperatorAttributes getOperatorAttributes() { + return new OperatorAttributesBuilder().build(); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 1e7c6f7ce3b45..6ab8a11ab2d6f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -103,6 +103,8 @@ import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamMap; @@ -2303,6 +2305,36 @@ void testOutputFormatSupportConcurrentExecutionAttempts() { new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true); } + @Test + void testOutputOnlyAfterEndOfStream() { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + + final DataStream source = env.fromData(1, 2, 3).name("source"); + source.transform( + "transform", + Types.INT, + new StreamOperatorWithConfigurableOperatorAttributes<>( + x -> x, + new OperatorAttributesBuilder() + .setOutputOnlyAfterEndOfStream(true) + .build())) + .map(x -> x) + .sinkTo(new DiscardingSink<>()) + .name("sink"); + + final StreamGraph streamGraph = env.getStreamGraph(); + Map nodeMap = new HashMap<>(); + for (StreamNode node : streamGraph.getStreamNodes()) { + nodeMap.put(node.getOperatorName(), node); + } + assertThat(nodeMap).hasSize(4); + assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse(); + assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue(); + assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse(); + assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse(); + } + private static void testWhetherOutputFormatSupportsConcurrentExecutionAttempts( OutputFormat outputFormat, boolean isSupported) { final StreamExecutionEnvironment env = @@ -2945,4 +2977,20 @@ public void run(SourceContext sourceContext) { @Override public void cancel() {} } + + private static class StreamOperatorWithConfigurableOperatorAttributes + extends StreamMap { + private final OperatorAttributes attributes; + + public StreamOperatorWithConfigurableOperatorAttributes( + MapFunction mapper, OperatorAttributes attributes) { + super(mapper); + this.attributes = attributes; + } + + @Override + public OperatorAttributes getOperatorAttributes() { + return attributes; + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index 3d4125bac394a..6833271f728bd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -99,6 +99,7 @@ public class AbstractUdfStreamOperatorLifecycleTest { + "finish[], " + "getCurrentKey[], " + "getMetricGroup[], " + + "getOperatorAttributes[], " + "getOperatorID[], " + "initializeState[interface org.apache.flink.streaming.api.operators.StreamTaskStateInitializer], " + "notifyCheckpointAborted[long], "