diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index 30537c20369f2..24c6904550fab 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogPartitionSpec; @@ -75,8 +76,6 @@ import java.util.LinkedHashMap; import java.util.List; -import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG; -import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -90,6 +89,11 @@ /** Test Hive syntax when Hive dialect is used. */ public class HiveDialectITCase { + private static final String DEFAULT_BUILTIN_CATALOG = + TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(); + private static final String DEFAULT_BUILTIN_DATABASE = + TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(); + private TableEnvironment tableEnv; private HiveCatalog hiveCatalog; private String warehouse; diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java index fe06b2bdd5830..ad84edeef179f 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -86,6 +87,11 @@ */ public class HiveCatalogITCase { + private static final String DEFAULT_BUILTIN_CATALOG = + TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(); + private static final String DEFAULT_BUILTIN_DATABASE = + TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); private static HiveCatalog hiveCatalog; @@ -472,15 +478,14 @@ public void testCreateTableLike() throws Exception { tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); tableEnv.useCatalog(hiveCatalog.getName()); tableEnv.executeSql("create table generic_table (x int) with ('connector'='COLLECTION')"); - tableEnv.useCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG); + tableEnv.useCatalog(DEFAULT_BUILTIN_CATALOG); tableEnv.executeSql( String.format( "create table copy like `%s`.`default`.generic_table", hiveCatalog.getName())); - Catalog builtInCat = tableEnv.getCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG).get(); + Catalog builtInCat = tableEnv.getCatalog(DEFAULT_BUILTIN_CATALOG).get(); CatalogBaseTable catalogTable = - builtInCat.getTable( - new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "copy")); + builtInCat.getTable(new ObjectPath(DEFAULT_BUILTIN_DATABASE, "copy")); assertThat(catalogTable.getOptions()).hasSize(1); assertThat(catalogTable.getOptions()) .containsEntry(FactoryUtil.CONNECTOR.key(), "COLLECTION"); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java index 135cf8f26f29b..ae1a829a9b639 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java @@ -106,7 +106,7 @@ private StreamTableEnvironment createTableEnvironment() { StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment(); - final Executor executor = lookupExecutor(settings.getExecutor(), streamExecEnv); + final Executor executor = lookupExecutor(streamExecEnv); return createStreamTableEnvironment( streamExecEnv, settings, @@ -130,12 +130,7 @@ private StreamTableEnvironment createStreamTableEnvironment( final Planner planner = PlannerFactoryUtil.createPlanner( - settings.getPlanner(), - executor, - tableConfig, - moduleManager, - catalogManager, - functionCatalog); + executor, tableConfig, moduleManager, catalogManager, functionCatalog); return new StreamTableEnvironmentImpl( catalogManager, @@ -149,12 +144,11 @@ private StreamTableEnvironment createStreamTableEnvironment( userClassLoader); } - private Executor lookupExecutor( - String executorIdentifier, StreamExecutionEnvironment executionEnvironment) { + private Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) { try { final ExecutorFactory executorFactory = FactoryUtil.discoverFactory( - classLoader, ExecutorFactory.class, executorIdentifier); + classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); final Method createMethod = executorFactory .getClass() diff --git a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java index c9fb822c66d4c..c8df9500c9d13 100644 --- a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java @@ -98,14 +98,12 @@ public AbstractStreamTableEnvironmentImpl( } public static Executor lookupExecutor( - ClassLoader classLoader, - String executorIdentifier, - StreamExecutionEnvironment executionEnvironment) { + ClassLoader classLoader, StreamExecutionEnvironment executionEnvironment) { final ExecutorFactory executorFactory; try { executorFactory = FactoryUtil.discoverFactory( - classLoader, ExecutorFactory.class, executorIdentifier); + classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); } catch (Exception e) { throw new TableException( "Could not instantiate the executor. Make sure a planner module is on the classpath", diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index c6c7d43e222aa..5d66cfaeb70ca 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -116,17 +116,11 @@ public static StreamTableEnvironment create( final FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); - final Executor executor = - lookupExecutor(classLoader, settings.getExecutor(), executionEnvironment); + final Executor executor = lookupExecutor(classLoader, executionEnvironment); final Planner planner = PlannerFactoryUtil.createPlanner( - settings.getPlanner(), - executor, - tableConfig, - moduleManager, - catalogManager, - functionCatalog); + executor, tableConfig, moduleManager, catalogManager, functionCatalog); return new StreamTableEnvironmentImpl( catalogManager, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index cc9a32b703be5..d8a246408c159 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -18,18 +18,11 @@ package org.apache.flink.table.api; -import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; -import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.delegation.PlannerFactory; import org.apache.flink.table.functions.UserDefinedFunction; -import javax.annotation.Nullable; - import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH; import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING; import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; @@ -63,12 +56,6 @@ public class EnvironmentSettings { public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog"; public static final String DEFAULT_BUILTIN_DATABASE = "default_database"; - /** Factory identifier of the {@link Planner} to use. */ - private final String planner; - - /** Factory identifier of the {@link Executor} to use. */ - private final String executor; - /** * Specifies the name of the initial catalog to be created when instantiating {@link * TableEnvironment}. @@ -88,13 +75,7 @@ public class EnvironmentSettings { private final boolean isStreamingMode; private EnvironmentSettings( - String planner, - @Nullable String executor, - String builtInCatalogName, - String builtInDatabaseName, - boolean isStreamingMode) { - this.planner = planner; - this.executor = executor; + String builtInCatalogName, String builtInDatabaseName, boolean isStreamingMode) { this.builtInCatalogName = builtInCatalogName; this.builtInDatabaseName = builtInDatabaseName; this.isStreamingMode = isStreamingMode; @@ -180,23 +161,9 @@ public boolean isStreamingMode() { return isStreamingMode; } - /** Returns the identifier of the {@link Planner} to be used. */ - @Internal - public String getPlanner() { - return planner; - } - - /** Returns the {@link Executor} that should submit and execute table programs. */ - @Internal - public String getExecutor() { - return executor; - } - /** A builder for {@link EnvironmentSettings}. */ @PublicEvolving public static class Builder { - private final String planner = PlannerFactory.DEFAULT_IDENTIFIER; - private final String executor = ExecutorFactory.DEFAULT_IDENTIFIER; private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG; private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE; @@ -255,7 +222,7 @@ public Builder withBuiltInDatabaseName(String builtInDatabaseName) { /** Returns an immutable instance of {@link EnvironmentSettings}. */ public EnvironmentSettings build() { return new EnvironmentSettings( - planner, executor, builtInCatalogName, builtInDatabaseName, isStreamingMode); + builtInCatalogName, builtInDatabaseName, isStreamingMode); } } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index ea17a9d5e0960..8d6a5a96a4bf5 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -299,17 +299,12 @@ private static TableEnvironmentImpl create( final ExecutorFactory executorFactory = FactoryUtil.discoverFactory( - classLoader, ExecutorFactory.class, settings.getExecutor()); + classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); final Executor executor = executorFactory.create(configuration); final Planner planner = PlannerFactoryUtil.createPlanner( - settings.getPlanner(), - executor, - tableConfig, - moduleManager, - catalogManager, - functionCatalog); + executor, tableConfig, moduleManager, catalogManager, functionCatalog); return new TableEnvironmentImpl( catalogManager, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java index ef64f32bdb984..674c916fc486a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java @@ -35,7 +35,6 @@ public class PlannerFactoryUtil { /** Discovers a planner factory and creates a planner instance. */ public static Planner createPlanner( - String plannerIdentifier, Executor executor, TableConfig tableConfig, ModuleManager moduleManager, @@ -45,7 +44,7 @@ public static Planner createPlanner( FactoryUtil.discoverFactory( Thread.currentThread().getContextClassLoader(), PlannerFactory.class, - plannerIdentifier); + PlannerFactory.DEFAULT_IDENTIFIER); final Context context = new DefaultPlannerContext( diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index f56ba48f10278..a968e49c22ea3 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -317,10 +317,10 @@ object StreamTableEnvironmentImpl { val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager) val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor( - classLoader, settings.getExecutor, executionEnvironment.getWrappedStreamExecutionEnvironment) + classLoader, executionEnvironment.getWrappedStreamExecutionEnvironment) - val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, executor, tableConfig, - moduleManager, catalogManager, functionCatalog) + val planner = PlannerFactoryUtil.createPlanner( + executor, tableConfig, moduleManager, catalogManager, functionCatalog) new StreamTableEnvironmentImpl( catalogManager, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 6cb12cce545f8..0d088d6defbaf 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -26,13 +26,13 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.configuration.ExecutionOptions - import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} import org.apache.flink.streaming.api.{TimeCharacteristic, environment} import org.apache.flink.table.api._ +import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => JavaStreamTableEnv} import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.api.config.ExecutionConfigOptions @@ -1549,12 +1549,13 @@ object TestingTableEnvironment { val functionCatalog = new FunctionCatalog(tableConfig, catalogMgr, moduleManager) - val executorFactory = - FactoryUtil.discoverFactory(classLoader, classOf[ExecutorFactory], settings.getExecutor) + val executorFactory = FactoryUtil.discoverFactory( + classLoader, classOf[ExecutorFactory], ExecutorFactory.DEFAULT_IDENTIFIER) + val executor = executorFactory.create(tableConfig.getConfiguration) - val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, executor, tableConfig, - moduleManager, catalogMgr, functionCatalog).asInstanceOf[PlannerBase] + val planner = PlannerFactoryUtil.createPlanner( + executor, tableConfig, moduleManager, catalogMgr, functionCatalog).asInstanceOf[PlannerBase] new TestingTableEnvironment( catalogMgr,