Skip to content

Commit

Permalink
[FLINK-25726][streaming] Check committer existence in SinkV1Adapter t…
Browse files Browse the repository at this point in the history
…o determine if a committer is needed

Before this commit the serializer presence was the indicator.
Unfortunately, the serializer is also required in case of using a global
committer. The topology having a single writer and a global committer in
Sink V1 would have failed.
  • Loading branch information
fapaul committed Feb 4, 2022
1 parent 5846d8d commit 051ad67
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ public Sink<InputT> asSpecializedSink() {
if (sink.getGlobalCommittableSerializer().isPresent()) {
globalCommitter = true;
}
if (sink.getCommittableSerializer().isPresent()) {
committer = true;
try {
if (sink.createCommitter().isPresent()) {
committer = true;
}
} catch (IOException e) {
throw new IllegalStateException("Failed to instantiate committer.", e);
}

if (globalCommitter && committer && stateful) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,22 @@ private static class CommittingSinkV1 extends DefaultSinkV1 {
public Optional<SimpleVersionedSerializer<Integer>> getCommittableSerializer() {
return Optional.of(new NoOpSerializer());
}

@Override
public Optional<Committer<Integer>> createCommitter() throws IOException {
return Optional.of(
new Committer<Integer>() {
@Override
public List<Integer> commit(List<Integer> committables) {
throw new UnsupportedOperationException();
}

@Override
public void close() {
throw new UnsupportedOperationException();
}
});
}
}

private static class StatefulCommittingSinkV1 extends CommittingSinkV1 {
Expand Down

0 comments on commit 051ad67

Please sign in to comment.