diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index a128174ff2459..5240326cc0aed 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -648,6 +648,7 @@ public void start() { public void onProcessingTime(long timestamp) throws Exception { long minAcrossAll = Long.MAX_VALUE; + boolean isEffectiveMinAggregation = false; for (KafkaTopicPartitionState state : allPartitions) { // we access the current watermark for the periodic assigners under the state @@ -659,10 +660,11 @@ public void onProcessingTime(long timestamp) throws Exception { } minAcrossAll = Math.min(minAcrossAll, curr); + isEffectiveMinAggregation = true; } // emit next watermark, if there is one - if (minAcrossAll > lastWatermarkTimestamp) { + if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) { lastWatermarkTimestamp = minAcrossAll; emitter.emitWatermark(new Watermark(minAcrossAll)); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java index 46894a159166e..e4a58dd3e8324 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java @@ -360,6 +360,35 @@ public void testPeriodicWatermarks() throws Exception { assertTrue(watermarkTs >= 13L && watermarkTs <= 15L); } + @Test + public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWatermarks() throws Exception { + final String testTopic = "test topic name"; + Map originalPartitions = new HashMap<>(); + + TestSourceContext sourceContext = new TestSourceContext<>(); + + TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService(); + + TestFetcher fetcher = new TestFetcher<>( + sourceContext, + originalPartitions, + new SerializedValue>(new PeriodicTestExtractor()), + null, /* punctuated watermarks assigner*/ + processingTimeProvider, + 10); + + processingTimeProvider.setCurrentTime(10); + // no partitions; when the periodic watermark emitter fires, no watermark should be emitted + assertFalse(sourceContext.hasWatermark()); + + // counter-test that when the fetcher does actually have partitions, + // when the periodic watermark emitter fires again, a watermark really is emitted + fetcher.addDiscoveredPartitions(Collections.singletonList(new KafkaTopicPartition(testTopic, 0))); + fetcher.emitRecord(100L, fetcher.subscribedPartitionStates().get(0), 3L); + processingTimeProvider.setCurrentTime(20); + assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); + } + // ------------------------------------------------------------------------ // Test mocks // ------------------------------------------------------------------------