Skip to content

Commit

Permalink
[FLINK-20031] Keep the UID of SinkWriter same as the SinkTransformation
Browse files Browse the repository at this point in the history
In case we want to migrate the StreamingFileSink to the new Sink API, we
might need to let the user set the SinkWriter's uid to be the same as the
StreamingFileSink's, so that the SinkWriter operator has the opportunity to
reuse the old state. (This is just to keep the option open for now.)

For this, we need the SinkWriter operator's uid to be the same as the
SinkTransformation's.
  • Loading branch information
guoweiM authored and aljoscha committed Nov 8, 2020
1 parent a6bea22 commit dd17dc7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collection;
Expand Down Expand Up @@ -181,6 +183,7 @@ private int addWriter(
hasState ? new StatefulSinkWriterOperatorFactory<>(sinkTransformation.getSink()) : new StatelessSinkWriterOperatorFactory<>(
sinkTransformation.getSink());

final String prefix = "Sink Writer:";
final ChainingStrategy chainingStrategy = sinkTransformation.getChainingStrategy();

if (chainingStrategy != null) {
Expand All @@ -192,7 +195,8 @@ private int addWriter(
context.getStreamNodeIds(input),
inputTypeInfo,
extractCommittableTypeInformation(sinkTransformation.getSink()),
"Sink Writer:",
String.format("%s %s", prefix, sinkTransformation.getName()),
sinkTransformation.getUid(),
parallelism,
sinkTransformation.getMaxParallelism(),
sinkTransformation,
Expand Down Expand Up @@ -221,6 +225,7 @@ private int addCommitter(
return -1;
}

final String prefix = "Sink Committer:";
final CommittableTypeInformation<CommT> committableTypeInfo = extractCommittableTypeInformation(
sinkTransformation.getSink());
checkNotNull(committableTypeInfo);
Expand All @@ -230,7 +235,11 @@ private int addCommitter(
Collections.singletonList(inputId),
committableTypeInfo,
committableTypeInfo,
"Sink Committer:",
String.format("%s %s", prefix, sinkTransformation.getName()),
sinkTransformation.getUid() == null ? null : String.format(
"%s %s",
prefix,
sinkTransformation.getUid()),
parallelism,
maxParallelism,
sinkTransformation,
Expand All @@ -254,12 +263,18 @@ private void addGlobalCommitter(
return;
}

final String prefix = "Sink Global Committer:";

addOperatorToStreamGraph(
globalCommitterFactory,
Collections.singletonList(inputId),
checkNotNull(extractCommittableTypeInformation(sinkTransformation.getSink())),
null,
"Sink Global Committer:",
String.format("%s %s", prefix, sinkTransformation.getName()),
sinkTransformation.getUid() == null ? null : String.format(
"%s %s",
prefix,
sinkTransformation.getUid()),
1,
1,
sinkTransformation,
Expand All @@ -281,7 +296,8 @@ private int getParallelism(
* @param inputs A collection of upstream stream node ids.
* @param inTypeInfo The input type information of the operator
* @param outTypInfo The output type information of the operator
* @param prefix The prefix of the name and uid of the operator
* @param name The name of the operator.
* @param uid The uid of the operator.
* @param parallelism The parallelism of the operator
* @param maxParallelism The max parallelism of the operator
* @param sinkTransformation The sink transformation which the operator belongs to
Expand All @@ -293,7 +309,8 @@ private <IN, OUT> int addOperatorToStreamGraph(
Collection<Integer> inputs,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypInfo,
String prefix,
String name,
@Nullable String uid,
int parallelism,
int maxParallelism,
SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation,
Expand All @@ -309,7 +326,7 @@ private <IN, OUT> int addOperatorToStreamGraph(
operatorFactory,
inTypeInfo,
outTypInfo,
String.format("%s %s", prefix, sinkTransformation.getName()));
name);

streamGraph.setParallelism(transformationId, parallelism);
streamGraph.setMaxParallelism(transformationId, maxParallelism);
Expand All @@ -319,10 +336,8 @@ private <IN, OUT> int addOperatorToStreamGraph(
transformationId,
sinkTransformation,
context.getDefaultBufferTimeout());
if (sinkTransformation.getUid() != null) {
streamGraph.setTransformationUID(
transformationId,
String.format("%s %s", prefix, sinkTransformation.getUid()));
if (uid != null) {
streamGraph.setTransformationUID(transformationId, uid);
}

for (int input : inputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public void generateWriterTopology() {
sourceNode,
IntSerializer.class,
writerNode,
"Writer",
String.format("Sink Writer: %s", NAME),
UID,
StatelessSinkWriterOperatorFactory.class,
PARALLELISM,
-1);
Expand All @@ -111,7 +112,8 @@ public void generateWriterCommitterTopology() {
writerNode,
SimpleVersionedSerializerTypeSerializerProxy.class,
committerNode,
"Committer",
String.format("Sink Committer: %s", NAME),
String.format("Sink Committer: %s", UID),
committerClass,
runtimeExecutionMode == RuntimeExecutionMode.STREAMING ? PARALLELISM : 1,
runtimeExecutionMode == RuntimeExecutionMode.STREAMING ? -1 : 1);
Expand All @@ -135,7 +137,8 @@ public void generateWriterCommitterGlobalCommitterTopology() {
committerNode,
SimpleVersionedSerializerTypeSerializerProxy.class,
globalCommitterNode,
"Global Committer",
String.format("Sink Global Committer: %s", NAME),
String.format("Sink Global Committer: %s", UID),
globalCommitterClass,
1,
1);
Expand All @@ -158,7 +161,8 @@ public void generateWriterGlobalCommitterTopology() {
writerNode,
SimpleVersionedSerializerTypeSerializerProxy.class,
globalCommitterNode,
"Global Committer",
String.format("Sink Global Committer: %s", NAME),
String.format("Sink Global Committer: %s", UID),
globalCommitterClass,
1,
1);
Expand Down Expand Up @@ -208,7 +212,8 @@ private void validateTopology(
StreamNode src,
Class<?> srcOutTypeInfo,
StreamNode dest,
String midName,
String name,
String uid,
Class<?> expectedOperatorFactory,
int expectedParallelism,
int expectedMaxParallelism) {
Expand All @@ -224,13 +229,12 @@ private void validateTopology(
assertThat(dest.getTypeSerializersIn()[0], instanceOf(srcOutTypeInfo));

//verify dest node
assertThat(dest.getOperatorName(), equalTo(String.format("Sink %s: %s", midName, NAME)));
assertThat(dest.getOperatorName(), equalTo(name));
assertThat(dest.getTransformationUID(), equalTo(uid));

assertThat(
dest.getOperatorFactory(),
instanceOf(expectedOperatorFactory));
assertThat(
dest.getTransformationUID(),
equalTo(String.format("Sink %s: %s", midName, UID)));
assertThat(dest.getParallelism(), equalTo(expectedParallelism));
assertThat(dest.getMaxParallelism(), equalTo(expectedMaxParallelism));
assertThat(
Expand Down

0 comments on commit dd17dc7

Please sign in to comment.