Skip to content

Commit

Permalink
[FLINK-27660][table] add table api for registering function with reso…
Browse files Browse the repository at this point in the history
…urce uris
  • Loading branch information
HuangZhenQiu authored and wuchong committed Sep 13, 2022
1 parent 12dfe6a commit fbd369c
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleEntry;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.types.AbstractDataType;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/**
Expand Down Expand Up @@ -516,13 +518,51 @@ void createFunction(
boolean ignoreIfExists);

/**
* Drops a catalog function registered in the given path.
* Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
* specific class name and user defined resource uri.
*
* @param path The path under which the function has been registered. See also the {@link
* <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
* defined function by only provide a full path class name and and available resource list in
* which each url may be local or remote. User doesn't need to initialize the function instance
* in advance.
*
* <p>Compared to system functions with a globally defined name, catalog functions are always
* (implicitly or explicitly) identified by a catalog and database.
*
* <p>There must not be another function (temporary or permanent) registered under the same
* path.
*
* @param path The path under which the function will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @return true if a function existed in the given path and was removed
* @param className The class name of UDF to be registered.
* @param resourceUris The list of udf resource uris in local or remote.
*/
boolean dropFunction(String path);
void createFunction(String path, String className, List<ResourceUri> resourceUris);

/**
* Registers a {@link UserDefinedFunction} class as a catalog function in the given path by the
* specific class name and user defined resource uri.
*
* <p>Compared to {@link #createFunction(String, Class)}, this method allow registering a user
* defined function by only provide a full path class name and an available resource list in
* which each url may be local or remote. User doesn't need to initialize the function instance
* in advance.
*
* <p>Compared to system functions with a globally defined name, catalog functions are always
* (implicitly or explicitly) identified by a catalog and database.
*
* <p>There must not be another function (temporary or permanent) registered under the same
* path.
*
* @param path The path under which the function will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param className The class name of UDF to be registered.
* @param resourceUris The list of udf resource uris in local or remote.
* @param ignoreIfExists If a function exists under the given path and this flag is set, no
* operation is executed. An exception is thrown otherwise.
*/
void createFunction(
String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists);

/**
* Registers a {@link UserDefinedFunction} class as a temporary catalog function.
Expand Down Expand Up @@ -563,6 +603,59 @@ void createFunction(
*/
void createTemporaryFunction(String path, UserDefinedFunction functionInstance);

/**
* Registers a {@link UserDefinedFunction} class as a temporary catalog function in the given
* path by the specific class name and user defined resource uri.
*
* <p>Compared to {@link #createTemporaryFunction(String, Class)}, this method allow registering
* a user defined function by only provide a full path class name and an available resource list
* in which each url may be local or remote. User doesn't need to initialize the function
* instance in advance.
*
* <p>Compared to {@link #createTemporarySystemFunction(String, String, List)} with a globally
* defined name, catalog functions are always (implicitly or explicitly) identified by a catalog
* and database.
*
* <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
* exists, it will be inaccessible in the current session. To make the permanent function
* available again one can drop the corresponding temporary function.
*
* @param path The path under which the function will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param className The class name of UDF to be registered.
* @param resourceUris The list udf resource uri in local or remote.
*/
void createTemporaryFunction(String path, String className, List<ResourceUri> resourceUris);

/**
* Registers a {@link UserDefinedFunction} class as a temporary system function by the specific
* class name and user defined resource uri.
*
* <p>Compared to {@link #createTemporarySystemFunction(String, Class)}, this method allow
* registering a user defined function by only provide a full path class name and an available
* resource list in which each url may be local or remote. User doesn't need to initialize the
* function instance in advance.
*
* <p>Temporary functions can shadow permanent ones. If a permanent function under a given name
* exists, it will be inaccessible in the current session. To make the permanent function
* available again one can drop the corresponding temporary system function.
*
* @param name The name under which the function will be registered globally.
* @param className The class name of UDF to be registered.
* @param resourceUris The list of udf resource uris in local or remote.
*/
void createTemporarySystemFunction(
String name, String className, List<ResourceUri> resourceUris);

/**
* Drops a catalog function registered in the given path.
*
* @param path The path under which the function has been registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @return true if a function existed in the given path and was removed
*/
boolean dropFunction(String path);

