Skip to content

Commit

Permalink
[FLINK-26421][table] Remove planner & executor string identifiers
Browse files Browse the repository at this point in the history
Remove planner&executor string identifiers from `EnvironmentSettings`
and use the default strings which are only used anyway.
  • Loading branch information
matriv authored and twalthr committed Mar 15, 2022
1 parent 1d2f1e7 commit 9e3e51d
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
Expand Down Expand Up @@ -75,8 +76,6 @@
import java.util.LinkedHashMap;
import java.util.List;

import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand All @@ -90,6 +89,11 @@
/** Test Hive syntax when Hive dialect is used. */
public class HiveDialectITCase {

private static final String DEFAULT_BUILTIN_CATALOG =
TableConfigOptions.TABLE_CATALOG_NAME.defaultValue();
private static final String DEFAULT_BUILTIN_DATABASE =
TableConfigOptions.TABLE_DATABASE_NAME.defaultValue();

private TableEnvironment tableEnv;
private HiveCatalog hiveCatalog;
private String warehouse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand Down Expand Up @@ -86,6 +87,11 @@
*/
public class HiveCatalogITCase {

private static final String DEFAULT_BUILTIN_CATALOG =
TableConfigOptions.TABLE_CATALOG_NAME.defaultValue();
private static final String DEFAULT_BUILTIN_DATABASE =
TableConfigOptions.TABLE_DATABASE_NAME.defaultValue();

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

private static HiveCatalog hiveCatalog;
Expand Down Expand Up @@ -472,15 +478,14 @@ public void testCreateTableLike() throws Exception {
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
tableEnv.executeSql("create table generic_table (x int) with ('connector'='COLLECTION')");
tableEnv.useCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG);
tableEnv.useCatalog(DEFAULT_BUILTIN_CATALOG);
tableEnv.executeSql(
String.format(
"create table copy like `%s`.`default`.generic_table",
hiveCatalog.getName()));
Catalog builtInCat = tableEnv.getCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG).get();
Catalog builtInCat = tableEnv.getCatalog(DEFAULT_BUILTIN_CATALOG).get();
CatalogBaseTable catalogTable =
builtInCat.getTable(
new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "copy"));
builtInCat.getTable(new ObjectPath(DEFAULT_BUILTIN_DATABASE, "copy"));
assertThat(catalogTable.getOptions()).hasSize(1);
assertThat(catalogTable.getOptions())
.containsEntry(FactoryUtil.CONNECTOR.key(), "COLLECTION");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private StreamTableEnvironment createTableEnvironment() {

StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment();

final Executor executor = lookupExecutor(settings.getExecutor(), streamExecEnv);
final Executor executor = lookupExecutor(streamExecEnv);
return createStreamTableEnvironment(
streamExecEnv,
settings,
Expand All @@ -130,12 +130,7 @@ private StreamTableEnvironment createStreamTableEnvironment(

final Planner planner =
PlannerFactoryUtil.createPlanner(
settings.getPlanner(),
executor,
tableConfig,
moduleManager,
catalogManager,
functionCatalog);
executor, tableConfig, moduleManager, catalogManager, functionCatalog);

return new StreamTableEnvironmentImpl(
catalogManager,
Expand All @@ -149,12 +144,11 @@ private StreamTableEnvironment createStreamTableEnvironment(
userClassLoader);
}

private Executor lookupExecutor(
String executorIdentifier, StreamExecutionEnvironment executionEnvironment) {
private Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
try {
final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, executorIdentifier);
classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
final Method createMethod =
executorFactory
.getClass()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,12 @@ public AbstractStreamTableEnvironmentImpl(
}

public static Executor lookupExecutor(
ClassLoader classLoader,
String executorIdentifier,
StreamExecutionEnvironment executionEnvironment) {
ClassLoader classLoader, StreamExecutionEnvironment executionEnvironment) {
final ExecutorFactory executorFactory;
try {
executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, executorIdentifier);
classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,11 @@ public static StreamTableEnvironment create(
final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);

final Executor executor =
lookupExecutor(classLoader, settings.getExecutor(), executionEnvironment);
final Executor executor = lookupExecutor(classLoader, executionEnvironment);

final Planner planner =
PlannerFactoryUtil.createPlanner(
settings.getPlanner(),
executor,
tableConfig,
moduleManager,
catalogManager,
functionCatalog);
executor, tableConfig, moduleManager, catalogManager, functionCatalog);

return new StreamTableEnvironmentImpl(
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,11 @@

package org.apache.flink.table.api;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.functions.UserDefinedFunction;

import javax.annotation.Nullable;

import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
Expand Down Expand Up @@ -63,12 +56,6 @@ public class EnvironmentSettings {
public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
public static final String DEFAULT_BUILTIN_DATABASE = "default_database";

/** Factory identifier of the {@link Planner} to use. */
private final String planner;

/** Factory identifier of the {@link Executor} to use. */
private final String executor;

/**
* Specifies the name of the initial catalog to be created when instantiating {@link
* TableEnvironment}.
Expand All @@ -88,13 +75,7 @@ public class EnvironmentSettings {
private final boolean isStreamingMode;

private EnvironmentSettings(
String planner,
@Nullable String executor,
String builtInCatalogName,
String builtInDatabaseName,
boolean isStreamingMode) {
this.planner = planner;
this.executor = executor;
String builtInCatalogName, String builtInDatabaseName, boolean isStreamingMode) {
this.builtInCatalogName = builtInCatalogName;
this.builtInDatabaseName = builtInDatabaseName;
this.isStreamingMode = isStreamingMode;
Expand Down Expand Up @@ -180,23 +161,9 @@ public boolean isStreamingMode() {
return isStreamingMode;
}

/** Returns the identifier of the {@link Planner} to be used. */
@Internal
public String getPlanner() {
return planner;
}

/** Returns the {@link Executor} that should submit and execute table programs. */
@Internal
public String getExecutor() {
return executor;
}

/** A builder for {@link EnvironmentSettings}. */
@PublicEvolving
public static class Builder {
private final String planner = PlannerFactory.DEFAULT_IDENTIFIER;
private final String executor = ExecutorFactory.DEFAULT_IDENTIFIER;

private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
Expand Down Expand Up @@ -255,7 +222,7 @@ public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
/** Returns an immutable instance of {@link EnvironmentSettings}. */
public EnvironmentSettings build() {
return new EnvironmentSettings(
planner, executor, builtInCatalogName, builtInDatabaseName, isStreamingMode);
builtInCatalogName, builtInDatabaseName, isStreamingMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,12 @@ private static TableEnvironmentImpl create(

final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
classLoader, ExecutorFactory.class, settings.getExecutor());
classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
final Executor executor = executorFactory.create(configuration);

final Planner planner =
PlannerFactoryUtil.createPlanner(
settings.getPlanner(),
executor,
tableConfig,
moduleManager,
catalogManager,
functionCatalog);
executor, tableConfig, moduleManager, catalogManager, functionCatalog);

return new TableEnvironmentImpl(
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class PlannerFactoryUtil {

/** Discovers a planner factory and creates a planner instance. */
public static Planner createPlanner(
String plannerIdentifier,
Executor executor,
TableConfig tableConfig,
ModuleManager moduleManager,
Expand All @@ -45,7 +44,7 @@ public static Planner createPlanner(
FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(),
PlannerFactory.class,
plannerIdentifier);
PlannerFactory.DEFAULT_IDENTIFIER);

final Context context =
new DefaultPlannerContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,10 @@ object StreamTableEnvironmentImpl {
val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager)

val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor(
classLoader, settings.getExecutor, executionEnvironment.getWrappedStreamExecutionEnvironment)
classLoader, executionEnvironment.getWrappedStreamExecutionEnvironment)

val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, executor, tableConfig,
moduleManager, catalogManager, functionCatalog)
val planner = PlannerFactoryUtil.createPlanner(
executor, tableConfig, moduleManager, catalogManager, functionCatalog)

new StreamTableEnvironmentImpl(
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.configuration.ExecutionOptions

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl
import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.api.config.ExecutionConfigOptions
Expand Down Expand Up @@ -1549,12 +1549,13 @@ object TestingTableEnvironment {

val functionCatalog = new FunctionCatalog(tableConfig, catalogMgr, moduleManager)

val executorFactory =
FactoryUtil.discoverFactory(classLoader, classOf[ExecutorFactory], settings.getExecutor)
val executorFactory = FactoryUtil.discoverFactory(
classLoader, classOf[ExecutorFactory], ExecutorFactory.DEFAULT_IDENTIFIER)

val executor = executorFactory.create(tableConfig.getConfiguration)

val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, executor, tableConfig,
moduleManager, catalogMgr, functionCatalog).asInstanceOf[PlannerBase]
val planner = PlannerFactoryUtil.createPlanner(
executor, tableConfig, moduleManager, catalogMgr, functionCatalog).asInstanceOf[PlannerBase]

new TestingTableEnvironment(
catalogMgr,
Expand Down

0 comments on commit 9e3e51d

Please sign in to comment.