Skip to content

Commit

Permalink
[FLINK-33024][table-planner][JUnit5 Migration] Module: flink-table-pl…
Browse files Browse the repository at this point in the history
…anner (JsonPlanTestBase) (apache#23353)
  • Loading branch information
Jiabao-Sun authored Sep 20, 2023
1 parent 50939cd commit 5e3abe2
Show file tree
Hide file tree
Showing 31 changed files with 258 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.testutils.junit.utils.TempDirUtils;

import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
Expand All @@ -47,15 +48,16 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link CompiledPlan} and related {@link TableEnvironment} methods. */
public class CompiledPlanITCase extends JsonPlanTestBase {
class CompiledPlanITCase extends JsonPlanTestBase {

private static final List<String> DATA =
Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
private static final String[] COLUMNS_DEFINITION =
new String[] {"a bigint", "b int", "c varchar"};

@Before
public void setup() throws Exception {
@BeforeEach
@Override
protected void setup() throws Exception {
super.setup();

String srcTableDdl =
Expand All @@ -76,7 +78,7 @@ public void setup() throws Exception {
}

@Test
public void testCompilePlanSql() throws IOException {
void testCompilePlanSql() throws IOException {
CompiledPlan compiledPlan =
tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable");
String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out");
Expand All @@ -92,7 +94,7 @@ public void testCompilePlanSql() throws IOException {
}

@Test
public void testExecutePlanSql() throws Exception {
void testExecutePlanSql() throws Exception {
File sinkPath = createSourceSinkTables();

tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM src").execute().await();
Expand All @@ -101,10 +103,10 @@ public void testExecutePlanSql() throws Exception {
}

@Test
public void testExecuteCtasPlanSql() throws Exception {
void testExecuteCtasPlanSql() throws Exception {
createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);

File sinkPath = TEMPORARY_FOLDER.newFolder();
File sinkPath = TempDirUtils.newFolder(tempFolder);
assertThatThrownBy(
() ->
tableEnv.compilePlanSql(
Expand All @@ -125,7 +127,7 @@ public void testExecuteCtasPlanSql() throws Exception {
}

@Test
public void testExecutePlanTable() throws Exception {
void testExecutePlanTable() throws Exception {
File sinkPath = createSourceSinkTables();

tableEnv.from("src").select($("*")).insertInto("sink").compilePlan().execute().await();
Expand All @@ -134,8 +136,9 @@ public void testExecutePlanTable() throws Exception {
}

@Test
public void testCompileWriteToFileAndThenExecuteSql() throws Exception {
Path planPath = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
void testCompileWriteToFileAndThenExecuteSql() throws Exception {
Path planPath =
Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json");

File sinkPath = createSourceSinkTables();

Expand All @@ -148,8 +151,9 @@ public void testCompileWriteToFileAndThenExecuteSql() throws Exception {
}

@Test
public void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws Exception {
Path planPath = Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json");
void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws Exception {
Path planPath =
Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json");

File sinkPath = createSourceSinkTables();

Expand All @@ -165,9 +169,9 @@ public void testCompileWriteToFilePathWithSchemeAndThenExecuteSql() throws Excep
}

@Test
public void testCompilePlan() throws Exception {
void testCompilePlan() throws Exception {
Path planPath =
Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();

File sinkPath = createSourceSinkTables();
Expand Down Expand Up @@ -195,9 +199,9 @@ public void testCompilePlan() throws Exception {
}

@Test
public void testCompilePlanWithStatementSet() throws Exception {
void testCompilePlanWithStatementSet() throws Exception {
Path planPath =
Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();

createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
Expand Down Expand Up @@ -226,9 +230,9 @@ public void testCompilePlanWithStatementSet() throws Exception {
}

@Test
public void testCompilePlanIfNotExists() throws Exception {
void testCompilePlanIfNotExists() throws Exception {
Path planPath =
Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();

File sinkPath = createSourceSinkTables();
Expand Down Expand Up @@ -256,11 +260,14 @@ public void testCompilePlanIfNotExists() throws Exception {
}

@Test
public void testCompilePlanOverwrite() throws Exception {
void testCompilePlanOverwrite() throws Exception {
tableEnv.getConfig().set(TableConfigOptions.PLAN_FORCE_RECOMPILE, true);

Path planPath =
Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
Paths.get(
URI.create(TempDirUtils.newFolder(tempFolder, "plan").getPath())
.getPath(),
"plan.json")
.toAbsolutePath();

List<String> expectedData =
Expand Down Expand Up @@ -297,9 +304,9 @@ public void testCompilePlanOverwrite() throws Exception {
}

@Test
public void testCompileAndExecutePlan() throws Exception {
void testCompileAndExecutePlan() throws Exception {
Path planPath =
Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();

File sinkPath = createSourceSinkTables();
Expand All @@ -316,9 +323,9 @@ public void testCompileAndExecutePlan() throws Exception {
}

@Test
public void testCompileAndExecutePlanWithStatementSet() throws Exception {
void testCompileAndExecutePlanWithStatementSet() throws Exception {
Path planPath =
Paths.get(URI.create(getTempDirPath("plan")).getPath(), "plan.json")
Paths.get(TempDirUtils.newFolder(tempFolder, "plan").getPath(), "plan.json")
.toAbsolutePath();

createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
Expand All @@ -344,7 +351,7 @@ public void testCompileAndExecutePlanWithStatementSet() throws Exception {
}

@Test
public void testExplainPlan() throws IOException {
void testExplainPlan() throws IOException {
String planFromResources =
JsonTestUtils.setFlinkVersion(
JsonTestUtils.readFromResource("/jsonplan/testGetJsonPlan.out"),
Expand All @@ -360,7 +367,7 @@ public void testExplainPlan() throws IOException {
}

@Test
public void testPersistedConfigOption() throws Exception {
void testPersistedConfigOption() throws Exception {
List<String> data =
Stream.concat(
DATA.stream(),
Expand Down Expand Up @@ -397,7 +404,7 @@ public void testPersistedConfigOption() throws Exception {
}

@Test
public void testBatchMode() {
void testBatchMode() {
tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

String srcTableDdl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Test for calc json plan. */
public class CalcJsonPlanITCase extends JsonPlanTestBase {
class CalcJsonPlanITCase extends JsonPlanTestBase {

@Test
public void testSimpleCalc() throws Exception {
void testSimpleCalc() throws Exception {
List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
File sinkPath =
Expand All @@ -55,7 +55,7 @@ public void testSimpleCalc() throws Exception {
}

@Test
public void testCalcWithUdf() throws Exception {
void testCalcWithUdf() throws Exception {
tableEnv.createTemporaryFunction("udf1", new JavaFunc0());
tableEnv.createTemporarySystemFunction("udf2", new JavaFunc2());
tableEnv.createFunction("udf3", UdfWithOpen.class);
Expand Down Expand Up @@ -86,7 +86,7 @@ public void testCalcWithUdf() throws Exception {
}

@Test
public void testProjectPushDown() throws Exception {
void testProjectPushDown() throws Exception {
List<String> data = Arrays.asList("1,1,hi", "2,1,hello", "3,2,hello world");
createTestCsvSourceTable("MyTable", data, "a bigint", "b int not null", "c varchar");
File sinkPath = createTestCsvSinkTable("MySink", "b int", "a bigint", "a1 varchar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Integration tests for operations on changelog source, including upsert source. */
public class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {
class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {

@Test
public void testChangelogSource() throws Exception {
void testChangelogSource() throws Exception {
registerChangelogSource();
createTestNonInsertOnlyValuesSinkTable(
"user_sink",
Expand All @@ -56,7 +56,7 @@ public void testChangelogSource() throws Exception {
}

@Test
public void testToUpsertSource() throws Exception {
void testToUpsertSource() throws Exception {
registerUpsertSource();
createTestNonInsertOnlyValuesSinkTable(
"user_sink",
Expand All @@ -79,7 +79,7 @@ public void testToUpsertSource() throws Exception {

// ------------------------------------------------------------------------------------------

public void registerChangelogSource() {
protected void registerChangelogSource() {
Map<String, String> properties = new HashMap<>();
properties.put("changelog-mode", "I,UA,UB,D");
createTestValuesSourceTable(
Expand All @@ -95,7 +95,7 @@ public void registerChangelogSource() {
properties);
}

public void registerUpsertSource() {
protected void registerUpsertSource() {
Map<String, String> properties = new HashMap<>();
properties.put("changelog-mode", "I,UA,D");
createTestValuesSourceTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -39,10 +39,10 @@
* Tests for configuring operator-level state TTL via {@link
* org.apache.flink.table.api.CompiledPlan}.
*/
public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {

@Test
public void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
String dataId =
TestValuesTableFactory.registerRowData(
Arrays.asList(
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testDifferentStateTtlForDifferentOneInputOperator() throws Exception
}

@Test
public void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception {
void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception {
String leftTableDataId =
TestValuesTableFactory.registerRowData(
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,25 @@
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;

import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

/** Integration tests for correlate. */
public class CorrelateJsonPlanITCase extends JsonPlanTestBase {
class CorrelateJsonPlanITCase extends JsonPlanTestBase {

@Before
public void before() {
@BeforeEach
void before() {
List<Row> data = Collections.singletonList(Row.of("1,1,hi"));
createTestValuesSourceTable("MyTable", data, "a varchar");
}

@Test
public void testSystemFuncByObject() throws ExecutionException, InterruptedException {
void testSystemFuncByObject() throws ExecutionException, InterruptedException {
tableEnv.createTemporarySystemFunction(
"STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
Expand All @@ -53,7 +53,7 @@ public void testSystemFuncByObject() throws ExecutionException, InterruptedExcep
}

@Test
public void testSystemFuncByClass() throws ExecutionException, InterruptedException {
void testSystemFuncByClass() throws ExecutionException, InterruptedException {
tableEnv.createTemporarySystemFunction(
"STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class);
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
Expand All @@ -65,7 +65,7 @@ public void testSystemFuncByClass() throws ExecutionException, InterruptedExcept
}

@Test
public void testTemporaryFuncByObject() throws ExecutionException, InterruptedException {
void testTemporaryFuncByObject() throws ExecutionException, InterruptedException {
tableEnv.createTemporaryFunction(
"STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
Expand All @@ -77,7 +77,7 @@ public void testTemporaryFuncByObject() throws ExecutionException, InterruptedEx
}

@Test
public void testTemporaryFuncByClass() throws ExecutionException, InterruptedException {
void testTemporaryFuncByClass() throws ExecutionException, InterruptedException {
tableEnv.createTemporaryFunction(
"STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class);
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
Expand All @@ -89,7 +89,7 @@ public void testTemporaryFuncByClass() throws ExecutionException, InterruptedExc
}

@Test
public void testFilter() throws ExecutionException, InterruptedException {
void testFilter() throws ExecutionException, InterruptedException {
tableEnv.createTemporarySystemFunction(
"STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit());
createTestValuesSinkTable("MySink", "a STRING", "b STRING");
Expand All @@ -103,7 +103,7 @@ public void testFilter() throws ExecutionException, InterruptedException {
}

@Test
public void testUnnest() throws ExecutionException, InterruptedException {
void testUnnest() throws ExecutionException, InterruptedException {
List<Row> data =
Collections.singletonList(
Row.of("Bob", new Row[] {Row.of("1"), Row.of("2"), Row.of("3")}));
Expand Down
Loading

0 comments on commit 5e3abe2

Please sign in to comment.