Skip to content

Commit

Permalink
KAFKA-12171: Migrate streams:test-utils module to JUnit 5 (apache#9856)
Browse files Browse the repository at this point in the history
* replace `org.junit.Assert` by `org.junit.jupiter.api.Assertions`
* replace `org.junit` by `org.junit.jupiter.api`
* replace `org.junit.runners.Parameterized` by `org.junit.jupiter.params.ParameterizedTest`
* replace `org.junit.runners.Parameterized.Parameters` by `org.junit.jupiter.params.provider.{Arguments, MethodSource}`
* replace `Before` by `BeforeEach`
* replace `After` by `AfterEach`

Reviewers: Ismael Juma <[email protected]>
  • Loading branch information
chia7712 authored Jan 14, 2021
1 parent 52b8aa0 commit 04827da
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 133 deletions.
5 changes: 2 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ subprojects {
}
}

def shouldUseJUnit5 = ["clients", "examples", "log4j-appender", "raft", "tools"].contains(it.project.name)
def shouldUseJUnit5 = ["clients", "examples", "log4j-appender", "raft", "test-utils", "tools"].contains(it.project.name)

def testLoggingEvents = ["passed", "skipped", "failed"]
def testShowStandardStreams = false
Expand Down Expand Up @@ -1490,8 +1490,7 @@ project(':streams:test-utils') {
compile project(':clients')

testCompile project(':clients').sourceSets.test.output
testCompile libs.junitJupiterApi
testCompile libs.junitVintageEngine
testCompile libs.junitJupiter
testCompile libs.easymock
testCompile libs.hamcrest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@SuppressWarnings("deprecation") // this is a test of a deprecated API
public class MockProcessorContextTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.apache.kafka.streams;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class MockTimeTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,7 +50,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TestTopicsTest {
private static final Logger log = LoggerFactory.getLogger(TestTopicsTest.class);
Expand All @@ -66,7 +66,7 @@ public class TestTopicsTest {

private final Instant testBaseTime = Instant.parse("2019-06-01T10:00:00Z");

@Before
@BeforeEach
public void setup() {
final StreamsBuilder builder = new StreamsBuilder();
//Create Actual Stream Processing pipeline
Expand All @@ -77,7 +77,7 @@ public void setup() {
testDriver = new TopologyTestDriver(builder.build());
}

@After
@AfterEach
public void tearDown() {
try {
testDriver.close();
Expand Down Expand Up @@ -326,14 +326,14 @@ public void testMultipleTopics() {
public void testNonExistingOutputTopic() {
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic("no-exist", longSerde.deserializer(), stringSerde.deserializer());
assertThrows("Uninitialized topic", NoSuchElementException.class, outputTopic::readRecord);
assertThrows(NoSuchElementException.class, outputTopic::readRecord, "Uninitialized topic");
}

@Test
public void testNonUsedOutputTopic() {
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer());
assertThrows("Uninitialized topic", NoSuchElementException.class, outputTopic::readRecord);
assertThrows(NoSuchElementException.class, outputTopic::readRecord, "Uninitialized topic");
}

@Test
Expand All @@ -346,14 +346,14 @@ public void testEmptyTopic() {
inputTopic.pipeInput("Hello");
assertThat(outputTopic.readValue(), equalTo("Hello"));
//No more output in topic
assertThrows("Empty topic", NoSuchElementException.class, outputTopic::readRecord);
assertThrows(NoSuchElementException.class, outputTopic::readRecord, "Empty topic");
}

@Test
public void testNonExistingInputTopic() {
final TestInputTopic<Long, String> inputTopic =
testDriver.createInputTopic("no-exist", longSerde.serializer(), stringSerde.serializer());
assertThrows("Unknown topic", IllegalArgumentException.class, () -> inputTopic.pipeInput(1L, "Hello"));
assertThrows(IllegalArgumentException.class, () -> inputTopic.pipeInput(1L, "Hello"), "Unknown topic");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams;

import java.util.Collections;

public class TopologyTestDriverAtLeastOnceTest extends TopologyTestDriverTest {
TopologyTestDriverAtLeastOnceTest() {
super(Collections.singletonMap(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams;

import java.util.Collections;

public class TopologyTestDriverEosTest extends TopologyTestDriverTest {
TopologyTestDriverEosTest() {
super(Collections.singletonMap(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.time.Duration;
Expand Down Expand Up @@ -87,16 +84,24 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(value = Parameterized.class)
public class TopologyTestDriverTest {
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public abstract class TopologyTestDriverTest {

TopologyTestDriverTest(final Map<String, String> overrides) {
config = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
));
config.putAll(overrides);
}

private final static String SOURCE_TOPIC_1 = "source-topic-1";
private final static String SOURCE_TOPIC_2 = "source-topic-2";
private final static String SINK_TOPIC_1 = "sink-topic-1";
Expand Down Expand Up @@ -131,25 +136,6 @@ public class TopologyTestDriverTest {
private final StringDeserializer stringDeserializer = new StringDeserializer();
private final LongDeserializer longDeserializer = new LongDeserializer();

@Parameterized.Parameters(name = "Eos enabled = {0}")
public static Collection<Object[]> data() {
final List<Object[]> values = new ArrayList<>();
for (final boolean eosEnabled : Arrays.asList(true, false)) {
values.add(new Object[] {eosEnabled});
}
return values;
}

public TopologyTestDriverTest(final boolean eosEnabled) {
config = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
));
if (eosEnabled) {
config.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
}
}

private final static class TTDTestRecord {
private final Object key;
private final Object value;
Expand Down Expand Up @@ -319,7 +305,7 @@ public boolean isCheckSupplierCall() {
}
}

@After
@AfterEach
public void tearDown() {
if (testDriver != null) {
testDriver.close();
Expand Down Expand Up @@ -462,10 +448,9 @@ public void shouldInitProcessor() {
@Test
public void shouldCloseProcessor() {
testDriver = new TopologyTestDriver(setupSingleProcessorTopology(), config);

testDriver.close();
assertTrue(mockProcessors.get(0).closed);
// As testDriver is already closed, bypassing @After tearDown testDriver.close().
// As testDriver is already closed, bypassing @AfterEach tearDown testDriver.close().
testDriver = null;
}

Expand Down Expand Up @@ -912,8 +897,8 @@ public void shouldPopulateGlobalStore() {
testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1));

final KeyValueStore<byte[], byte[]> globalStore = testDriver.getKeyValueStore(SOURCE_TOPIC_1 + "-globalStore");
Assert.assertNotNull(globalStore);
Assert.assertNotNull(testDriver.getAllStateStores().get(SOURCE_TOPIC_1 + "-globalStore"));
assertNotNull(globalStore);
assertNotNull(testDriver.getAllStateStores().get(SOURCE_TOPIC_1 + "-globalStore"));

pipeRecord(SOURCE_TOPIC_1, testRecord1);

Expand Down Expand Up @@ -1562,15 +1547,13 @@ public void process(final Record<String, Long> record) {
assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
testDriver.pipeRecord("input-topic", new TestRecord<>("a", 1L),
new StringSerializer(), new LongSerializer(), Instant.now());
Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
}


try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
assertNull(
"Closing the prior test driver should have cleaned up this store and value.",
testDriver.getKeyValueStore("storeProcessorStore").get("a")
);
assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"),
"Closing the prior test driver should have cleaned up this store and value.");
}

}
Expand All @@ -1583,16 +1566,16 @@ public void shouldFeedStoreFromGlobalKTable() {
Materialized.as("globalStore"));
try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config)) {
final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
Assert.assertNotNull(globalStore);
Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
assertNotNull(globalStore);
assertNotNull(testDriver.getAllStateStores().get("globalStore"));
testDriver.pipeRecord(
"topic",
new TestRecord<>("k1", "value1"),
new StringSerializer(),
new StringSerializer(),
Instant.now());
// we expect to have both in the global store, the one from pipeInput and the one from the producer
Assert.assertEquals("value1", globalStore.get("k1"));
assertEquals("value1", globalStore.get("k1"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.easymock.EasyMock;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static java.util.Arrays.asList;
import static org.easymock.EasyMock.expect;
Expand All @@ -38,18 +35,14 @@
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNull;
import static org.junit.jupiter.api.Assertions.assertNull;

@RunWith(EasyMockRunner.class)
public class KeyValueStoreFacadeTest {
@Mock
private TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore;
@Mock
private KeyValueIterator<String, ValueAndTimestamp<String>> mockedKeyValueTimestampIterator;
private final TimestampedKeyValueStore<String, String> mockedKeyValueTimestampStore = EasyMock.mock(TimestampedKeyValueStore.class);

private KeyValueStoreFacade<String, String> keyValueStoreFacade;

@Before
@BeforeEach
public void setup() {
keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
}
Expand Down
Loading

0 comments on commit 04827da

Please sign in to comment.