diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java index e02e838aeb31b..ed6ea6440ee21 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java @@ -18,64 +18,37 @@ package org.apache.flink.streaming.api.connector.sink2; -import org.assertj.core.api.AbstractAssert; - -import static org.assertj.core.api.Assertions.assertThat; +import org.assertj.core.api.AbstractObjectAssert; /** Custom assertions for {@link CommittableSummary}. */ public class CommittableSummaryAssert - extends AbstractAssert> { + extends AbstractObjectAssert> { public CommittableSummaryAssert(CommittableSummary summary) { super(summary, CommittableSummaryAssert.class); } - public CommittableSummaryAssert isEqualTo(CommittableSummary summary) { - isNotNull(); - assertThat(actual.getSubtaskId()).isEqualTo(summary.getSubtaskId()); - assertThat(actual.getCheckpointId()).isEqualTo(summary.getCheckpointId()); - assertThat(actual.getNumberOfSubtasks()).isEqualTo(summary.getNumberOfSubtasks()); - assertThat(actual.getNumberOfCommittables()).isEqualTo(summary.getNumberOfCommittables()); - assertThat(actual.getNumberOfPendingCommittables()) - .isEqualTo(summary.getNumberOfPendingCommittables()); - assertThat(actual.getNumberOfFailedCommittables()) - .isEqualTo(summary.getNumberOfFailedCommittables()); - return this; - } - public CommittableSummaryAssert hasSubtaskId(int subtaskId) { - isNotNull(); - assertThat(actual.getSubtaskId()).isEqualTo(subtaskId); - return this; + return returns(subtaskId, CommittableSummary::getSubtaskId); } public CommittableSummaryAssert hasNumberOfSubtasks(int numberOfSubtasks) { - isNotNull(); - assertThat(actual.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks); - return this; + return returns(numberOfSubtasks, CommittableSummary::getNumberOfSubtasks); } public CommittableSummaryAssert hasOverallCommittables(int committableNumber) { - isNotNull(); - assertThat(actual.getNumberOfCommittables()).isEqualTo(committableNumber); - return this; + return returns(committableNumber, CommittableSummary::getNumberOfCommittables); } public CommittableSummaryAssert hasPendingCommittables(int committableNumber) { - isNotNull(); - assertThat(actual.getNumberOfPendingCommittables()).isEqualTo(committableNumber); - return this; + return returns(committableNumber, CommittableSummary::getNumberOfPendingCommittables); } public CommittableSummaryAssert hasFailedCommittables(int committableNumber) { - isNotNull(); - assertThat(actual.getNumberOfFailedCommittables()).isEqualTo(committableNumber); - return this; + return returns(committableNumber, CommittableSummary::getNumberOfFailedCommittables); } public CommittableSummaryAssert hasCheckpointId(long checkpointId) { - isNotNull(); - assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId); - return this; + return returns(checkpointId, CommittableSummary::getCheckpointIdOrEOI); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java index 0937c7454d937..853fe6235d7ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java @@ -18,45 +18,28 @@ package org.apache.flink.streaming.api.connector.sink2; -import org.assertj.core.api.AbstractAssert; - -import static org.assertj.core.api.Assertions.assertThat; +import org.assertj.core.api.AbstractObjectAssert; /** * Custom assertions for {@link * org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage}. */ public class CommittableWithLinageAssert - extends AbstractAssert> { + extends AbstractObjectAssert> { public CommittableWithLinageAssert(CommittableWithLineage summary) { super(summary, CommittableWithLinageAssert.class); } - public CommittableWithLinageAssert isEqualTo(CommittableWithLineage committableWithLineage) { - isNotNull(); - assertThat(actual.getSubtaskId()).isEqualTo(committableWithLineage.getSubtaskId()); - assertThat(actual.getCheckpointIdOrEOI()) - .isEqualTo(committableWithLineage.getCheckpointIdOrEOI()); - assertThat(actual.getCommittable()).isEqualTo(committableWithLineage.getCommittable()); - return this; - } - public CommittableWithLinageAssert hasCommittable(Object committable) { - isNotNull(); - assertThat(actual.getCommittable()).isEqualTo(committable); - return this; + return returns(committable, CommittableWithLineage::getCommittable); } public CommittableWithLinageAssert hasCheckpointId(long checkpointId) { - isNotNull(); - assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId); - return this; + return returns(checkpointId, CommittableWithLineage::getCheckpointIdOrEOI); } public CommittableWithLinageAssert hasSubtaskId(int subtaskId) { - isNotNull(); - assertThat(actual.getSubtaskId()).isEqualTo(subtaskId); - return this; + return returns(subtaskId, CommittableWithLineage::getSubtaskId); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java index dd4fdf91d6e0c..7c252e9dee308 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.stream.IntStream; +import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat; class SubtaskCommittableManagerTest { @@ -40,10 +41,9 @@ class SubtaskCommittableManagerTest { void testDrainCommittables() { final SubtaskCommittableManager subtaskCommittableManager = new SubtaskCommittableManager<>(3, 1, 1L, METRIC_GROUP); - final CommittableWithLineage first = new CommittableWithLineage(1, 1L, 1); - final CommittableWithLineage second = - new CommittableWithLineage(2, 1L, 1); - final CommittableWithLineage third = new CommittableWithLineage(3, 1L, 1); + final CommittableWithLineage first = new CommittableWithLineage<>(1, 1L, 1); + final CommittableWithLineage second = new CommittableWithLineage<>(2, 1L, 1); + final CommittableWithLineage third = new CommittableWithLineage<>(3, 1L, 1); assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(0); @@ -68,20 +68,8 @@ void testDrainCommittables() { final List> committables = subtaskCommittableManager.drainCommitted(); assertThat(committables).hasSize(2); - assertThat(committables.get(0)) - .satisfies( - c -> { - assertThat(c.getSubtaskId()).isEqualTo(1); - assertThat(c.getCommittable()).isEqualTo(1); - assertThat(c.getCheckpointId()).hasValue(1L); - }); - assertThat(committables.get(1)) - .satisfies( - c -> { - assertThat(c.getSubtaskId()).isEqualTo(1); - assertThat(c.getCommittable()).isEqualTo(2); - assertThat(c.getCheckpointId()).hasValue(1L); - }); + assertThat(committables.get(0)).hasSubtaskId(1).hasCommittable(1).hasCheckpointId(1); + assertThat(committables.get(1)).hasSubtaskId(1).hasCommittable(2).hasCheckpointId(1); assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(0); // Drain again should not yield anything