[FLINK-27185][connector] Convert connector-kafka module to assertj
Co-authored-by: slinkydeveloper <[email protected]>
Co-authored-by: Sergey Nuyanzin <[email protected]>
3 people authored and XComp committed Jun 8, 2022
1 parent 99f4511 commit 44f73c4
Showing 48 changed files with 957 additions and 1,157 deletions.
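
The four files reproduced below all follow the same mechanical conversion. A minimal sketch of the mapping, not taken from the diff itself (illustrative class and values; assumes JUnit 5 and AssertJ on the test classpath):

    import static org.assertj.core.api.Assertions.assertThat;
    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    import org.junit.jupiter.api.Test;

    class AssertjMigrationSketch {

        @Test
        void equalityAndState() {
            // assertEquals(expected, actual) becomes assertThat(actual).isEqualTo(expected);
            // note that the argument order flips during the conversion.
            assertThat(21 * 2).isEqualTo(42);
            // assertTrue/assertFalse/assertNull map to dedicated assertions.
            assertThat("flink".isEmpty()).isFalse();
            assertThat((Object) null).isNull();
        }

        @Test
        void exceptions() {
            // assertThrows(IllegalStateException.class, runnable) becomes
            // assertThatThrownBy(runnable).isInstanceOf(IllegalStateException.class).
            assertThatThrownBy(
                            () -> {
                                throw new IllegalStateException("boom");
                            })
                    .isInstanceOf(IllegalStateException.class)
                    .hasMessage("boom");
        }
    }
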
KafkaCommittableSerializerTest.java
@@ -23,7 +23,7 @@

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for serializing and deserializing {@link KafkaCommittable} with {@link
@@ -39,6 +39,6 @@ public void testCommittableSerDe() throws IOException {
final short epoch = 5;
final KafkaCommittable committable = new KafkaCommittable(1L, epoch, transactionalId, null);
final byte[] serialized = SERIALIZER.serialize(committable);
assertEquals(committable, SERIALIZER.deserialize(1, serialized));
assertThat(SERIALIZER.deserialize(1, serialized)).isEqualTo(committable);
}
}
KafkaRecordSerializationSchemaBuilderTest.java
@@ -42,12 +42,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
@@ -77,34 +73,33 @@ public void testDoNotAllowMultipleValueSerializer() {

@Test
public void testDoNotAllowMultipleTopicSelector() {
assertThrows(
IllegalStateException.class,
() ->
KafkaRecordSerializationSchema.builder()
.setTopicSelector(e -> DEFAULT_TOPIC)
.setTopic(DEFAULT_TOPIC));
assertThrows(
IllegalStateException.class,
() ->
KafkaRecordSerializationSchema.builder()
.setTopic(DEFAULT_TOPIC)
.setTopicSelector(e -> DEFAULT_TOPIC));
assertThatThrownBy(
() ->
KafkaRecordSerializationSchema.builder()
.setTopicSelector(e -> DEFAULT_TOPIC)
.setTopic(DEFAULT_TOPIC))
.isInstanceOf(IllegalStateException.class);
assertThatThrownBy(
() ->
KafkaRecordSerializationSchema.builder()
.setTopic(DEFAULT_TOPIC)
.setTopicSelector(e -> DEFAULT_TOPIC))
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testExpectTopicSelector() {
assertThrows(
IllegalStateException.class,
KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(new SimpleStringSchema())
::build);
assertThatThrownBy(
KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(new SimpleStringSchema())
::build)
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testExpectValueSerializer() {
assertThrows(
IllegalStateException.class,
KafkaRecordSerializationSchema.builder().setTopic(DEFAULT_TOPIC)::build);
assertThatThrownBy(KafkaRecordSerializationSchema.builder().setTopic(DEFAULT_TOPIC)::build)
.isInstanceOf(IllegalStateException.class);
}

@Test
@@ -122,14 +117,14 @@ public void testSerializeRecordWithTopicSelector() {
final KafkaRecordSerializationSchema<String> schema =
builder.setValueSerializationSchema(serializationSchema).build();
final ProducerRecord<byte[], byte[]> record = schema.serialize("a", null, null);
assertEquals("topic-a", record.topic());
assertNull(record.key());
assertArrayEquals(serializationSchema.serialize("a"), record.value());
assertThat(record.topic()).isEqualTo("topic-a");
assertThat(record.key()).isNull();
assertThat(record.value()).isEqualTo(serializationSchema.serialize("a"));

final ProducerRecord<byte[], byte[]> record2 = schema.serialize("b", null, null);
assertEquals("topic-b", record2.topic());
assertNull(record2.key());
assertArrayEquals(serializationSchema.serialize("b"), record2.value());
assertThat(record2.topic()).isEqualTo("topic-b");
assertThat(record2.key()).isNull();
assertThat(record2.value()).isEqualTo(serializationSchema.serialize("b"));
}

@Test
@@ -147,8 +142,8 @@ public void testSerializeRecordWithPartitioner() throws Exception {
final KafkaRecordSerializationSchema.KafkaSinkContext sinkContext = new TestSinkContext();
schema.open(null, sinkContext);
final ProducerRecord<byte[], byte[]> record = schema.serialize("a", sinkContext, null);
assertEquals(partition, record.partition());
assertTrue(opened.get());
assertThat(record.partition()).isEqualTo(partition);
assertThat(opened.get()).isTrue();
}

@Test
@@ -161,8 +156,9 @@ public void testSerializeRecordWithKey() {
.setKeySerializationSchema(serializationSchema)
.build();
final ProducerRecord<byte[], byte[]> record = schema.serialize("a", null, null);
assertArrayEquals(record.key(), serializationSchema.serialize("a"));
assertArrayEquals(record.value(), serializationSchema.serialize("a"));
assertThat(serializationSchema.serialize("a"))
.isEqualTo(record.key())
.isEqualTo(record.value());
}

@Test
@@ -177,9 +173,9 @@ public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception {
.setKafkaKeySerializer(SimpleStringSerializer.class, config)
.build();
open(schema);
assertEquals(configuration, config);
assertTrue(isKeySerializer);
assertTrue(configurableConfiguration.isEmpty());
assertThat(config).isEqualTo(configuration);
assertThat(isKeySerializer).isTrue();
assertThat(configurableConfiguration).isEmpty();
}

@Test
@@ -191,9 +187,9 @@ public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception {
.setKafkaValueSerializer(SimpleStringSerializer.class, config)
.build();
open(schema);
assertEquals(configuration, config);
assertFalse(isKeySerializer);
assertTrue(configurableConfiguration.isEmpty());
assertThat(config).isEqualTo(configuration);
assertThat(isKeySerializer).isFalse();
assertThat(configurableConfiguration).isEmpty();
}

@Test
@@ -205,11 +201,11 @@ public void testSerializeRecordWithKafkaSerializer() throws Exception {
.setKafkaValueSerializer(ConfigurableStringSerializer.class, config)
.build();
open(schema);
assertEquals(configurableConfiguration, config);
assertTrue(configuration.isEmpty());
assertThat(config).isEqualTo(configurableConfiguration);
assertThat(configuration).isEmpty();
final Deserializer<String> deserializer = new StringDeserializer();
final ProducerRecord<byte[], byte[]> record = schema.serialize("a", null, null);
assertEquals("a", deserializer.deserialize(DEFAULT_TOPIC, record.value()));
assertThat(deserializer.deserialize(DEFAULT_TOPIC, record.value())).isEqualTo("a");
}

@Test
@@ -223,19 +219,19 @@ public void testSerializeRecordWithTimestamp() {
.build();
final ProducerRecord<byte[], byte[]> recordWithTimestamp =
schema.serialize("a", null, 100L);
assertEquals(100L, (long) recordWithTimestamp.timestamp());
assertThat((long) recordWithTimestamp.timestamp()).isEqualTo(100L);

final ProducerRecord<byte[], byte[]> recordWithTimestampZero =
schema.serialize("a", null, 0L);
assertEquals(0L, (long) recordWithTimestampZero.timestamp());
assertThat((long) recordWithTimestampZero.timestamp()).isEqualTo(0L);

final ProducerRecord<byte[], byte[]> recordWithoutTimestamp =
schema.serialize("a", null, null);
assertNull(recordWithoutTimestamp.timestamp());
assertThat(recordWithoutTimestamp.timestamp()).isNull();

final ProducerRecord<byte[], byte[]> recordWithInvalidTimestamp =
schema.serialize("a", null, -100L);
assertNull(recordWithInvalidTimestamp.timestamp());
assertThat(recordWithInvalidTimestamp.timestamp()).isNull();
}

private static void assertOnlyOneSerializerAllowed(
@@ -255,7 +251,8 @@ private static void assertOnlyOneSerializerAllowed(
KafkaRecordSerializationSchemaBuilder<String>,
KafkaRecordSerializationSchemaBuilder<String>>
updater : serializers) {
assertThrows(IllegalStateException.class, () -> updater.apply(builder));
assertThatThrownBy(() -> updater.apply(builder))
.isInstanceOf(IllegalStateException.class);
}
}
}
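
Two AssertJ behaviors carry the conversions in this file: array assertions compare contents (so assertArrayEquals needs no dedicated replacement), and a single assertThat chains several expectations against one actual value, as in testSerializeRecordWithKey above. A standalone sketch with hypothetical values:

    import static org.assertj.core.api.Assertions.assertThat;

    class ArrayAssertSketch {
        public static void main(String[] args) {
            byte[] expected = {1, 2, 3};
            byte[] actual = {1, 2, 3};
            // isEqualTo on an array assertion checks contents, not reference identity.
            assertThat(actual).isEqualTo(expected);
            // Chaining: one actual value, several expectations in a row.
            assertThat(actual).isEqualTo(expected).hasSize(3).startsWith((byte) 1);
        }
    }
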
KafkaSinkBuilderTest.java
@@ -27,8 +27,7 @@
import java.util.Properties;
import java.util.function.Consumer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link KafkaSinkBuilder}. */
public class KafkaSinkBuilderTest extends TestLogger {
@@ -46,13 +45,13 @@ public void testPropertyHandling() {
validateProducerConfig(
getBasicBuilder(),
p -> {
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
});

validateProducerConfig(
getBasicBuilder().setProperty("k1", "v1"),
p -> {
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
p.containsKey("k1");
});

@@ -63,8 +62,8 @@ public void testPropertyHandling() {
validateProducerConfig(
getBasicBuilder().setKafkaProducerConfig(testConf),
p -> {
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
testConf.forEach((k, v) -> assertEquals(v, p.get(k)));
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
testConf.forEach((k, v) -> assertThat(p.get(k)).isEqualTo(v));
});

validateProducerConfig(
@@ -73,9 +72,8 @@
.setKafkaProducerConfig(testConf)
.setProperty("k2", "correct"),
p -> {
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
assertEquals("v1", p.get("k1"));
assertEquals("correct", p.get("k2"));
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
assertThat(p).containsEntry("k1", "v1").containsEntry("k2", "correct");
});
}

@@ -86,9 +84,7 @@ public void testBootstrapServerSetting() {

validateProducerConfig(
getNoServerBuilder().setKafkaProducerConfig(testConf1),
p -> {
Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
});
p -> assertThat(p).containsKeys(DEFAULT_KEYS));
}

private void validateProducerConfig(
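
The Properties assertions above work because Properties implements Map, so assertThat(p) yields AssertJ's map assertions; containsKey names the missing key in its failure message, which is why the manual message argument of assertTrue(k, ...) can be dropped. A minimal sketch with hypothetical keys:

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.Properties;

    class PropertiesAssertSketch {
        public static void main(String[] args) {
            Properties p = new Properties();
            p.setProperty("bootstrap.servers", "localhost:9092");
            p.setProperty("transaction.timeout.ms", "600000");
            // Map assertions apply because Properties implements Map<Object, Object>.
            assertThat(p).containsKeys("bootstrap.servers", "transaction.timeout.ms");
            assertThat(p)
                    .containsEntry("bootstrap.servers", "localhost:9092")
                    .containsEntry("transaction.timeout.ms", "600000");
        }
    }
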
KafkaSinkITCase.java
@@ -109,13 +109,8 @@

import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
import static org.apache.flink.util.DockerImageVersions.KAFKA;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/** Tests for using KafkaSink writing to a Kafka cluster. */
public class KafkaSinkITCase extends TestLogger {
@@ -223,8 +218,7 @@ public void testRecoveryWithAtLeastOnceGuarantee() throws Exception {
testRecoveryWithAssertion(
DeliveryGuarantee.AT_LEAST_ONCE,
1,
(records) ->
assertThat(records, hasItems(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)));
(records) -> assertThat(records).contains(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L));
}

@Test
@@ -233,12 +227,11 @@ public void testRecoveryWithExactlyOnceGuarantee() throws Exception {
DeliveryGuarantee.EXACTLY_ONCE,
1,
(records) ->
assertThat(
records,
contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
assertThat(records)
.contains(
(LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray())));
.toArray(Long[]::new))));
}

@Test
@@ -247,12 +240,11 @@ public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints() throws Exception {
DeliveryGuarantee.EXACTLY_ONCE,
2,
(records) ->
assertThat(
records,
contains(
assertThat(records)
.contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray())));
.toArray(Long[]::new)));
}

@Test
@@ -271,9 +263,8 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Exception {
try {
executeWithMapper(new FailAsyncCheckpointMapper(1), config, "firstPrefix");
} catch (Exception e) {
assertThat(
e.getCause().getCause().getMessage(),
containsString("Exceeded checkpoint tolerable failure"));
assertThat(e.getCause().getCause().getMessage())
.contains("Exceeded checkpoint tolerable failure");
}
final File completedCheckpoint = TestUtils.getMostRecentCompletedCheckpoint(checkpointDir);

@@ -286,12 +277,11 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Exception {
new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix");
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, true);
assertThat(
deserializeValues(collectedRecords),
contains(
assertThat(deserializeValues(collectedRecords))
.contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray()));
.toArray(Long[]::new));
}

@Test
@@ -302,11 +292,10 @@ public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exception {
try {
executeWithMapper(new FailAsyncCheckpointMapper(0), config, null);
} catch (Exception e) {
assertThat(
e.getCause().getCause().getMessage(),
containsString("Exceeded checkpoint tolerable failure"));
assertThat(e.getCause().getCause().getMessage())
.contains("Exceeded checkpoint tolerable failure");
}
assertTrue(deserializeValues(drainAllRecordsFromTopic(topic, true)).isEmpty());
assertThat(deserializeValues(drainAllRecordsFromTopic(topic, true))).isEmpty();

// Second job aborts all transactions from previous runs with higher parallelism
config.set(CoreOptions.DEFAULT_PARALLELISM, 1);
@@ -315,12 +304,11 @@ public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exception {
new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, null);
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, true);
assertThat(
deserializeValues(collectedRecords),
contains(
assertThat(deserializeValues(collectedRecords))
.contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray()));
.toArray(Long[]::new));
}

private void executeWithMapper(
Expand Down Expand Up @@ -408,10 +396,9 @@ private void writeRecordsToKafka(
drainAllRecordsFromTopic(
topic, deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
final long recordsCount = expectedRecords.get().get();
assertEquals(collectedRecords.size(), recordsCount);
assertThat(
deserializeValues(collectedRecords),
contains(LongStream.range(1, recordsCount + 1).boxed().toArray()));
assertThat(recordsCount).isEqualTo(collectedRecords.size());
assertThat(deserializeValues(collectedRecords))
.contains(LongStream.range(1, recordsCount + 1).boxed().toArray(Long[]::new));
checkProducerLeak();
}

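
One semantic note on the ITCase conversions above: Hamcrest's contains(...) asserts exact elements in exact order, whereas AssertJ's contains(...) asserts membership only (any order, extra elements allowed); the strict AssertJ equivalent is containsExactly(...). The added .toArray(Long[]::new) calls are needed because the varargs overload expects a Long[], while Stream.toArray() returns Object[]. A small sketch with hypothetical values:

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.List;
    import java.util.stream.LongStream;

    class ContainsSemanticsSketch {
        public static void main(String[] args) {
            List<Long> records = List.of(1L, 2L, 3L);
            // Membership only: order-insensitive, extra elements tolerated.
            assertThat(records).contains(3L, 1L);
            // Strict equivalent of Hamcrest's contains(): exact elements, exact order.
            assertThat(records).containsExactly(1L, 2L, 3L);
            // Boxing a LongStream into the Long[] expected by the varargs overload.
            assertThat(records)
                    .containsExactly(LongStream.rangeClosed(1, 3).boxed().toArray(Long[]::new));
        }
    }
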
[Diff truncated: the remaining changed files are not shown.]
