Skip to content

Commit

Permalink
[FLINK-14107][kinesis] Erroneous queue selection in record emitter ma…
Browse files Browse the repository at this point in the history
…y lead to deadlock
  • Loading branch information
tweise committed Sep 18, 2019
1 parent f65ee55 commit 4a4a147
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ public void run() {
}
if (record == null) {
this.emptyQueues.put(min, true);
} else if (nextQueue != null && nextQueue.headTimestamp > min.headTimestamp) {
// if we stopped emitting due to reaching max timestamp,
// the next queue may not be the new min
heads.offer(nextQueue);
nextQueue = min;
} else {
heads.offer(min);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.flink.streaming.connectors.kinesis.util;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -32,10 +34,10 @@
/** Test for {@link RecordEmitter}. */
public class RecordEmitterTest {

static List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());

private class TestRecordEmitter extends RecordEmitter<TimestampedValue> {

private List<TimestampedValue> results = Collections.synchronizedList(new ArrayList<>());

private TestRecordEmitter() {
super(DEFAULT_QUEUE_CAPACITY);
}
Expand Down Expand Up @@ -68,14 +70,66 @@ public void test() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(emitter);

long timeout = System.currentTimeMillis() + 10_000;
while (results.size() != 4 && System.currentTimeMillis() < timeout) {
Thread.sleep(100);
Deadline dl = Deadline.fromNow(Duration.ofSeconds(10));
while (emitter.results.size() != 4 && dl.hasTimeLeft()) {
Thread.sleep(10);
}
emitter.stop();
executor.shutdownNow();

Assert.assertThat(results, Matchers.contains(one, five, two, ten));
Assert.assertThat(emitter.results, Matchers.contains(one, five, two, ten));
}

@Test
public void testRetainMinAfterReachingLimit() throws Exception {

TestRecordEmitter emitter = new TestRecordEmitter();

final TimestampedValue<String> one = new TimestampedValue<>("1", 1);
final TimestampedValue<String> two = new TimestampedValue<>("2", 2);
final TimestampedValue<String> three = new TimestampedValue<>("3", 3);
final TimestampedValue<String> ten = new TimestampedValue<>("10", 10);
final TimestampedValue<String> eleven = new TimestampedValue<>("11", 11);

final TimestampedValue<String> twenty = new TimestampedValue<>("20", 20);
final TimestampedValue<String> thirty = new TimestampedValue<>("30", 30);

final RecordEmitter.RecordQueue<TimestampedValue> queue0 = emitter.getQueue(0);
final RecordEmitter.RecordQueue<TimestampedValue> queue1 = emitter.getQueue(1);

queue0.put(one);
queue0.put(two);
queue0.put(three);
queue0.put(ten);
queue0.put(eleven);

queue1.put(twenty);
queue1.put(thirty);

emitter.setMaxLookaheadMillis(1);
emitter.setCurrentWatermark(5);

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(emitter);
try {
// emits one record past the limit
Deadline dl = Deadline.fromNow(Duration.ofSeconds(10));
while (emitter.results.size() != 4 && dl.hasTimeLeft()) {
Thread.sleep(10);
}
Assert.assertThat(emitter.results, Matchers.contains(one, two, three, ten));

// advance watermark, emits remaining record from queue0
emitter.setCurrentWatermark(10);
dl = Deadline.fromNow(Duration.ofSeconds(10));
while (emitter.results.size() != 5 && dl.hasTimeLeft()) {
Thread.sleep(10);
}
Assert.assertThat(emitter.results, Matchers.contains(one, two, three, ten, eleven));
}
finally {
emitter.stop();
executor.shutdownNow();
}
}
}

0 comments on commit 4a4a147

Please sign in to comment.