Skip to content

Commit

Permalink
[FLINK-26610][datastream] Check whether sink uid is set when expanding sink topology.
Browse files Browse the repository at this point in the history

Currently, sink developers may set uids for operators inside the
customized topology. In that case, if the uid of the sink itself is not
set, adding the sink multiple times to a job results in duplicate
operator uids.
  • Loading branch information
pltbkd authored and gaoyunhaii committed Mar 14, 2022
1 parent b8e7fc3 commit 04a56cb
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.Rule;
Expand Down Expand Up @@ -60,6 +61,11 @@ protected FileSink<Integer> createFileSink(String path) {
.build();
}

/**
 * Assigns the uid {@code "sink"} to the given sink.
 *
 * @param sink the sink handle to configure
 */
@Override
protected void configureSink(DataStreamSink<Integer> sink) {
// NOTE(review): presumably required because this sink's customized topology sets uids on
// its operators, which in turn requires the sink itself to carry a uid -- confirm.
sink.uid("sink");
}

/**
 * Creates a {@link RecordWiseFileCompactor} whose reader factory is a {@link
 * DecoderBasedReader.Factory} constructed with {@code IntDecoder::new}.
 *
 * @return the newly created compactor
 */
private static FileCompactor createFileCompactor() {
return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
Expand Down Expand Up @@ -69,17 +70,21 @@ protected JobGraph createJobGraph(String path) {
"Source",
Boundedness.BOUNDED);

source.setParallelism(NUM_SOURCES)
.rebalance()
.map(new BatchExecutionOnceFailingMap(NUM_RECORDS, triggerFailover))
.setParallelism(NUM_SINKS)
.sinkTo(createFileSink(path))
.setParallelism(NUM_SINKS);
DataStreamSink<Integer> sink =
source.setParallelism(NUM_SOURCES)
.rebalance()
.map(new BatchExecutionOnceFailingMap(NUM_RECORDS, triggerFailover))
.setParallelism(NUM_SINKS)
.sinkTo(createFileSink(path))
.setParallelism(NUM_SINKS);
configureSink(sink);

StreamGraph streamGraph = env.getStreamGraph();
return streamGraph.getJobGraph();
}

/**
 * Extension hook called by {@code createJobGraph} on the sink returned by {@code sinkTo},
 * after its parallelism has been set. The default implementation does nothing; subclasses
 * may override it to apply further settings to the sink.
 *
 * @param sink the sink handle that was just added to the job
 */
protected void configureSink(DataStreamSink<Integer> sink) {}

// ------------------------ Blocking mode user functions ----------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.Rule;
Expand Down Expand Up @@ -60,6 +61,11 @@ protected FileSink<Integer> createFileSink(String path) {
.build();
}

/**
 * Assigns the uid {@code "sink"} to the given sink.
 *
 * @param sink the sink handle to configure
 */
@Override
protected void configureSink(DataStreamSink<Integer> sink) {
// NOTE(review): presumably required because this sink's customized topology sets uids on
// its operators, which in turn requires the sink itself to carry a uid -- confirm.
sink.uid("sink");
}

/**
 * Creates a {@link RecordWiseFileCompactor} whose reader factory is a {@link
 * DecoderBasedReader.Factory} constructed with {@code IntDecoder::new}.
 *
 * @return the newly created compactor
 */
private static FileCompactor createFileCompactor() {
return new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
Expand Down Expand Up @@ -89,15 +90,21 @@ protected JobGraph createJobGraph(String path) {
env.setRestartStrategy(RestartStrategies.noRestart());
}

env.addSource(new StreamingExecutionTestSource(latchId, NUM_RECORDS, triggerFailover))
.setParallelism(NUM_SOURCES)
.sinkTo(createFileSink(path))
.setParallelism(NUM_SINKS);
DataStreamSink<Integer> sink =
env.addSource(
new StreamingExecutionTestSource(
latchId, NUM_RECORDS, triggerFailover))
.setParallelism(NUM_SOURCES)
.sinkTo(createFileSink(path))
.setParallelism(NUM_SINKS);
configureSink(sink);

StreamGraph streamGraph = env.getStreamGraph();
return streamGraph.getJobGraph();
}

