Skip to content

Commit

Permalink
[FLINK-17345][python][table] Support register and get Python UDF in C…
Browse files Browse the repository at this point in the history
…atalog (apache#11884)
  • Loading branch information
WeiZhong94 authored Apr 24, 2020
1 parent efd22a7 commit 90fed72
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public class HiveCatalog extends AbstractCatalog {
// It's appended to Flink function's class name
// because Hive's Function object doesn't have properties or other place to store the flag for Flink functions.
private static final String FLINK_FUNCTION_PREFIX = "flink:";
private static final String FLINK_PYTHON_FUNCTION_PREFIX = FLINK_FUNCTION_PREFIX + "python:";

private final HiveConf hiveConf;
private final String hiveVersion;
Expand Down Expand Up @@ -1155,7 +1156,13 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx
Function function = client.getFunction(functionPath.getDatabaseName(), functionPath.getObjectName());

if (function.getClassName().startsWith(FLINK_FUNCTION_PREFIX)) {
return new CatalogFunctionImpl(function.getClassName().substring(FLINK_FUNCTION_PREFIX.length()));
if (function.getClassName().startsWith(FLINK_PYTHON_FUNCTION_PREFIX)) {
return new CatalogFunctionImpl(
function.getClassName().substring(FLINK_PYTHON_FUNCTION_PREFIX.length()),
FunctionLanguage.PYTHON);
} else {
return new CatalogFunctionImpl(function.getClassName().substring(FLINK_FUNCTION_PREFIX.length()));
}
} else {
return new CatalogFunctionImpl(function.getClassName());
}
Expand Down Expand Up @@ -1187,13 +1194,16 @@ private static Function instantiateHiveFunction(ObjectPath functionPath, Catalog

// Hive Function does not have properties map
// thus, use a prefix in class name to distinguish Flink and Hive functions
String functionClassName = isGeneric ?
FLINK_FUNCTION_PREFIX + function.getClassName() :
function.getClassName();

if (!function.getFunctionLanguage().equals(FunctionLanguage.JAVA)) {
String functionClassName;
if (function.getFunctionLanguage().equals(FunctionLanguage.JAVA)) {
functionClassName = isGeneric ?
FLINK_FUNCTION_PREFIX + function.getClassName() :
function.getClassName();
} else if (function.getFunctionLanguage().equals(FunctionLanguage.PYTHON)) {
functionClassName = FLINK_PYTHON_FUNCTION_PREFIX + function.getClassName();
} else {
throw new UnsupportedOperationException("HiveCatalog supports only creating" +
" JAVA based function for now");
" JAVA or PYTHON based function for now");
}

return new Function(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.functions.hive.HiveGenericUDF;
import org.apache.flink.table.functions.hive.HiveSimpleUDF;
Expand Down Expand Up @@ -67,4 +68,9 @@ protected CatalogFunction createFunction() {
protected CatalogFunction createAnotherFunction() {
return new CatalogFunctionImpl(HiveGenericUDF.class.getCanonicalName());
}

@Override
protected CatalogFunction createPythonFunction() {
return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ public static void cleanEnvironment() throws Exception {
}

public static void testPythonFunctionFactory() {
// flink catalog
flinkTableEnv.sqlUpdate("create function func1 as 'test1.func1' language python");
verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv);

// flink catalog
flinkTableEnv.sqlUpdate("alter function func1 as 'test1.func1' language python");
verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv);

// flink temporary catalog
flinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python");
verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv);
Expand All @@ -91,6 +99,14 @@ public static void testPythonFunctionFactory() {
flinkTableEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
verifyPlan(flinkSourceTable.select("func1(str)"), flinkTableEnv);

// blink catalog
blinkTableEnv.sqlUpdate("create function func1 as 'test1.func1' language python");
verifyPlan(blinkSourceTable.select("func1(str)"), blinkTableEnv);

// blink catalog
blinkTableEnv.sqlUpdate("alter function func1 as 'test1.func1' language python");
verifyPlan(blinkSourceTable.select("func1(str)"), blinkTableEnv);

// blink temporary catalog
blinkTableEnv.sqlUpdate("create temporary function func1 as 'test1.func1' language python");
verifyPlan(blinkSourceTable.select("func1(str)"), blinkTableEnv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
Expand Down Expand Up @@ -999,9 +998,6 @@ private TableResult alterCatalogFunction(AlterCatalogFunctionOperation alterCata
if (alterCatalogFunctionOperation.isTemporary()) {
throw new ValidationException(
"Alter temporary catalog function is not supported");
} else if (function.getFunctionLanguage() == FunctionLanguage.PYTHON) {
throw new ValidationException(
"Alter Python catalog function is not supported");
} else {
Catalog catalog = getCatalogOrThrowException(
alterCatalogFunctionOperation.getFunctionIdentifier().getCatalogName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public Optional<String> getDetailedDescription() {

@Override
public boolean isGeneric() {
if (functionLanguage == FunctionLanguage.PYTHON) {
return true;
}
try {
Class c = Class.forName(className);
if (UserDefinedFunction.class.isAssignableFrom(c)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,13 +567,13 @@ private Optional<FunctionLookup.Result> resolvePreciseFunctionReference(ObjectId
new ObjectPath(oi.getDatabaseName(), oi.getObjectName()));

FunctionDefinition fd;
if (catalog.getFunctionDefinitionFactory().isPresent()) {
if (catalog.getFunctionDefinitionFactory().isPresent() &&
catalogFunction.getFunctionLanguage() != FunctionLanguage.PYTHON) {
fd = catalog.getFunctionDefinitionFactory().get()
.createFunctionDefinition(oi.getObjectName(), catalogFunction);
} else {
// TODO update the FunctionDefinitionUtil once we drop the old function stack in DDL
fd = FunctionDefinitionUtil.createFunctionDefinition(
oi.getObjectName(), catalogFunction.getClassName());
fd = getFunctionDefinition(oi.getObjectName(), catalogFunction);
}

return Optional.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,9 @@ protected CatalogFunction createFunction() {
protected CatalogFunction createAnotherFunction() {
return new CatalogFunctionImpl(TestSimpleUDF.class.getCanonicalName(), FunctionLanguage.SCALA);
}

@Override
protected CatalogFunction createPythonFunction() {
return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,16 @@ public void testCreateFunction() throws Exception {
assertTrue(catalog.functionExists(path1));
}

@Test
public void testCreatePythonFunction() throws Exception {
catalog.createDatabase(db1, createDb(), false);
CatalogFunction pythonFunction = createPythonFunction();
catalog.createFunction(path1, createPythonFunction(), false);

CatalogFunction actual = catalog.getFunction(path1);
checkEquals(pythonFunction, actual);
}

@Test
public void testCreateFunction_DatabaseNotExistException() throws Exception {
assertFalse(catalog.databaseExists(db1));
Expand Down Expand Up @@ -668,6 +678,23 @@ public void testAlterFunction() throws Exception {
checkEquals(newFunc, actual);
}

@Test
public void testAlterPythonFunction() throws Exception {
catalog.createDatabase(db1, createDb(), false);

CatalogFunction func = createFunction();
catalog.createFunction(path1, func, false);

checkEquals(func, catalog.getFunction(path1));

CatalogFunction newFunc = createPythonFunction();
catalog.alterFunction(path1, newFunc, false);
CatalogFunction actual = catalog.getFunction(path1);

assertFalse(func.getClassName().equals(actual.getClassName()));
checkEquals(newFunc, actual);
}

@Test
public void testAlterFunction_FunctionNotExistException() throws Exception {
exception.expect(FunctionNotExistException.class);
Expand Down Expand Up @@ -1219,6 +1246,11 @@ public void testAlterTableStats_TableNotExistException_ignore() throws Exception
*/
protected abstract CatalogFunction createFunction();

/**
* Create a Python CatalogFunction instance by specific catalog implementation.
*/
protected abstract CatalogFunction createPythonFunction();

/**
* Create another CatalogFunction instance by specific catalog implementation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,6 @@ private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
);
} else {
FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage());
if (language == FunctionLanguage.PYTHON && !sqlCreateFunction.isTemporary()) {
throw new ValidationException(String.format(
"Unsupported function type for Python function %s, " +
"only temporary function is supported.",
sqlCreateFunction.getFunctionClassName().getValueAs(String.class)));
}
CatalogFunction catalogFunction = new CatalogFunctionImpl(
sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
language);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,6 @@ private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
);
} else {
FunctionLanguage language = parseLanguage(sqlCreateFunction.getFunctionLanguage());
if (language == FunctionLanguage.PYTHON && !sqlCreateFunction.isTemporary()) {
throw new ValidationException(String.format(
"Unsupported function type for Python function %s, " +
"only temporary function is supported.",
sqlCreateFunction.getFunctionClassName().getValueAs(String.class)));
}
CatalogFunction catalogFunction = new CatalogFunctionImpl(
sqlCreateFunction.getFunctionClassName().getValueAs(String.class),
language);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,8 +907,6 @@ abstract class TableEnvImpl(
val function = alterFunctionOperation.getCatalogFunction
if (alterFunctionOperation.isTemporary) {
throw new ValidationException("Alter temporary catalog function is not supported")
} else if (function.getFunctionLanguage eq FunctionLanguage.PYTHON) {
throw new ValidationException("Alter Python catalog function is not supported");
} else {
val catalog = getCatalogOrThrowException(
alterFunctionOperation.getFunctionIdentifier.getCatalogName)
Expand Down

0 comments on commit 90fed72

Please sign in to comment.