Skip to content

Commit

Permalink
MINOR: increase timeout for unstable KTableSourceTopicRestartIntegrat…
Browse files Browse the repository at this point in the history
…ionTest (apache#4445)

Reviewers: Guozhang Wang <[email protected]>
  • Loading branch information
mjsax authored and guozhangwang committed Jan 22, 2018
1 parent 9ef883e commit 2c08d7e
Showing 1 changed file with 9 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@

@Category({IntegrationTest.class})
public class KTableSourceTopicRestartIntegrationTest {


private static final int NUM_BROKERS = 3;
private static final String SOURCE_TOPIC = "source-topic";

Expand All @@ -72,10 +70,8 @@ public class KTableSourceTopicRestartIntegrationTest {
private Map<String, String> expectedInitialResultsMap;
private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;


@BeforeClass
public static void setUpBeforeAllTests() throws Exception {

CLUSTER.createTopic(SOURCE_TOPIC);

STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-from-source");
Expand All @@ -97,7 +93,6 @@ public static void setUpBeforeAllTests() throws Exception {

@Before
public void before() {

final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC);
kTable.toStream().foreach(new ForeachAction<String, String>() {
@Override
Expand All @@ -115,10 +110,8 @@ public void after() throws IOException {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}


@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {

try {
streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streamsOne.start();
Expand All @@ -136,15 +129,13 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDi
produceKeyValues("f", "g", "h");

assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");

} finally {
streamsOne.close(5, TimeUnit.SECONDS);
}
}

@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {

try {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
Expand All @@ -163,15 +154,13 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEn
produceKeyValues("f", "g", "h");

assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");

} finally {
streamsOne.close(5, TimeUnit.SECONDS);
}
}

@Test
public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {

try {
streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streamsOne.start();
Expand All @@ -189,7 +178,6 @@ public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() thr
final Map<String, String> expectedValues = createExpectedResultsMap("a", "b", "c", "f", "g", "h");

assertNumberValuesRead(readKeyValues, expectedValues, "Table did not get all values after restart");

} finally {
streamsOne.close(5, TimeUnit.SECONDS);
}
Expand All @@ -198,14 +186,15 @@ public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() thr
private void assertNumberValuesRead(final Map<String, String> valueMap,
final Map<String, String> expectedMap,
final String errorMessage) throws InterruptedException {

TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return valueMap.equals(expectedMap);
}
}, errorMessage);

TestUtils.waitForCondition(
new TestCondition() {
@Override
public boolean conditionMet() {
return valueMap.equals(expectedMap);
}
},
30 * 1000L,
errorMessage);
}

private void produceKeyValues(final String... keys) throws ExecutionException, InterruptedException {
Expand Down

0 comments on commit 2c08d7e

Please sign in to comment.