Skip to content

Commit

Permalink
[FLINK-15635][table] Propagate ClassLoader whenever is appropriate
Browse files Browse the repository at this point in the history
Signed-off-by: slinkydeveloper <[email protected]>
  • Loading branch information
slinkydeveloper authored and wuchong committed Jun 8, 2022
1 parent 3402a6c commit beaf495
Show file tree
Hide file tree
Showing 71 changed files with 1,423 additions and 1,237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,12 @@ private StreamTableEnvironment createStreamTableEnvironment(

final Planner planner =
PlannerFactoryUtil.createPlanner(
executor, tableConfig, moduleManager, catalogManager, functionCatalog);
executor,
tableConfig,
userClassLoader,
moduleManager,
catalogManager,
functionCatalog);

return new StreamTableEnvironmentImpl(
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public static SessionContext create(DefaultContext defaultContext, String sessio
.build();

FunctionCatalog functionCatalog =
new FunctionCatalog(configuration, catalogManager, moduleManager);
new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader);
SessionState sessionState =
new SessionState(catalogManager, moduleManager, functionCatalog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;

import org.jline.reader.MaskingCallback;
import org.jline.terminal.Terminal;
Expand Down Expand Up @@ -107,7 +108,11 @@ private void testResultViewClearResult(
schema,
false,
testConfig,
new RowDataToStringConverterImpl(schema.toPhysicalRowDataType()));
new RowDataToStringConverterImpl(
schema.toPhysicalRowDataType(),
DateTimeUtils.UTC_ZONE.toZoneId(),
Thread.currentThread().getContextClassLoader(),
false));

try (CliClient cli =
new TestingCliClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.types.Row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
Expand Down Expand Up @@ -120,6 +121,10 @@ public CloseableIterator<RowData> collectInternal() {

@Override
public RowDataToStringConverter getRowDataToStringConverter() {
return new RowDataToStringConverterImpl(resolvedSchema.toPhysicalRowDataType());
return new RowDataToStringConverterImpl(
resolvedSchema.toPhysicalRowDataType(),
DateTimeUtils.UTC_ZONE.toZoneId(),
Thread.currentThread().getContextClassLoader(),
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ public StreamTableEnvironmentImpl(

public static StreamTableEnvironment create(
StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {

// temporary solution until FLINK-15635 is fixed
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final ClassLoader classLoader = settings.getUserClassLoader();

final Executor executor = lookupExecutor(classLoader, executionEnvironment);

Expand All @@ -116,11 +114,16 @@ public static StreamTableEnvironment create(
.build();

final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
new FunctionCatalog(tableConfig, catalogManager, moduleManager, classLoader);

final Planner planner =
PlannerFactoryUtil.createPlanner(
executor, tableConfig, moduleManager, catalogManager, functionCatalog);
executor,
tableConfig,
classLoader,
moduleManager,
catalogManager,
functionCatalog);

return new StreamTableEnvironmentImpl(
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ private StreamTableEnvironmentImpl getStreamTableEnvironment(
return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
new FunctionCatalog(tableConfig, catalogManager, moduleManager),
new FunctionCatalog(
tableConfig,
catalogManager,
moduleManager,
StreamTableEnvironmentImplTest.class.getClassLoader()),
tableConfig,
env,
new TestPlanner(elements.getTransformation()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.api;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
Expand Down Expand Up @@ -56,8 +57,11 @@ public class EnvironmentSettings {
*/
private final Configuration configuration;

private EnvironmentSettings(Configuration configuration) {
private final ClassLoader classLoader;

private EnvironmentSettings(Configuration configuration, ClassLoader classLoader) {
this.configuration = configuration;
this.classLoader = classLoader;
}

/**
Expand Down Expand Up @@ -97,7 +101,8 @@ public static Builder newInstance() {
*/
@Deprecated
public static EnvironmentSettings fromConfiguration(ReadableConfig configuration) {
return new EnvironmentSettings((Configuration) configuration);
return new EnvironmentSettings(
(Configuration) configuration, Thread.currentThread().getContextClassLoader());
}

/**
Expand Down Expand Up @@ -150,6 +155,7 @@ public ClassLoader getUserClassLoader() {
public static class Builder {

private final Configuration configuration = new Configuration();
private ClassLoader classLoader;

public Builder() {}

Expand Down Expand Up @@ -229,7 +235,7 @@ public EnvironmentSettings build() {
if (classLoader == null) {
classLoader = Thread.currentThread().getContextClassLoader();
}
return new EnvironmentSettings(configuration);
return new EnvironmentSettings(configuration, classLoader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ protected TableEnvironmentImpl(
this.operationTreeBuilder =
OperationTreeBuilder.create(
tableConfig,
userClassLoader,
functionCatalog.asLookup(getParser()::parseIdentifier),
catalogManager.getDataTypeFactory(),
path -> {
Expand Down Expand Up @@ -259,8 +260,7 @@ public static TableEnvironmentImpl create(Configuration configuration) {
}

public static TableEnvironmentImpl create(EnvironmentSettings settings) {
// temporary solution until FLINK-15635 is fixed
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final ClassLoader classLoader = settings.getUserClassLoader();

final ExecutorFactory executorFactory =
FactoryUtil.discoverFactory(
Expand All @@ -286,11 +286,16 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
.build();

final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, catalogManager, moduleManager);
new FunctionCatalog(tableConfig, catalogManager, moduleManager, classLoader);

final Planner planner =
PlannerFactoryUtil.createPlanner(
executor, tableConfig, moduleManager, catalogManager, functionCatalog);
executor,
tableConfig,
classLoader,
moduleManager,
catalogManager,
functionCatalog);

return new TableEnvironmentImpl(
catalogManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ public String toString() {
+ "', "
+ "functionLanguage='"
+ getFunctionLanguage()
+ "', "
+ "isGeneric='"
+ isGeneric()
+ "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class FunctionCatalog {
private final ReadableConfig config;
private final CatalogManager catalogManager;
private final ModuleManager moduleManager;
private final ClassLoader classLoader;

private final Map<String, CatalogFunction> tempSystemFunctions = new LinkedHashMap<>();
private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions =
Expand All @@ -76,10 +77,14 @@ public final class FunctionCatalog {
private PlannerTypeInferenceUtil plannerTypeInferenceUtil;

public FunctionCatalog(
ReadableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
ReadableConfig config,
CatalogManager catalogManager,
ModuleManager moduleManager,
ClassLoader classLoader) {
this.config = checkNotNull(config);
this.catalogManager = checkNotNull(catalogManager);
this.moduleManager = checkNotNull(moduleManager);
this.classLoader = classLoader;
}

public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
Expand Down Expand Up @@ -146,7 +151,7 @@ public void registerTemporaryCatalogFunction(
normalizedIdentifier.toObjectPath(), catalogFunction);
}
try {
validateAndPrepareFunction(catalogFunction);
catalogFunction = validateAndPrepareFunction(catalogFunction);
} catch (Throwable t) {
throw new ValidationException(
String.format(
Expand Down Expand Up @@ -368,7 +373,7 @@ public Optional<ContextResolvedFunction> lookupFunction(UnresolvedIdentifier ide
*/
@Deprecated
public void registerTempSystemScalarFunction(String name, ScalarFunction function) {
UserDefinedFunctionHelper.prepareInstance(config, function);
function = UserDefinedFunctionHelper.prepareInstance(config, classLoader, function);

registerTempSystemFunction(name, new ScalarFunctionDefinition(name, function));
}
Expand All @@ -380,7 +385,7 @@ public void registerTempSystemScalarFunction(String name, ScalarFunction functio
@Deprecated
public <T> void registerTempSystemTableFunction(
String name, TableFunction<T> function, TypeInformation<T> resultType) {
UserDefinedFunctionHelper.prepareInstance(config, function);
function = UserDefinedFunctionHelper.prepareInstance(config, classLoader, function);

registerTempSystemFunction(name, new TableFunctionDefinition(name, function, resultType));
}
Expand All @@ -395,7 +400,7 @@ public <T, ACC> void registerTempSystemAggregateFunction(
ImperativeAggregateFunction<T, ACC> function,
TypeInformation<T> resultType,
TypeInformation<ACC> accType) {
UserDefinedFunctionHelper.prepareInstance(config, function);
function = UserDefinedFunctionHelper.prepareInstance(config, classLoader, function);

final FunctionDefinition definition;
if (function instanceof AggregateFunction) {
Expand All @@ -419,7 +424,7 @@ public <T, ACC> void registerTempSystemAggregateFunction(
*/
@Deprecated
public void registerTempCatalogScalarFunction(ObjectIdentifier oi, ScalarFunction function) {
UserDefinedFunctionHelper.prepareInstance(config, function);
function = UserDefinedFunctionHelper.prepareInstance(config, classLoader, function);

registerTempCatalogFunction(oi, new ScalarFunctionDefinition(oi.getObjectName(), function));
}
Expand Down Expand Up @@ -493,7 +498,7 @@ public void registerTemporarySystemFunction(
final String normalizedName = FunctionIdentifier.normalizeName(name);

try {
validateAndPrepareFunction(function);
function = validateAndPrepareFunction(function);
} catch (Throwable t) {
throw new ValidationException(
String.format(
Expand Down Expand Up @@ -621,7 +626,7 @@ private Optional<ContextResolvedFunction> resolveAmbiguousFunctionReference(Stri
}

@SuppressWarnings("unchecked")
private void validateAndPrepareFunction(CatalogFunction function)
private CatalogFunction validateAndPrepareFunction(CatalogFunction function)
throws ClassNotFoundException {
// If the input is instance of UserDefinedFunction, it means it uses the new type inference.
// In this situation the UDF have not been validated and cleaned, so we need to validate it
Expand All @@ -632,15 +637,17 @@ private void validateAndPrepareFunction(CatalogFunction function)
if (function instanceof InlineCatalogFunction) {
FunctionDefinition definition = ((InlineCatalogFunction) function).getDefinition();
if (definition instanceof UserDefinedFunction) {
UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
return new InlineCatalogFunction(
UserDefinedFunctionHelper.prepareInstance(
config, classLoader, (UserDefinedFunction) definition));
}
// Skip validation if it's not a UserDefinedFunction.
} else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
UserDefinedFunctionHelper.validateClass(
(Class<? extends UserDefinedFunction>)
contextClassLoader.loadClass(function.getClassName()));
classLoader.loadClass(function.getClassName()));
}
return function;
}

private FunctionDefinition getFunctionDefinition(String name, CatalogFunction function) {
Expand All @@ -651,8 +658,7 @@ private FunctionDefinition getFunctionDefinition(String name, CatalogFunction fu
return ((InlineCatalogFunction) function).getDefinition();
}
return UserDefinedFunctionHelper.instantiateFunction(
Thread.currentThread()
.getContextClassLoader(), // TODO use classloader of catalog manager in the
classLoader,
// future
config,
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.delegation;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
Expand Down Expand Up @@ -51,6 +52,13 @@ interface Context {
/** The configuration of the planner to use. */
TableConfig getTableConfig();

/**
* The user classloader.
*
* @see EnvironmentSettings#getUserClassLoader()
*/
ClassLoader getClassLoader();

/** The module manager. */
ModuleManager getModuleManager();

Expand All @@ -65,18 +73,21 @@ interface Context {
class DefaultPlannerContext implements Context {
private final Executor executor;
private final TableConfig tableConfig;
private final ClassLoader classLoader;
private final ModuleManager moduleManager;
private final CatalogManager catalogManager;
private final FunctionCatalog functionCatalog;

public DefaultPlannerContext(
Executor executor,
TableConfig tableConfig,
ClassLoader classLoader,
ModuleManager moduleManager,
CatalogManager catalogManager,
FunctionCatalog functionCatalog) {
this.executor = executor;
this.tableConfig = tableConfig;
this.classLoader = classLoader;
this.moduleManager = moduleManager;
this.catalogManager = catalogManager;
this.functionCatalog = functionCatalog;
Expand All @@ -92,6 +103,11 @@ public TableConfig getTableConfig() {
return tableConfig;
}

@Override
public ClassLoader getClassLoader() {
return classLoader;
}

@Override
public ModuleManager getModuleManager() {
return moduleManager;
Expand Down
Loading

0 comments on commit beaf495

Please sign in to comment.