Skip to content

Commit

Permalink
[FLINK-26249][test-utils] Refactor BuiltIn(Aggregate)FunctionTestBase…
Browse files Browse the repository at this point in the history
… to JUnit 5 and run tests in parallel

This closes apache#18940.
  • Loading branch information
slinkydeveloper authored and twalthr committed Mar 21, 2022
1 parent a6b586a commit d07fd36
Show file tree
Hide file tree
Showing 16 changed files with 971 additions and 1,021 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
Expand All @@ -35,58 +34,49 @@
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.planner.factories.TableFactoryHarness;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.table.types.DataType.getFieldCount;
import static org.apache.flink.table.test.TableAssertions.assertThat;
import static org.apache.flink.table.types.DataType.getFieldDataTypes;
import static org.assertj.core.api.Assertions.assertThat;

/** Test base for testing aggregate {@link BuiltInFunctionDefinition built-in functions}. */
@RunWith(Parameterized.class)
public class BuiltInAggregateFunctionTestBase {

@ClassRule
public static MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build());

@Parameter public TestSpec testSpec;

@Test
public void testFunction() throws Exception {
final TableEnvironment tEnv =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Configuration configuration = tEnv.getConfig().getConfiguration();
// see https://issues.apache.org/jira/browse/FLINK-26092
configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
final Table sourceTable = asTable(tEnv, testSpec.sourceRowType, testSpec.sourceRows);

for (final TestItem testItem : testSpec.testItems) {
testItem.execute(tEnv, sourceTable);
}
@Execution(ExecutionMode.CONCURRENT)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(MiniClusterExtension.class)
abstract class BuiltInAggregateFunctionTestBase {

abstract Stream<TestSpec> getTestCaseSpecs();

final Stream<BuiltInFunctionTestBase.TestCase> getTestCases() {
return this.getTestCaseSpecs().flatMap(TestSpec::getTestCases);
}

@ParameterizedTest
@MethodSource("getTestCases")
final void test(BuiltInFunctionTestBase.TestCase testCase) throws Throwable {
testCase.execute();
}

protected static Table asTable(TableEnvironment tEnv, DataType sourceRowType, List<Row> rows) {
Expand Down Expand Up @@ -123,23 +113,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
};
}

protected static void assertRows(List<Row> expectedRows, TableResult tableResult) {
final List<Row> actualRows =
materializeResult(tableResult).stream()
.sorted(Comparator.comparing(Objects::toString))
.collect(Collectors.toList());
final List<Row> sortedExpectedRows =
expectedRows.stream()
.sorted(Comparator.comparing(Objects::toString))
.collect(Collectors.toList());

assertThat(actualRows)
.as(
String.format(
"%n%nExpected:%n%s%n%nActual:%n%s", sortedExpectedRows, actualRows))
.isEqualTo(sortedExpectedRows);
}

private static List<Row> materializeResult(TableResult tableResult) {
try (final CloseableIterator<Row> iterator = tableResult.collect()) {
final List<Row> actualRows = new ArrayList<>();
Expand Down Expand Up @@ -232,6 +205,29 @@ TestSpec testResult(
return this;
}

private Executable createTestItemExecutable(TestItem testItem) {
return () -> {
final TableEnvironment tEnv =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Configuration configuration = tEnv.getConfig().getConfiguration();
// see
// https://issues.apache.org/jira/browse/FLINK-26092
configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
final Table sourceTable = asTable(tEnv, sourceRowType, sourceRows);

testItem.execute(tEnv, sourceTable);
};
}

Stream<BuiltInFunctionTestBase.TestCase> getTestCases() {
return testItems.stream()
.map(
testItem ->
new BuiltInFunctionTestBase.TestCase(
testItem.toString(),
createTestItemExecutable(testItem)));
}

@Override
public String toString() {
final StringBuilder bob = new StringBuilder();
Expand Down Expand Up @@ -267,13 +263,15 @@ public void execute(TableEnvironment tEnv, Table sourceTable) {
final DataType actualRowType =
tableResult.getResolvedSchema().toSourceRowDataType();

assertThat(getFieldCount(actualRowType)).isEqualTo(getFieldCount(expectedRowType));
assertThat(getFieldDataTypes(actualRowType))
.isEqualTo(getFieldDataTypes(expectedRowType));
assertThat(actualRowType)
.getChildren()
.containsExactlyElementsOf(getFieldDataTypes(expectedRowType));
}

if (expectedRows != null) {
assertRows(expectedRows, tableResult);
final List<Row> actualRows = materializeResult(tableResult);

assertThat(actualRows).containsExactlyInAnyOrderElementsOf(expectedRows);
}
}

Expand Down
Loading

0 comments on commit d07fd36

Please sign in to comment.