/**
* Drops a temporary catalog function registered in the given path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
Expand All @@ -53,6 +54,7 @@
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
Expand Down Expand Up @@ -204,6 +206,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected final FunctionCatalog functionCatalog;
protected final Planner planner;
private final boolean isStreamingMode;

private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
"Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
+ "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
Expand Down Expand Up @@ -260,7 +263,6 @@ protected TableEnvironmentImpl(
},
getParser()::parseSqlExpression,
isStreamingMode);

catalogManager.initSchemaResolver(
isStreamingMode, operationTreeBuilder.getResolverBuilder());
}
Expand Down Expand Up @@ -417,6 +419,12 @@ public void createTemporarySystemFunction(String name, UserDefinedFunction funct
functionCatalog.registerTemporarySystemFunction(name, functionInstance, false);
}

@Override
public void createTemporarySystemFunction(
String name, String className, List<ResourceUri> resourceUris) {
functionCatalog.registerTemporarySystemFunction(name, className, resourceUris);
}

@Override
public boolean dropTemporarySystemFunction(String name) {
return functionCatalog.dropTemporarySystemFunction(name, true);
Expand All @@ -437,6 +445,19 @@ public void createFunction(
unresolvedIdentifier, functionClass, ignoreIfExists);
}

@Override
public void createFunction(String path, String className, List<ResourceUri> resourceUris) {
createFunction(path, className, resourceUris, false);
}

@Override
public void createFunction(
String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists) {
final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
functionCatalog.registerCatalogFunction(
unresolvedIdentifier, className, resourceUris, ignoreIfExists);
}

@Override
public boolean dropFunction(String path) {
final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
Expand All @@ -458,6 +479,16 @@ public void createTemporaryFunction(String path, UserDefinedFunction functionIns
unresolvedIdentifier, functionInstance, false);
}

@Override
public void createTemporaryFunction(
String path, String className, List<ResourceUri> resourceUris) {
final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
final CatalogFunction catalogFunction =
new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris);
functionCatalog.registerTemporaryCatalogFunction(
unresolvedIdentifier, catalogFunction, false);
}

@Override
public boolean dropTemporaryFunction(String path) {
final UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
Expand Down Expand Up @@ -1896,6 +1927,7 @@ private TableResultInternal dropCatalogFunction(

private TableResultInternal createSystemFunction(CreateTempSystemFunctionOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());

try {
functionCatalog.registerTemporarySystemFunction(
operation.getFunctionName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ public void registerTemporarySystemFunction(
name, new CatalogFunctionImpl(fullyQualifiedName, language), ignoreIfExists);
}

/** Registers a temporary system function from resource uris. */
public void registerTemporarySystemFunction(
String name, String className, List<ResourceUri> resourceUris) {
registerTemporarySystemFunction(
name,
new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris),
false);
}

/** Drops a temporary system function. Returns true if a function was dropped. */
public boolean dropTemporarySystemFunction(String name, boolean ignoreIfNotExist) {
final String normalizedName = FunctionIdentifier.normalizeName(name);
Expand Down Expand Up @@ -184,8 +193,8 @@ public void registerCatalogFunction(
Class<? extends UserDefinedFunction> functionClass,
boolean ignoreIfExists) {
final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
final ObjectIdentifier normalizedIdentifier =
FunctionIdentifier.normalizeObjectIdentifier(identifier);
final CatalogFunction catalogFunction =
new CatalogFunctionImpl(functionClass.getName(), FunctionLanguage.JAVA);

try {
UserDefinedFunctionHelper.validateClass(functionClass);
Expand All @@ -197,45 +206,20 @@ public void registerCatalogFunction(
t);
}

final Catalog catalog =
catalogManager
.getCatalog(normalizedIdentifier.getCatalogName())
.orElseThrow(IllegalStateException::new);
final ObjectPath path = identifier.toObjectPath();

// we force users to deal with temporary catalog functions first
if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
if (ignoreIfExists) {
return;
}
throw new ValidationException(
String.format(
"Could not register catalog function. A temporary function '%s' does already exist. "
+ "Please drop the temporary function first.",
identifier.asSummaryString()));
}
registerCatalogFunction(identifier, catalogFunction, ignoreIfExists);
}

if (catalog.functionExists(path)) {
if (ignoreIfExists) {
return;
}
throw new ValidationException(
String.format(
"Could not register catalog function. A function '%s' does already exist.",
identifier.asSummaryString()));
}
public void registerCatalogFunction(
UnresolvedIdentifier unresolvedIdentifier,
String className,
List<ResourceUri> resourceUris,
boolean ignoreIfExists) {

final ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
final CatalogFunction catalogFunction =
new CatalogFunctionImpl(functionClass.getName(), FunctionLanguage.JAVA);
try {
catalog.createFunction(path, catalogFunction, ignoreIfExists);
} catch (Throwable t) {
throw new TableException(
String.format(
"Could not register catalog function '%s'.",
identifier.asSummaryString()),
t);
}
new CatalogFunctionImpl(className, FunctionLanguage.JAVA, resourceUris);

registerCatalogFunction(identifier, catalogFunction, ignoreIfExists);
}

/**
Expand Down Expand Up @@ -714,6 +698,56 @@ private void registerFunctionJarResources(String functionName, List<ResourceUri>
}
}

private void registerCatalogFunction(
ObjectIdentifier identifier, CatalogFunction catalogFunction, boolean ignoreIfExists) {
final ObjectIdentifier normalizedIdentifier =
FunctionIdentifier.normalizeObjectIdentifier(identifier);

final Catalog catalog =
catalogManager
.getCatalog(normalizedIdentifier.getCatalogName())
.orElseThrow(
() ->
new ValidationException(
String.format(
"Catalog %s not exists.",
normalizedIdentifier.getCatalogName())));

final ObjectPath path = identifier.toObjectPath();

// we force users to deal with temporary catalog functions first
if (tempCatalogFunctions.containsKey(normalizedIdentifier)) {
if (ignoreIfExists) {
return;
}
throw new ValidationException(
String.format(
"Could not register catalog function. A temporary function '%s' does already exist. "
+ "Please drop the temporary function first.",
identifier.asSummaryString()));
}

if (catalog.functionExists(path)) {
if (ignoreIfExists) {
return;
}
throw new ValidationException(
String.format(
"Could not register catalog function. A function '%s' does already exist.",
identifier.asSummaryString()));
}

try {
catalog.createFunction(path, catalogFunction, ignoreIfExists);
} catch (Throwable t) {
throw new TableException(
String.format(
"Could not register catalog function '%s'.",
identifier.asSummaryString()),
t);
}
}

/** The CatalogFunction which holds an instantiated UDF. */
public static class InlineCatalogFunction implements CatalogFunction {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
new URL[0],
Thread.currentThread().getContextClassLoader(),
tableConfig.getConfiguration());

return new TableEnvironmentMock(
catalogManager,
moduleManager,
Expand Down
Loading

0 comments on commit fbd369c

Please sign in to comment.