Skip to content

Commit

Permalink
[FLINK-25920] Improve sink test assertions
Browse files Browse the repository at this point in the history
Use the proper ObjectAssert as the base for CommittableSummaryAssert and CommittableWithLinageAssert.
  • Loading branch information
AHeise committed Sep 17, 2024
1 parent 8ce679a commit ad01d71
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,64 +18,37 @@

package org.apache.flink.streaming.api.connector.sink2;

import org.assertj.core.api.AbstractAssert;

import static org.assertj.core.api.Assertions.assertThat;
import org.assertj.core.api.AbstractObjectAssert;

/** Custom assertions for {@link CommittableSummary}. */
public class CommittableSummaryAssert
extends AbstractAssert<CommittableSummaryAssert, CommittableSummary<?>> {
extends AbstractObjectAssert<CommittableSummaryAssert, CommittableSummary<?>> {

public CommittableSummaryAssert(CommittableSummary<?> summary) {
super(summary, CommittableSummaryAssert.class);
}

public CommittableSummaryAssert isEqualTo(CommittableSummary<?> summary) {
isNotNull();
assertThat(actual.getSubtaskId()).isEqualTo(summary.getSubtaskId());
assertThat(actual.getCheckpointId()).isEqualTo(summary.getCheckpointId());
assertThat(actual.getNumberOfSubtasks()).isEqualTo(summary.getNumberOfSubtasks());
assertThat(actual.getNumberOfCommittables()).isEqualTo(summary.getNumberOfCommittables());
assertThat(actual.getNumberOfPendingCommittables())
.isEqualTo(summary.getNumberOfPendingCommittables());
assertThat(actual.getNumberOfFailedCommittables())
.isEqualTo(summary.getNumberOfFailedCommittables());
return this;
}

public CommittableSummaryAssert hasSubtaskId(int subtaskId) {
isNotNull();
assertThat(actual.getSubtaskId()).isEqualTo(subtaskId);
return this;
return returns(subtaskId, CommittableSummary::getSubtaskId);
}

public CommittableSummaryAssert hasNumberOfSubtasks(int numberOfSubtasks) {
isNotNull();
assertThat(actual.getNumberOfSubtasks()).isEqualTo(numberOfSubtasks);
return this;
return returns(numberOfSubtasks, CommittableSummary::getNumberOfSubtasks);
}

public CommittableSummaryAssert hasOverallCommittables(int committableNumber) {
isNotNull();
assertThat(actual.getNumberOfCommittables()).isEqualTo(committableNumber);
return this;
return returns(committableNumber, CommittableSummary::getNumberOfCommittables);
}

public CommittableSummaryAssert hasPendingCommittables(int committableNumber) {
isNotNull();
assertThat(actual.getNumberOfPendingCommittables()).isEqualTo(committableNumber);
return this;
return returns(committableNumber, CommittableSummary::getNumberOfPendingCommittables);
}

public CommittableSummaryAssert hasFailedCommittables(int committableNumber) {
isNotNull();
assertThat(actual.getNumberOfFailedCommittables()).isEqualTo(committableNumber);
return this;
return returns(committableNumber, CommittableSummary::getNumberOfFailedCommittables);
}

public CommittableSummaryAssert hasCheckpointId(long checkpointId) {
isNotNull();
assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId);
return this;
return returns(checkpointId, CommittableSummary::getCheckpointIdOrEOI);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,28 @@

package org.apache.flink.streaming.api.connector.sink2;

import org.assertj.core.api.AbstractAssert;

import static org.assertj.core.api.Assertions.assertThat;
import org.assertj.core.api.AbstractObjectAssert;

/**
* Custom assertions for {@link
* org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage}.
*/
public class CommittableWithLinageAssert
extends AbstractAssert<CommittableWithLinageAssert, CommittableWithLineage<?>> {
extends AbstractObjectAssert<CommittableWithLinageAssert, CommittableWithLineage<?>> {

public CommittableWithLinageAssert(CommittableWithLineage<?> summary) {
super(summary, CommittableWithLinageAssert.class);
}

public CommittableWithLinageAssert isEqualTo(CommittableWithLineage<?> committableWithLineage) {
isNotNull();
assertThat(actual.getSubtaskId()).isEqualTo(committableWithLineage.getSubtaskId());
assertThat(actual.getCheckpointIdOrEOI())
.isEqualTo(committableWithLineage.getCheckpointIdOrEOI());
assertThat(actual.getCommittable()).isEqualTo(committableWithLineage.getCommittable());
return this;
}

public CommittableWithLinageAssert hasCommittable(Object committable) {
isNotNull();
assertThat(actual.getCommittable()).isEqualTo(committable);
return this;
return returns(committable, CommittableWithLineage::getCommittable);
}

public CommittableWithLinageAssert hasCheckpointId(long checkpointId) {
isNotNull();
assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId);
return this;
return returns(checkpointId, CommittableWithLineage::getCheckpointIdOrEOI);
}

public CommittableWithLinageAssert hasSubtaskId(int subtaskId) {
isNotNull();
assertThat(actual.getSubtaskId()).isEqualTo(subtaskId);
return this;
return returns(subtaskId, CommittableWithLineage::getSubtaskId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.stream.IntStream;

import static org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

class SubtaskCommittableManagerTest {
Expand All @@ -40,10 +41,9 @@ class SubtaskCommittableManagerTest {
void testDrainCommittables() {
final SubtaskCommittableManager<Integer> subtaskCommittableManager =
new SubtaskCommittableManager<>(3, 1, 1L, METRIC_GROUP);
final CommittableWithLineage<Integer> first = new CommittableWithLineage<Integer>(1, 1L, 1);
final CommittableWithLineage<Integer> second =
new CommittableWithLineage<Integer>(2, 1L, 1);
final CommittableWithLineage<Integer> third = new CommittableWithLineage<Integer>(3, 1L, 1);
final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, 1L, 1);
final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, 1L, 1);
final CommittableWithLineage<Integer> third = new CommittableWithLineage<>(3, 1L, 1);

assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(0);

Expand All @@ -68,20 +68,8 @@ void testDrainCommittables() {
final List<CommittableWithLineage<Integer>> committables =
subtaskCommittableManager.drainCommitted();
assertThat(committables).hasSize(2);
assertThat(committables.get(0))
.satisfies(
c -> {
assertThat(c.getSubtaskId()).isEqualTo(1);
assertThat(c.getCommittable()).isEqualTo(1);
assertThat(c.getCheckpointId()).hasValue(1L);
});
assertThat(committables.get(1))
.satisfies(
c -> {
assertThat(c.getSubtaskId()).isEqualTo(1);
assertThat(c.getCommittable()).isEqualTo(2);
assertThat(c.getCheckpointId()).hasValue(1L);
});
assertThat(committables.get(0)).hasSubtaskId(1).hasCommittable(1).hasCheckpointId(1);
assertThat(committables.get(1)).hasSubtaskId(1).hasCommittable(2).hasCheckpointId(1);
assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(0);

// Drain again should not yield anything
Expand Down

0 comments on commit ad01d71

Please sign in to comment.