Skip to content

Commit

Permalink
[FLINK-34371][runtime] Expose OperatorAttributes to Operator and StreamNodes
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored and xintongsong committed Feb 28, 2024
1 parent ea5f90a commit 40eb2c2
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,11 @@ public void setSupportsConcurrentExecutionAttempts(
boolean supportsConcurrentExecutionAttempts) {
this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts;
}

/**
 * Tells whether the {@code outputOnlyAfterEndOfStream} operator attribute is set for this
 * object's operator factory.
 *
 * @return {@code false} when no operator factory is present; otherwise the value reported by
 *     the {@code OperatorAttributes} obtained from the factory
 */
public boolean isOutputOnlyAfterEndOfStream() {
    // Short-circuit keeps the null guard: the factory is only dereferenced when it exists.
    return operatorFactory != null
            && operatorFactory.getOperatorAttributes().isOutputOnlyAfterEndOfStream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,9 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfi
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
// The class is taken directly from the held operator instance; classLoader is not consulted.
return operator.getClass();
}

/**
 * Forwards to the {@code operator} held by this object and returns whatever {@link
 * OperatorAttributes} it reports.
 *
 * @return the attributes obtained from the held operator
 */
@Override
public OperatorAttributes getOperatorAttributes() {
    return this.operator.getOperatorAttributes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
Expand Down Expand Up @@ -148,4 +149,15 @@ OperatorSnapshotFutures snapshotState(
OperatorMetricGroup getMetricGroup();

OperatorID getOperatorID();

/**
 * Returns the {@link OperatorAttributes} of this operator. Unless an implementation overrides
 * this method, the result is what a freshly created {@link OperatorAttributesBuilder} builds
 * without any attribute being set on it.
 *
 * @return OperatorAttributes of the operator.
 */
@Experimental
default OperatorAttributes getOperatorAttributes() {
    final OperatorAttributesBuilder defaultsBuilder = new OperatorAttributesBuilder();
    return defaultsBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -87,4 +88,15 @@ default void setInputType(TypeInformation<?> type, ExecutionConfig executionConf

/**
 * Returns the runtime class of the stream operator.
 *
 * @param classLoader class loader supplied by the caller. NOTE(review): presumably available
 *     to implementations for resolving the operator class; confirm against implementers.
 * @return the runtime class of the stream operator
 */
Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader);

/**
 * Returns the {@link OperatorAttributes} of the operator. The framework can use these
 * attributes to optimize job performance. Unless an implementation overrides this method, the
 * result is what a freshly created {@link OperatorAttributesBuilder} builds without any
 * attribute being set on it.
 *
 * @return OperatorAttributes of the operator.
 */
@Experimental
default OperatorAttributes getOperatorAttributes() {
    final OperatorAttributesBuilder defaultsBuilder = new OperatorAttributesBuilder();
    return defaultsBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OperatorAttributes;
import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamMap;
Expand Down Expand Up @@ -2303,6 +2305,36 @@ void testOutputFormatSupportConcurrentExecutionAttempts() {
new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true);
}

/**
 * Builds a source -> transform -> map -> sink pipeline in which only the "transform" operator
 * declares {@code outputOnlyAfterEndOfStream}, then checks that exactly that stream node
 * reports the attribute.
 */
@Test
void testOutputOnlyAfterEndOfStream() {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

    // The only operator in the pipeline that sets the attribute explicitly.
    final OperatorAttributes endOfStreamOnly =
            new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build();
    final StreamOperatorWithConfigurableOperatorAttributes<Integer, Integer> configuredOperator =
            new StreamOperatorWithConfigurableOperatorAttributes<>(x -> x, endOfStreamOnly);

    final DataStream<Integer> source = env.fromData(1, 2, 3).name("source");
    source.transform("transform", Types.INT, configuredOperator)
            .map(x -> x)
            .sinkTo(new DiscardingSink<>())
            .name("sink");

    // Index the generated stream nodes by operator name for direct lookup below.
    final StreamGraph streamGraph = env.getStreamGraph();
    final Map<String, StreamNode> nodesByName = new HashMap<>();
    streamGraph
            .getStreamNodes()
            .forEach(streamNode -> nodesByName.put(streamNode.getOperatorName(), streamNode));

    assertThat(nodesByName).hasSize(4);
    assertThat(nodesByName.get("Source: source").isOutputOnlyAfterEndOfStream()).isFalse();
    assertThat(nodesByName.get("transform").isOutputOnlyAfterEndOfStream()).isTrue();
    assertThat(nodesByName.get("Map").isOutputOnlyAfterEndOfStream()).isFalse();
    assertThat(nodesByName.get("sink: Writer").isOutputOnlyAfterEndOfStream()).isFalse();
}

private static void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(
OutputFormat<Integer> outputFormat, boolean isSupported) {
final StreamExecutionEnvironment env =
Expand Down Expand Up @@ -2945,4 +2977,20 @@ public void run(SourceContext<Integer> sourceContext) {
@Override
public void cancel() {}
}

/**
 * A {@link StreamMap} whose {@link OperatorAttributes} are supplied at construction time
 * instead of coming from the inherited implementation.
 */
private static class StreamOperatorWithConfigurableOperatorAttributes<IN, OUT>
        extends StreamMap<IN, OUT> {
    /** Attributes handed back verbatim by {@link #getOperatorAttributes()}. */
    private final OperatorAttributes configuredAttributes;

    /**
     * @param mapper map function passed on to the {@link StreamMap} constructor
     * @param attributes attributes this operator should report
     */
    public StreamOperatorWithConfigurableOperatorAttributes(
            MapFunction<IN, OUT> mapper, OperatorAttributes attributes) {
        super(mapper);
        configuredAttributes = attributes;
    }

    /** Returns the attributes given to the constructor. */
    @Override
    public OperatorAttributes getOperatorAttributes() {
        return configuredAttributes;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
+ "finish[], "
+ "getCurrentKey[], "
+ "getMetricGroup[], "
+ "getOperatorAttributes[], "
+ "getOperatorID[], "
+ "initializeState[interface org.apache.flink.streaming.api.operators.StreamTaskStateInitializer], "
+ "notifyCheckpointAborted[long], "
Expand Down

0 comments on commit 40eb2c2

Please sign in to comment.