Skip to content

Commit

Permalink
[FLINK-8001] [kafka] Prevent PeriodicWatermarkEmitter from violating …
Browse files Browse the repository at this point in the history
…IDLE status

Prior to this commit, a bug exists such that if a Kafka consumer subtask
initially marks itself as idle because it didn't have any partitions to
subscribe to, that idleness status will be violated when the
PeriodicWatermarkEmitter is fired.

The problem is that the PeriodicWatermarkEmitter incorrecty yields a
Long.MAX_VALUE watermark even when there are no partitions to subscribe
to. This commit fixes this by additionally ensuring that the aggregated
watermark in the PeriodicWatermarkEmitterr is an effective one (i.e., is
really aggregated from some partition).
  • Loading branch information
tzulitai committed Nov 8, 2017
1 parent 81e1ac3 commit 6bce2b8
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ public void start() {
public void onProcessingTime(long timestamp) throws Exception {

long minAcrossAll = Long.MAX_VALUE;
boolean isEffectiveMinAggregation = false;
for (KafkaTopicPartitionState<?> state : allPartitions) {

// we access the current watermark for the periodic assigners under the state
Expand All @@ -659,10 +660,11 @@ public void onProcessingTime(long timestamp) throws Exception {
}

minAcrossAll = Math.min(minAcrossAll, curr);
isEffectiveMinAggregation = true;
}

// emit next watermark, if there is one
if (minAcrossAll > lastWatermarkTimestamp) {
if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) {
lastWatermarkTimestamp = minAcrossAll;
emitter.emitWatermark(new Watermark(minAcrossAll));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,35 @@ public void testPeriodicWatermarks() throws Exception {
assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
}

@Test
public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWatermarks() throws Exception {
final String testTopic = "test topic name";
Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();

TestSourceContext<Long> sourceContext = new TestSourceContext<>();

TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();

TestFetcher<Long> fetcher = new TestFetcher<>(
sourceContext,
originalPartitions,
new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
null, /* punctuated watermarks assigner*/
processingTimeProvider,
10);

processingTimeProvider.setCurrentTime(10);
// no partitions; when the periodic watermark emitter fires, no watermark should be emitted
assertFalse(sourceContext.hasWatermark());

// counter-test that when the fetcher does actually have partitions,
// when the periodic watermark emitter fires again, a watermark really is emitted
fetcher.addDiscoveredPartitions(Collections.singletonList(new KafkaTopicPartition(testTopic, 0)));
fetcher.emitRecord(100L, fetcher.subscribedPartitionStates().get(0), 3L);
processingTimeProvider.setCurrentTime(20);
assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
}

// ------------------------------------------------------------------------
// Test mocks
// ------------------------------------------------------------------------
Expand Down

0 comments on commit 6bce2b8

Please sign in to comment.