Skip to content

Commit

Permalink
[FLINK-12768][tests] Fix FlinkKinesisConsumerTest.testSourceSynchroni…
Browse files Browse the repository at this point in the history
…zation race condition

This closes apache#9183.
  • Loading branch information
tweise authored and tillrohrmann committed Aug 2, 2019
1 parent 61352fb commit 8da6965
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,6 @@ private AsyncKinesisRecordEmitter(int queueCapacity) {
@Override
public void emit(RecordWrapper<T> record, RecordQueue<RecordWrapper<T>> queue) {
emitRecordAndUpdateState(record);
ShardWatermarkState<T> sws = shardWatermarks.get(queue.getQueueId());
sws.lastEmittedRecordWatermark = record.watermark;
}
}

Expand All @@ -289,11 +287,6 @@ public void put(RecordWrapper<T> record) {
emit(record, this);
}

@Override
public int getQueueId() {
return producerIndex;
}

@Override
public int getSize() {
return 0;
Expand Down Expand Up @@ -770,6 +763,8 @@ private void emitRecordAndUpdateState(RecordWrapper<T> rw) {
synchronized (checkpointLock) {
if (rw.getValue() != null) {
sourceContext.collectWithTimestamp(rw.getValue(), rw.timestamp);
ShardWatermarkState<T> sws = shardWatermarks.get(rw.shardStateIndex);
sws.lastEmittedRecordWatermark = rw.watermark;
} else {
LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.",
rw.lastSequenceNumber,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ private int compareHeadElement(AsyncRecordQueue left, AsyncRecordQueue right) {
public interface RecordQueue<T> {
void put(T record) throws InterruptedException;

int getQueueId();

int getSize();

T peek();
Expand All @@ -98,21 +96,20 @@ private AsyncRecordQueue(int queueId) {
this.headTimestamp = Long.MAX_VALUE;
}

@Override
public void put(T record) throws InterruptedException {
queue.put(record);
synchronized (condition) {
condition.notify();
}
}

public int getQueueId() {
return queueId;
}

@Override
public int getSize() {
return queue.size();
}

@Override
public T peek() {
return queue.peek();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.CollectingSourceContext;
Expand All @@ -72,7 +74,9 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -531,8 +535,8 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
}

/**
* FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
* {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
* FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#getShardId()} or
* {@link StreamShardMetadata#getStreamName()} does not result in the shard not being able to be restored.
* This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
* job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
* {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
Expand Down Expand Up @@ -833,6 +837,7 @@ public void testSourceSynchronization() throws Exception {
final long autoWatermarkInterval = 1_000;
final long watermarkSyncInterval = autoWatermarkInterval + 1;

TestWatermarkTracker.WATERMARK.set(0);
HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>();
subscribedStreamsToLastDiscoveredShardIds.put(streamName, null);

Expand All @@ -846,10 +851,9 @@ public void testSourceSynchronization() throws Exception {
props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5));

BlockingQueue<String> shard1 = new LinkedBlockingQueue();
BlockingQueue<String> shard2 = new LinkedBlockingQueue();

Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>();
streamToQueueMap.put(streamName, Lists.newArrayList(shard1, shard2));
streamToQueueMap.put(streamName, Collections.singletonList(shard1));

// override createFetcher to mock Kinesis
FlinkKinesisConsumer<String> sourceFunc =
Expand Down Expand Up @@ -878,7 +882,16 @@ protected KinesisDataFetcher<String> createFetcher(
subscribedStreamsToLastDiscoveredShardIds,
(props) -> FakeKinesisBehavioursFactory.blockingQueueGetRecords(
streamToQueueMap)
) {};
) {
@Override
protected void emitWatermark() {
// necessary in this test to ensure that watermark state is updated
// before the watermark timer callback is triggered
synchronized (sourceContext.getCheckpointLock()) {
super.emitWatermark();
}
}
};
return fetcher;
}
};
Expand Down Expand Up @@ -952,27 +965,33 @@ public void emitWatermark(Watermark mark) {

// trigger sync
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
TestWatermarkTracker.assertSingleWatermark(-4);
TestWatermarkTracker.assertGlobalWatermark(-4);

final long record2 = record1 + (watermarkSyncInterval * 3) + 1;
shard1.put(Long.toString(record2));

// TODO: check for record received instead
Thread.sleep(100);
// wait for the record to be buffered in the emitter
final RecordEmitter<?> emitter = org.powermock.reflect.Whitebox.getInternalState(fetcher, "recordEmitter");
RecordEmitter.RecordQueue emitterQueue = emitter.getQueue(0);
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) {
Thread.sleep(10);
}
assertEquals("first record received", 1, emitterQueue.getSize());

// Advance the watermark. Since the new record is past global watermark + threshold,
// it won't be emitted and the watermark does not advance
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
assertEquals(3000L, (long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark"));
TestWatermarkTracker.assertSingleWatermark(-4);
TestWatermarkTracker.assertGlobalWatermark(-4);

// Trigger global watermark sync
testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
expectedResults.add(Long.toString(record2));
awaitRecordCount(results, expectedResults.size());
assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray()));
TestWatermarkTracker.assertSingleWatermark(3000);
TestWatermarkTracker.assertGlobalWatermark(3000);

// Trigger watermark update and emit
testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval);
Expand All @@ -984,8 +1003,8 @@ public void emitWatermark(Watermark mark) {
}

private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) throws Exception {
long timeoutMillis = System.currentTimeMillis() + 10_000;
while (System.currentTimeMillis() < timeoutMillis && queue.size() < count) {
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
while (deadline.hasTimeLeft() && queue.size() < count) {
Thread.sleep(10);
}
}
Expand Down Expand Up @@ -1018,7 +1037,7 @@ public long updateWatermark(long localWatermark) {
return localWatermark;
}

static void assertSingleWatermark(long expected) {
static void assertGlobalWatermark(long expected) {
Assert.assertEquals(expected, WATERMARK.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
String data = queue.take();
Record record = new Record()
.withData(
ByteBuffer.wrap(String.valueOf(data).getBytes(ConfigConstants.DEFAULT_CHARSET)))
ByteBuffer.wrap(data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
.withPartitionKey(UUID.randomUUID().toString())
.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
.withSequenceNumber(String.valueOf(0));
Expand Down

0 comments on commit 8da6965

Please sign in to comment.