diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java index 462e8a6445fdf..e65a3aec4488b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java @@ -88,8 +88,12 @@ public Sink asSpecializedSink() { if (sink.getGlobalCommittableSerializer().isPresent()) { globalCommitter = true; } - if (sink.getCommittableSerializer().isPresent()) { - committer = true; + try { + if (sink.createCommitter().isPresent()) { + committer = true; + } + } catch (IOException e) { + throw new IllegalStateException("Failed to instantiate committer.", e); } if (globalCommitter && committer && stateful) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java index f43c099330017..35f68f2495ee5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/transformations/SinkV1AdapterTest.java @@ -128,6 +128,22 @@ private static class CommittingSinkV1 extends DefaultSinkV1 { public Optional> getCommittableSerializer() { return Optional.of(new NoOpSerializer()); } + + @Override + public Optional> createCommitter() throws IOException { + return Optional.of( + new Committer() { + @Override + public List commit(List committables) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } + }); + } } private static class StatefulCommittingSinkV1 extends CommittingSinkV1 {