forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-25809][table-api-java] Add table test program infrastructure
- Loading branch information
Showing
11 changed files
with
1,210 additions
and
0 deletions.
There are no files selected for viewing
43 changes: 43 additions & 0 deletions
43
...able-api-java/src/test/java/org/apache/flink/table/test/program/ConfigOptionTestStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.test.program; | ||
|
||
import org.apache.flink.configuration.ConfigOption; | ||
import org.apache.flink.table.api.TableEnvironment; | ||
|
||
/** Test step for setting a {@link ConfigOption}. */ | ||
public final class ConfigOptionTestStep<T> implements TestStep { | ||
|
||
public final ConfigOption<T> option; | ||
public final T value; | ||
|
||
ConfigOptionTestStep(ConfigOption<T> option, T value) { | ||
this.option = option; | ||
this.value = value; | ||
} | ||
|
||
@Override | ||
public TestKind getKind() { | ||
return TestKind.CONFIG; | ||
} | ||
|
||
public void apply(TableEnvironment env) { | ||
env.getConfig().set(option, value); | ||
} | ||
} |
75 changes: 75 additions & 0 deletions
75
...nk-table-api-java/src/test/java/org/apache/flink/table/test/program/FunctionTestStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.test.program; | ||
|
||
import org.apache.flink.table.api.TableEnvironment; | ||
import org.apache.flink.table.functions.UserDefinedFunction; | ||
|
||
/** Test step for registering a (temporary) (system or catalog) function. */ | ||
public final class FunctionTestStep implements TestStep { | ||
|
||
/** Whether function should be temporary or not. */ | ||
enum FunctionPersistence { | ||
TEMPORARY, | ||
PERSISTENT | ||
} | ||
|
||
/** Whether function should be persisted in a catalog or not. */ | ||
enum FunctionBehavior { | ||
SYSTEM, | ||
CATALOG | ||
} | ||
|
||
public final FunctionPersistence persistence; | ||
public final FunctionBehavior behavior; | ||
public final String name; | ||
public final Class<? extends UserDefinedFunction> function; | ||
|
||
FunctionTestStep( | ||
FunctionPersistence persistence, | ||
FunctionBehavior behavior, | ||
String name, | ||
Class<? extends UserDefinedFunction> function) { | ||
this.persistence = persistence; | ||
this.behavior = behavior; | ||
this.name = name; | ||
this.function = function; | ||
} | ||
|
||
@Override | ||
public TestKind getKind() { | ||
return TestKind.FUNCTION; | ||
} | ||
|
||
public void apply(TableEnvironment env) { | ||
if (behavior == FunctionBehavior.SYSTEM) { | ||
if (persistence == FunctionPersistence.TEMPORARY) { | ||
env.createTemporarySystemFunction(name, function); | ||
} else { | ||
throw new UnsupportedOperationException("System functions must be temporary."); | ||
} | ||
} else { | ||
if (persistence == FunctionPersistence.TEMPORARY) { | ||
env.createTemporaryFunction(name, function); | ||
} else { | ||
env.createFunction(name, function); | ||
} | ||
} | ||
} | ||
} |
55 changes: 55 additions & 0 deletions
55
.../flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.test.program; | ||
|
||
import org.apache.flink.types.Row; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.function.Predicate; | ||
|
||
/** Test step for creating a table sink. */ | ||
public final class SinkTestStep extends TableTestStep { | ||
|
||
public final @Nullable Predicate<List<Row>> expectedBeforeRestore; | ||
public final @Nullable Predicate<List<Row>> expectedAfterRestore; | ||
|
||
SinkTestStep( | ||
String name, | ||
List<String> schemaComponents, | ||
List<String> partitionKeys, | ||
Map<String, String> options, | ||
@Nullable Predicate<List<Row>> expectedBeforeRestore, | ||
@Nullable Predicate<List<Row>> expectedAfterRestore) { | ||
super(name, schemaComponents, partitionKeys, options); | ||
this.expectedBeforeRestore = expectedBeforeRestore; | ||
this.expectedAfterRestore = expectedAfterRestore; | ||
} | ||
|
||
@Override | ||
public TestKind getKind() { | ||
return expectedBeforeRestore == null | ||
? TestKind.SINK_WITHOUT_DATA | ||
: expectedAfterRestore == null | ||
? TestKind.SINK_WITH_DATA | ||
: TestKind.SINK_WITH_RESTORE_DATA; | ||
} | ||
} |
52 changes: 52 additions & 0 deletions
52
...link-table-api-java/src/test/java/org/apache/flink/table/test/program/SourceTestStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.test.program; | ||
|
||
import org.apache.flink.types.Row; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** Test step for creating a table source. */ | ||
public final class SourceTestStep extends TableTestStep { | ||
|
||
public final List<Row> dataBeforeRestore; | ||
public final List<Row> dataAfterRestore; | ||
|
||
SourceTestStep( | ||
String name, | ||
List<String> schemaComponents, | ||
List<String> partitionKeys, | ||
Map<String, String> options, | ||
List<Row> dataBeforeRestore, | ||
List<Row> dataAfterRestore) { | ||
super(name, schemaComponents, partitionKeys, options); | ||
this.dataBeforeRestore = dataBeforeRestore; | ||
this.dataAfterRestore = dataAfterRestore; | ||
} | ||
|
||
@Override | ||
public TestKind getKind() { | ||
return dataBeforeRestore.isEmpty() | ||
? TestKind.SOURCE_WITHOUT_DATA | ||
: dataAfterRestore.isEmpty() | ||
? TestKind.SOURCE_WITH_DATA | ||
: TestKind.SOURCE_WITH_RESTORE_DATA; | ||
} | ||
} |
46 changes: 46 additions & 0 deletions
46
...e/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SqlTestStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.test.program; | ||
|
||
import org.apache.flink.table.api.TableEnvironment; | ||
import org.apache.flink.table.api.TableResult; | ||
|
||
/** | ||
* Test step for execution SQL. | ||
* | ||
* <p>Note: Not every runner supports generic SQL statements. Sometimes the runner would like to | ||
* enrich properties e.g. of a CREATE TABLE. Use this step with caution. | ||
*/ | ||
public final class SqlTestStep implements TestStep { | ||
|
||
public final String sql; | ||
|
||
SqlTestStep(String sql) { | ||
this.sql = sql; | ||
} | ||
|
||
@Override | ||
public TestKind getKind() { | ||
return TestKind.SQL; | ||
} | ||
|
||
public TableResult apply(TableEnvironment env) { | ||
return env.executeSql(sql); | ||
} | ||
} |
46 changes: 46 additions & 0 deletions
46
...able-api-java/src/test/java/org/apache/flink/table/test/program/StatementSetTestStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.test.program; | ||
|
||
import org.apache.flink.table.api.StatementSet; | ||
import org.apache.flink.table.api.TableEnvironment; | ||
import org.apache.flink.table.api.TableResult; | ||
|
||
import java.util.List; | ||
|
||
/** Test step for creating a statement set. */ | ||
public final class StatementSetTestStep implements TestStep { | ||
|
||
public final List<String> statements; | ||
|
||
StatementSetTestStep(List<String> statements) { | ||
this.statements = statements; | ||
} | ||
|
||
@Override | ||
public TestKind getKind() { | ||
return TestKind.STATEMENT_SET; | ||
} | ||
|
||
public TableResult apply(TableEnvironment env) { | ||
final StatementSet statementSet = env.createStatementSet(); | ||
statements.forEach(statementSet::addInsertSql); | ||
return statementSet.execute(); | ||
} | ||
} |
Oops, something went wrong.