Skip to content

Commit

Permalink
[FLINK-24857][test][FileSource][Kafka] Upgrade SourceReaderTestBase t…
Browse files Browse the repository at this point in the history
…o JUnit 5
  • Loading branch information
imaffe authored and fapaul committed Jan 3, 2022
1 parent ceb11ad commit 1b02f02
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -50,75 +48,81 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** A unit test class for {@link SourceReaderBase}. */
public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> {

@Rule public ExpectedException expectedException = ExpectedException.none();

@Test
public void testExceptionInSplitReader() throws Exception {
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("One or more fetchers have encountered exception");
final String errMsg = "Testing Exception";

FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
new FutureCompletingBlockingQueue<>();
// We have to handle split changes first, otherwise fetch will not be called.
try (MockSourceReader reader =
new MockSourceReader(
elementsQueue,
() ->
new SplitReader<int[], MockSourceSplit>() {
@Override
public RecordsWithSplitIds<int[]> fetch() {
throw new RuntimeException(errMsg);
}

@Override
public void handleSplitsChanges(
SplitsChange<MockSourceSplit> splitsChanges) {}

@Override
public void wakeUp() {}

@Override
public void close() {}
},
getConfig(),
new TestingReaderContext())) {
ValidatingSourceOutput output = new ValidatingSourceOutput();
reader.addSplits(
Collections.singletonList(
getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)));
reader.notifyNoMoreSplits();
// This is not a real infinite loop, it is supposed to throw exception after two polls.
while (true) {
InputStatus inputStatus = reader.pollNext(output);
assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
// Add a sleep to avoid tight loop.
Thread.sleep(1);
}
}
void testExceptionInSplitReader() {
assertThatThrownBy(
() -> {
final String errMsg = "Testing Exception";

FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue = new FutureCompletingBlockingQueue<>();
// We have to handle split changes first, otherwise fetch will not be
// called.
try (MockSourceReader reader =
new MockSourceReader(
elementsQueue,
() ->
new SplitReader<int[], MockSourceSplit>() {
@Override
public RecordsWithSplitIds<int[]> fetch() {
throw new RuntimeException(errMsg);
}

@Override
public void handleSplitsChanges(
SplitsChange<MockSourceSplit>
splitsChanges) {}

@Override
public void wakeUp() {}

@Override
public void close() {}
},
getConfig(),
new TestingReaderContext())) {
ValidatingSourceOutput output = new ValidatingSourceOutput();
reader.addSplits(
Collections.singletonList(
getSplit(
0,
NUM_RECORDS_PER_SPLIT,
Boundedness.CONTINUOUS_UNBOUNDED)));
reader.notifyNoMoreSplits();
// This is not a real infinite loop, it is supposed to throw
// exception after
// two polls.
while (true) {
InputStatus inputStatus = reader.pollNext(output);
assertThat(inputStatus).isNotEqualTo(InputStatus.END_OF_INPUT);
// Add a sleep to avoid tight loop.
Thread.sleep(1);
}
}
})
.isInstanceOf(RuntimeException.class)
.hasMessage("One or more fetchers have encountered exception");
}

@Test
public void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
final TestingRecordsWithSplitIds<String> records =
new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);

reader.pollNext(new TestingReaderOutput<>());

assertFalse(records.isRecycled());
assertThat(records.isRecycled()).isFalse();
}

@Test
public void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
final TestingRecordsWithSplitIds<String> records =
new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
Expand All @@ -129,11 +133,11 @@ public void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
reader.pollNext(new TestingReaderOutput<>());
reader.pollNext(new TestingReaderOutput<>());

assertTrue(records.isRecycled());
assertThat(records.isRecycled()).isTrue();
}

@Test
public void testMultipleSplitsWithDifferentFinishingMoments() throws Exception {
void testMultipleSplitsWithDifferentFinishingMoments() throws Exception {
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
Expand Down Expand Up @@ -169,7 +173,7 @@ public void testMultipleSplitsWithDifferentFinishingMoments() throws Exception {
}

@Test
public void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception {
void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception {
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
Expand Down Expand Up @@ -205,7 +209,7 @@ public void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception {
}

@Test
public void testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue()
void testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue()
throws Exception {

FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
Expand All @@ -231,8 +235,8 @@ public void testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverE

// Add the last record to the split when the splitFetcherManager shutting down SplitFetchers
splitFetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> split.addRecord(1));
assertEquals(
InputStatus.MORE_AVAILABLE, sourceReader.pollNext(new TestingReaderOutput<>()));
assertThat(sourceReader.pollNext(new TestingReaderOutput<>()))
.isEqualTo(InputStatus.MORE_AVAILABLE);
}

// ---------------- helper methods -----------------
Expand Down
Loading

0 comments on commit 1b02f02

Please sign in to comment.