diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java index da74b08946a10..95c3688ad8bf3 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java @@ -236,6 +236,11 @@ public void run() { } if (record == null) { this.emptyQueues.put(min, true); + } else if (nextQueue != null && nextQueue.headTimestamp > min.headTimestamp) { + // if we stopped emitting due to reaching max timestamp, + // the next queue may not be the new min + heads.offer(nextQueue); + nextQueue = min; } else { heads.offer(min); } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java index 1948237566e34..84949cca3d48d 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java @@ -17,12 +17,14 @@ package org.apache.flink.streaming.connectors.kinesis.util; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,10 +34,10 @@ /** Test for {@link RecordEmitter}. */ public class RecordEmitterTest { - static List results = Collections.synchronizedList(new ArrayList<>()); - private class TestRecordEmitter extends RecordEmitter { + private List results = Collections.synchronizedList(new ArrayList<>()); + private TestRecordEmitter() { super(DEFAULT_QUEUE_CAPACITY); } @@ -68,14 +70,66 @@ public void test() throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(emitter); - long timeout = System.currentTimeMillis() + 10_000; - while (results.size() != 4 && System.currentTimeMillis() < timeout) { - Thread.sleep(100); + Deadline dl = Deadline.fromNow(Duration.ofSeconds(10)); + while (emitter.results.size() != 4 && dl.hasTimeLeft()) { + Thread.sleep(10); } emitter.stop(); executor.shutdownNow(); - Assert.assertThat(results, Matchers.contains(one, five, two, ten)); + Assert.assertThat(emitter.results, Matchers.contains(one, five, two, ten)); } + @Test + public void testRetainMinAfterReachingLimit() throws Exception { + + TestRecordEmitter emitter = new TestRecordEmitter(); + + final TimestampedValue one = new TimestampedValue<>("1", 1); + final TimestampedValue two = new TimestampedValue<>("2", 2); + final TimestampedValue three = new TimestampedValue<>("3", 3); + final TimestampedValue ten = new TimestampedValue<>("10", 10); + final TimestampedValue eleven = new TimestampedValue<>("11", 11); + + final TimestampedValue twenty = new TimestampedValue<>("20", 20); + final TimestampedValue thirty = new TimestampedValue<>("30", 30); + + final RecordEmitter.RecordQueue queue0 = emitter.getQueue(0); + final RecordEmitter.RecordQueue queue1 = emitter.getQueue(1); + + queue0.put(one); + queue0.put(two); + queue0.put(three); + queue0.put(ten); + queue0.put(eleven); + + queue1.put(twenty); + queue1.put(thirty); + + emitter.setMaxLookaheadMillis(1); + emitter.setCurrentWatermark(5); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(emitter); + try { + // emits one record past the limit + Deadline dl = Deadline.fromNow(Duration.ofSeconds(10)); + while (emitter.results.size() != 4 && dl.hasTimeLeft()) { + Thread.sleep(10); + } + Assert.assertThat(emitter.results, Matchers.contains(one, two, three, ten)); + + // advance watermark, emits remaining record from queue0 + emitter.setCurrentWatermark(10); + dl = Deadline.fromNow(Duration.ofSeconds(10)); + while (emitter.results.size() != 5 && dl.hasTimeLeft()) { + Thread.sleep(10); + } + Assert.assertThat(emitter.results, Matchers.contains(one, two, three, ten, eleven)); + } + finally { + emitter.stop(); + executor.shutdownNow(); + } + } }