Skip to content

Commit

Permalink
[hotfix] Adapt the SinkTestStep to give plain access to expected results
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Oct 28, 2023
1 parent a5b4e60 commit aa7519b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,69 @@

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Test step for creating a table sink. */
public final class SinkTestStep extends TableTestStep {

public final @Nullable Predicate<List<Row>> expectedBeforeRestore;
public final @Nullable Predicate<List<Row>> expectedAfterRestore;
public final @Nullable List<Row> expectedBeforeRestore;
public final @Nullable List<Row> expectedAfterRestore;
public final @Nullable List<String> expectedBeforeRestoreStrings;
public final @Nullable List<String> expectedAfterRestoreStrings;

SinkTestStep(
String name,
List<String> schemaComponents,
List<String> partitionKeys,
Map<String, String> options,
@Nullable Predicate<List<Row>> expectedBeforeRestore,
@Nullable Predicate<List<Row>> expectedAfterRestore) {
@Nullable List<Row> expectedBeforeRestore,
@Nullable List<Row> expectedAfterRestore,
@Nullable List<String> expectedBeforeRestoreStrings,
@Nullable List<String> expectedAfterRestoreStrings) {
super(name, schemaComponents, partitionKeys, options);
if (expectedBeforeRestore != null && expectedAfterRestoreStrings != null) {
throw new IllegalArgumentException(
"You can not mix Row/String representation in before/after restore data.");
}
if (expectedBeforeRestoreStrings != null && expectedAfterRestore != null) {
throw new IllegalArgumentException(
"You can not mix Row/String representation in before/after restore data.");
}
this.expectedBeforeRestore = expectedBeforeRestore;
this.expectedAfterRestore = expectedAfterRestore;
this.expectedBeforeRestoreStrings = expectedBeforeRestoreStrings;
this.expectedAfterRestoreStrings = expectedAfterRestoreStrings;
}

public List<String> getExpectedBeforeRestoreAsStrings() {
if (expectedBeforeRestoreStrings != null) {
return expectedBeforeRestoreStrings;
}

if (expectedBeforeRestore != null) {
return expectedBeforeRestore.stream().map(Row::toString).collect(Collectors.toList());
}

return null;
}

public List<String> getExpectedAfterRestoreAsStrings() {
if (expectedAfterRestoreStrings != null) {
return expectedAfterRestoreStrings;
}

if (expectedAfterRestore != null) {
return expectedAfterRestore.stream().map(Row::toString).collect(Collectors.toList());
}

return null;
}

@Override
public TestKind getKind() {
return expectedBeforeRestore == null
return expectedBeforeRestore == null && expectedBeforeRestoreStrings == null
? TestKind.SINK_WITHOUT_DATA
: expectedAfterRestore == null
: expectedAfterRestore == null && expectedAfterRestoreStrings == null
? TestKind.SINK_WITH_DATA
: TestKind.SINK_WITH_RESTORE_DATA;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -378,8 +375,11 @@ public Builder complete() {
/** Builder pattern for {@link SinkTestStep}. */
public static class SinkBuilder extends TableBuilder<SinkBuilder> {

private Predicate<List<Row>> expectedBeforeRestore;
private Predicate<List<Row>> expectedAfterRestore;
private List<Row> expectedBeforeRestore;
private List<Row> expectedAfterRestore;

private List<String> expectedBeforeRestoreStrings;
private List<String> expectedAfterRestoreStrings;

private SinkBuilder(String name, List<TestStep> targetSteps, Builder rootBuilder) {
super(name, targetSteps, rootBuilder);
Expand All @@ -394,46 +394,25 @@ public SinkBuilder withExpectedValues(String... expectedRows) {
}

public SinkBuilder withValuesBeforeRestore(Row... expectedRows) {
this.expectedBeforeRestore = equalIgnoringOrder(expectedRows);
this.expectedBeforeRestore = Arrays.asList(expectedRows);
return this;
}

public SinkBuilder withValuesBeforeRestore(String... expectedRows) {
this.expectedBeforeRestore = equalIgnoringOrder(expectedRows);
this.expectedBeforeRestoreStrings = Arrays.asList(expectedRows);
return this;
}

public SinkBuilder withValuesAfterRestore(Row... expectedRows) {
this.expectedAfterRestore = equalIgnoringOrder(expectedRows);
this.expectedAfterRestore = Arrays.asList(expectedRows);
return this;
}

public SinkBuilder withValuesAfterRestore(String... expectedRows) {
this.expectedAfterRestore = equalIgnoringOrder(expectedRows);
this.expectedAfterRestoreStrings = Arrays.asList(expectedRows);
return this;
}

private static Predicate<List<Row>> equalIgnoringOrder(Row... expectedRows) {
return (actualRows) -> {
if (actualRows.size() != expectedRows.length) {
return false;
}
return CollectionUtils.isEqualCollection(actualRows, Arrays.asList(expectedRows));
};
}

private static Predicate<List<Row>> equalIgnoringOrder(String... expectedRows) {
return (actualRows) -> {
if (actualRows.size() != expectedRows.length) {
return false;
}
final List<String> actualRowsString =
actualRows.stream().map(Row::toString).collect(Collectors.toList());
return CollectionUtils.isEqualCollection(
actualRowsString, Arrays.asList(expectedRows));
};
}

public Builder complete() {
targetSteps.add(
new SinkTestStep(
Expand All @@ -442,7 +421,9 @@ public Builder complete() {
partitionKeys,
options,
expectedBeforeRestore,
expectedAfterRestore));
expectedAfterRestore,
expectedBeforeRestoreStrings,
expectedAfterRestoreStrings));
return rootBuilder;
}
}
Expand Down

0 comments on commit aa7519b

Please sign in to comment.