/**
 * Extension hook called by {@code createJobGraph} on the sink returned by {@code sinkTo},
 * after its parallelism has been set. The default implementation does nothing; subclasses
 * may override it to apply further settings to the sink.
 *
 * @param sink the sink handle that was just added to the job
 */
protected void configureSink(DataStreamSink<Integer> sink) {}

// ------------------------ Streaming mode user functions ----------------------------------

private static class StreamingExecutionTestSource extends RichParallelSourceFunction<Integer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkState;

/**
* A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for the {@link
* org.apache.flink.streaming.api.transformations.SinkTransformation}.
Expand Down Expand Up @@ -129,7 +131,9 @@ private void expand() {
if (sink instanceof WithPreWriteTopology) {
prewritten =
adjustTransformations(
prewritten, ((WithPreWriteTopology<T>) sink)::addPreWriteTopology);
prewritten,
((WithPreWriteTopology<T>) sink)::addPreWriteTopology,
true);
}

if (sink instanceof TwoPhaseCommittingSink) {
Expand All @@ -141,7 +145,8 @@ private void expand() {
input.transform(
WRITER_NAME,
CommittableMessageTypeInfo.noOutput(),
new SinkWriterOperatorFactory<>(sink)));
new SinkWriterOperatorFactory<>(sink)),
false);
}

final List<Transformation<?>> sinkTransformations =
Expand Down Expand Up @@ -172,15 +177,17 @@ private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStre
input.transform(
WRITER_NAME,
typeInformation,
new SinkWriterOperatorFactory<>(sink)));
new SinkWriterOperatorFactory<>(sink)),
false);

DataStream<CommittableMessage<CommT>> precommitted = addFailOverRegion(written);

if (sink instanceof WithPreCommitTopology) {
precommitted =
adjustTransformations(
precommitted,
((WithPreCommitTopology<T, CommT>) sink)::addPreCommitTopology);
((WithPreCommitTopology<T, CommT>) sink)::addPreCommitTopology,
true);
}

DataStream<CommittableMessage<CommT>> committed =
Expand All @@ -193,7 +200,8 @@ private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStre
new CommitterOperatorFactory<>(
committingSink,
isBatchMode,
isCheckpointingEnabled)));
isCheckpointingEnabled)),
false);

if (sink instanceof WithPostCommitTopology) {
DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
Expand All @@ -202,7 +210,8 @@ private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStre
pc -> {
((WithPostCommitTopology<T, CommT>) sink).addPostCommitTopology(pc);
return null;
});
},
true);
}
}

Expand Down Expand Up @@ -233,7 +242,9 @@ private <I> DataStream<I> addFailOverRegion(DataStream<I> input) {
* customized parallelism value at environment level.
*/
private <I, R> R adjustTransformations(
DataStream<I> inputStream, Function<DataStream<I>, R> action) {
DataStream<I> inputStream,
Function<DataStream<I>, R> action,
boolean isExpandedTopology) {

// Reset the environment parallelism temporarily before adjusting transformations,
// we can therefore be aware of any customized parallelism of the sub topology
Expand All @@ -247,6 +258,16 @@ private <I, R> R adjustTransformations(
transformations.subList(numTransformsBefore, transformations.size());

for (Transformation<?> subTransformation : expandedTransformations) {
String subUid = subTransformation.getUid();
if (isExpandedTopology && subUid != null && !subUid.isEmpty()) {
checkState(
transformation.getUid() != null && !transformation.getUid().isEmpty(),
"Sink "
+ transformation.getName()
+ " requires to set a uid since its customized topology"
+ " has set uid for some operators.");
}

concatUid(
subTransformation,
Transformation::getUid,
Expand Down

0 comments on commit 04a56cb

Please sign in to comment.