Skip to content

Commit

Permalink
[FLINK-15675][python][docs] Add exception and documentation that Pyth…
Browse files Browse the repository at this point in the history
…on UDF is not supported in old Planner under batch mode

This closes apache#10907
  • Loading branch information
hequn8128 authored and dianfu committed Jan 20, 2020
1 parent ad96288 commit 360afd9
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/dev/table/functions/udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ $ python --version
$ python -m pip install apache-beam==2.15.0
{% endhighlight %}

<span class="label label-info">Note</span> Currently, Python UDF is supported in Blink planner both under streaming and batch mode while is only supported under streaming mode in old planner.

It supports to use both Java/Scala scalar functions and Python scalar functions in Python Table API and SQL. In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method. An evaluation method must be named `eval`. Evaluation method can also support variable arguments, such as `eval(*args)`.

The following example shows how to define your own Java and Python hash code functions, register them in the TableEnvironment, and call them in a query. Note that you can configure your scalar function via a constructor before it is registered:
Expand Down
2 changes: 2 additions & 0 deletions docs/dev/table/functions/udfs.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ $ python --version
$ python -m pip install apache-beam==2.15.0
{% endhighlight %}

<span class="label label-info">Note</span> Currently, Python UDF is supported in Blink planner both under streaming and batch mode while is only supported under streaming mode in old planner.

It supports to use both Java/Scala scalar functions and Python scalar functions in Python Table API and SQL. In order to define a Python scalar function, one can extend the base class `ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The behavior of a Python scalar function is determined by the evaluation method. An evaluation method must be named `eval`. Evaluation method can also support variable arguments, such as `eval(*args)`.

The following example shows how to define your own Java and Python hash code functions, register them in the TableEnvironment, and call them in a query. Note that you can configure your scalar function via a constructor before it is registered:
Expand Down
2 changes: 2 additions & 0 deletions flink-python/pyflink/table/table_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,8 @@ def register_function(self, name, function):
:param function: The python user-defined function to register.
:type function: pyflink.table.udf.UserDefinedFunctionWrapper
"""
if not self._is_blink_planner and isinstance(self, BatchTableEnvironment):
raise Exception("Python UDF is not supported in old planner under batch mode!")
self._j_tenv.registerFunction(name, function._judf(self._is_blink_planner,
self.get_config()._j_table_config))

Expand Down
14 changes: 13 additions & 1 deletion flink-python/pyflink/table/tests/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from pyflink.table.udf import ScalarFunction, udf
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase
PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase, \
PyFlinkBatchTableTestCase


class UserDefinedFunctionTests(object):
Expand Down Expand Up @@ -478,6 +479,17 @@ class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
pass


class PyFlinkBatchUserDefinedFunctionTests(PyFlinkBatchTableTestCase):

def test_invalid_register_udf(self):
self.assertRaises(
Exception,
lambda: self.t_env.register_function(
"add_one",
udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))
)


class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
PyFlinkBlinkStreamTableTestCase):
def test_deterministic(self):
Expand Down

0 comments on commit 360afd9

Please sign in to comment.