From 051ad6759e0213e0356d5559c5a2b96d279f18fb Mon Sep 17 00:00:00 2001 From: Fabian Paul Date: Fri, 21 Jan 2022 16:08:17 +0100 Subject: [PATCH] [FLINK-25726][streaming] Check committer existence in SinkV1Adapter to determine if a committer is needed Before this commit the serializer presence was the indicator. Unfortunately, the serializer is also required in case of using a global committer. The topology having a single writer and a global committer in Sink V1 would have failed. --- .../api/transformations/SinkV1Adapter.java | 8 ++++++-- .../api/transformations/SinkV1AdapterTest.java | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java index 462e8a6445fdf..e65a3aec4488b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java @@ -88,8 +88,12 @@ public Sink asSpecializedSink() { if (sink.getGlobalCommittableSerializer().isPresent()) { globalCommitter = true; } - if (sink.getCommittableSerializer().isPresent()) { - committer = true; + try { + if (sink.createCommitter().isPresent()) { + committer = true; + } + } catch (IOException e) { + throw new IllegalStateException("Failed to instantiate committer.", e); } if (globalCommitter && committer && stateful) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java index f43c099330017..35f68f2495ee5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java @@ -128,6 +128,22 @@ private static class CommittingSinkV1 extends DefaultSinkV1 { public Optional> getCommittableSerializer() { return Optional.of(new NoOpSerializer()); } + + @Override + public Optional> createCommitter() throws IOException { + return Optional.of( + new Committer() { + @Override + public List commit(List committables) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } + }); + } } private static class StatefulCommittingSinkV1 extends CommittingSinkV1 {