From d12d49ed22691ff2d4a2b619c95b6c382ba2e9ee Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Sun, 26 Nov 2023 00:44:41 +0100 Subject: [PATCH] [FLINK-33646][table] Cleanup the rest usage of deprecated TableEnvironment#registerFunction and in docs --- .../docs/dev/table/functions/udfs.md | 8 +-- docs/content.zh/docs/dev/table/tableApi.md | 63 ++++++++++--------- docs/content/docs/dev/table/tableApi.md | 63 ++++++++++--------- .../flink/table/api/AggregatedTable.java | 3 +- .../flink/table/api/FlatAggregateTable.java | 3 +- .../apache/flink/table/api/GroupedTable.java | 6 +- .../harness/TableAggregateHarnessTest.scala | 4 +- .../runtime/stream/sql/AggregateITCase.scala | 4 +- .../stream/sql/GroupWindowITCase.scala | 2 +- .../stream/sql/MatchRecognizeITCase.scala | 17 ++++- .../stream/sql/OverAggregateITCase.scala | 10 +-- .../utils/UserDefinedFunctionTestUtils.scala | 10 +++ 12 files changed, 108 insertions(+), 85 deletions(-) diff --git a/docs/content.zh/docs/dev/table/functions/udfs.md b/docs/content.zh/docs/dev/table/functions/udfs.md index ab619d06868ad..bd213b80bf98e 100644 --- a/docs/content.zh/docs/dev/table/functions/udfs.md +++ b/docs/content.zh/docs/dev/table/functions/udfs.md @@ -1282,7 +1282,7 @@ public static class WeightedAvg extends AggregateFunction // 注册函数 StreamTableEnvironment tEnv = ... -tEnv.registerFunction("top2", new Top2()); +tEnv.createTemporarySystemFunction("top2", Top2.class); // 初始化表 Table tab = ...; @@ -1940,7 +1940,7 @@ public static class Top2 extends TableAggregateFunction // 注册函数 StreamTableEnvironment tEnv = ... -tEnv.registerFunction("top2", new Top2()); +tEnv.createTemporarySystemFunction("top2", Top2.class); // 初始化表 Table tab = ...; diff --git a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md index 5a440b96770d3..dc7564be7919a 100644 --- a/docs/content.zh/docs/dev/table/tableApi.md +++ b/docs/content.zh/docs/dev/table/tableApi.md @@ -825,7 +825,7 @@ result = orders.over_window(Over Table orders = tEnv.from("Orders"); // 对 user-defined aggregate functions 使用互异(互不相同、去重)聚合 -tEnv.registerFunction("myUdagg", new MyUdagg()); +tEnv.createTemporarySystemFunction("myUdagg", MyUdagg.class); orders.groupBy("users") .select( $("users"), @@ -1026,8 +1026,7 @@ join 表和表函数的结果。左(外部)表的每一行都会 join 表函 {{< tab "Java" >}} ```java // 注册 User-Defined Table Function -TableFunction> split = new MySplitUDTF(); -tableEnv.registerFunction("split", split); +tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class); // join Table orders = tableEnv.from("Orders"); @@ -1075,8 +1074,7 @@ join 表和表函数的结果。左(外部)表的每一行都会 join 表函 {{< tab "Java" >}} ```java // 注册 User-Defined Table Function -TableFunction> split = new MySplitUDTF(); -tableEnv.registerFunction("split", split); +tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class); // join Table orders = tableEnv.from("Orders"); @@ -1128,7 +1126,7 @@ Table ratesHistory = tableEnv.from("RatesHistory"); TemporalTableFunction rates = ratesHistory.createTemporalTableFunction( "r_proctime", "r_currency"); -tableEnv.registerFunction("rates", rates); +tableEnv.createTemporarySystemFunction("rates", rates); // 基于时间属性和键与“Orders”表关联 Table orders = tableEnv.from("Orders"); @@ -2175,19 +2173,14 @@ table = input.over_window([w: OverWindow].alias("w")) 使用用户定义的标量函数或内置标量函数执行 map 操作。如果输出类型是复合类型,则输出将被展平。 ```java +@FunctionHint(input = @DataTypeHint("STRING"), output = @DataTypeHint("ROW")) public class MyMapFunction extends ScalarFunction { public Row eval(String a) { return Row.of(a, "pre-" + a); } - - @Override - public TypeInformation getResultType(Class[] signature) { - return Types.ROW(Types.STRING(), Types.STRING()); - } } -ScalarFunction func = new MyMapFunction(); -tableEnv.registerFunction("func", func); +tableEnv.createTemporarySystemFunction("func", MyMapFunction.class); Table table = input .map(call("func", $("c")).as("a", "b")); @@ -2252,6 +2245,7 @@ table = input.map(pandas_func).alias('a', 'b') 使用表函数执行 `flatMap` 操作。 ```java +@FunctionHint(input = @DataTypeHint("STRING"), output = @DataTypeHint("ROW")) public class MyFlatMapFunction extends TableFunction { public void eval(String str) { @@ -2262,15 +2256,9 @@ public class MyFlatMapFunction extends TableFunction { } } } - - @Override - public TypeInformation getResultType() { - return Types.ROW(Types.STRING(), Types.INT()); - } } -TableFunction func = new MyFlatMapFunction(); -tableEnv.registerFunction("func", func); +tableEnv.createTemporarySystemFunction("func", MyFlatMapFunction.class); Table table = input .flatMap(call("func", $("c")).as("a", "b")); @@ -2364,13 +2352,21 @@ public class MyMinMax extends AggregateFunction { } @Override - public TypeInformation getResultType() { - return new RowTypeInfo(Types.INT, Types.INT); + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + return TypeInference.newBuilder() + .typedArguments(DataTypes.INT()) + .accumulatorTypeStrategy( + TypeStrategies.explicit( + DataTypes.STRUCTURED( + MyMinMaxAcc.class, + DataTypes.FIELD("min", DataTypes.INT()), + DataTypes.FIELD("max", DataTypes.INT())))) + .outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT())) + .build(); } } -AggregateFunction myAggFunc = new MyMinMax(); -tableEnv.registerFunction("myAggFunc", myAggFunc); +tableEnv.createTemporarySystemFunction("myAggFunc", MyMinMax.class); Table table = input .groupBy($("key")) .aggregate(call("myAggFunc", $("a")).as("x", "y")) @@ -2406,9 +2402,17 @@ class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] { Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max)) } - override def getResultType: TypeInformation[Row] = { - new RowTypeInfo(Types.INT, Types.INT) - } + override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = + TypeInference.newBuilder + .typedArguments(DataTypes.INT) + .accumulatorTypeStrategy( + TypeStrategies.explicit( + DataTypes.STRUCTURED( + classOf[MyMinMaxAcc], + DataTypes.FIELD("min", DataTypes.INT), + DataTypes.FIELD("max", DataTypes.INT)))) + .outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT)) + .build } val myAggFunc = new MyMinMax @@ -2491,8 +2495,7 @@ t.aggregate(pandas_udaf.alias("a", "b")) \ {{< tabs "group-window-agg" >}} {{< tab "Java" >}} ```java -AggregateFunction myAggFunc = new MyMinMax(); -tableEnv.registerFunction("myAggFunc", myAggFunc); +tableEnv.createTemporarySystemFunction("myAggFunc", MyMinMax.class); Table table = input .window(Tumble.over(lit(5).minutes()) @@ -2596,7 +2599,7 @@ public class Top2 extends TableAggregateFunction, Top2A } } -tEnv.registerFunction("top2", new Top2()); +tEnv.createTemporarySystemFunction("top2", Top2.class); Table orders = tableEnv.from("Orders"); Table result = orders .groupBy($("key")) diff --git a/docs/content/docs/dev/table/tableApi.md b/docs/content/docs/dev/table/tableApi.md index ed367a1dfa05d..788e65053e67a 100644 --- a/docs/content/docs/dev/table/tableApi.md +++ b/docs/content/docs/dev/table/tableApi.md @@ -824,7 +824,7 @@ User-defined aggregation function can also be used with `DISTINCT` modifiers. To Table orders = tEnv.from("Orders"); // Use distinct aggregation for user-defined aggregate functions -tEnv.registerFunction("myUdagg", new MyUdagg()); +tEnv.createTemporarySystemFunction("myUdagg", MyUdagg.class); orders.groupBy("users") .select( $("users"), @@ -1025,8 +1025,7 @@ A row of the left (outer) table is dropped, if its table function call returns a {{< tab "Java" >}} ```java // register User-Defined Table Function -TableFunction> split = new MySplitUDTF(); -tableEnv.registerFunction("split", split); +tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class); // join Table orders = tableEnv.from("Orders"); @@ -1074,8 +1073,7 @@ Currently, the predicate of a table function left outer join can only be empty o {{< tab "Java" >}} ```java // register User-Defined Table Function -TableFunction> split = new MySplitUDTF(); -tableEnv.registerFunction("split", split); +tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class); // join Table orders = tableEnv.from("Orders"); @@ -1127,7 +1125,7 @@ Table ratesHistory = tableEnv.from("RatesHistory"); TemporalTableFunction rates = ratesHistory.createTemporalTableFunction( "r_proctime", "r_currency"); -tableEnv.registerFunction("rates", rates); +tableEnv.createTemporarySystemFunction("rates", rates); // join with "Orders" based on the time attribute and key Table orders = tableEnv.from("Orders"); @@ -2174,19 +2172,14 @@ The row-based operations generate outputs with multiple columns. Performs a map operation with a user-defined scalar function or built-in scalar function. The output will be flattened if the output type is a composite type. ```java +@FunctionHint(input = @DataTypeHint("STRING"), output = @DataTypeHint("ROW")) public class MyMapFunction extends ScalarFunction { public Row eval(String a) { return Row.of(a, "pre-" + a); } - - @Override - public TypeInformation getResultType(Class[] signature) { - return Types.ROW(Types.STRING, Types.STRING); - } } -ScalarFunction func = new MyMapFunction(); -tableEnv.registerFunction("func", func); +tableEnv.createTemporarySystemFunction("func", MyMapFunction.class); Table table = input .map(call("func", $("c"))).as("a", "b"); @@ -2251,6 +2244,7 @@ table = input.map(pandas_func).alias('a', 'b') Performs a `flatMap` operation with a table function. ```java +@FunctionHint(output = @DataTypeHint("ROW")) public class MyFlatMapFunction extends TableFunction { public void eval(String str) { @@ -2261,15 +2255,9 @@ public class MyFlatMapFunction extends TableFunction { } } } - - @Override - public TypeInformation getResultType() { - return Types.ROW(Types.STRING, Types.INT); - } } -TableFunction func = new MyFlatMapFunction(); -tableEnv.registerFunction("func", func); +tableEnv.createTemporarySystemFunction("func", MyFlatMapFunction.class); Table table = input .flatMap(call("func", $("c"))).as("a", "b"); @@ -2363,13 +2351,21 @@ public class MyMinMax extends AggregateFunction { } @Override - public TypeInformation getResultType() { - return new RowTypeInfo(Types.INT, Types.INT); + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + return TypeInference.newBuilder() + .typedArguments(DataTypes.INT()) + .accumulatorTypeStrategy( + TypeStrategies.explicit( + DataTypes.STRUCTURED( + MyMinMaxAcc.class, + DataTypes.FIELD("min", DataTypes.INT()), + DataTypes.FIELD("max", DataTypes.INT())))) + .outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT())) + .build(); } } -AggregateFunction myAggFunc = new MyMinMax(); -tableEnv.registerFunction("myAggFunc", myAggFunc); +tableEnv.createTemporarySystemFunction("myAggFunc", MyMinMax.class); Table table = input .groupBy($("key")) .aggregate(call("myAggFunc", $("a")).as("x", "y")) @@ -2405,9 +2401,17 @@ class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] { Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max)) } - override def getResultType: TypeInformation[Row] = { - new RowTypeInfo(Types.INT, Types.INT) - } + override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = + TypeInference.newBuilder + .typedArguments(DataTypes.INT) + .accumulatorTypeStrategy( + TypeStrategies.explicit( + DataTypes.STRUCTURED( + classOf[MyMinMaxAcc], + DataTypes.FIELD("min", DataTypes.INT), + DataTypes.FIELD("max", DataTypes.INT)))) + .outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT)) + .build } val myAggFunc = new MyMinMax @@ -2490,8 +2494,7 @@ Groups and aggregates a table on a [group window](#group-window) and possibly on {{< tabs "group-window-agg" >}} {{< tab "Java" >}} ```java -AggregateFunction myAggFunc = new MyMinMax(); -tableEnv.registerFunction("myAggFunc", myAggFunc); +tableEnv.createTemporarySystemFunction("myAggFunc", MyMinMax.class); Table table = input .window(Tumble.over(lit(5).minutes()) @@ -2596,7 +2599,7 @@ public class Top2 extends TableAggregateFunction, Top2A } } -tEnv.registerFunction("top2", new Top2()); +tEnv.createTemporarySystemFunction("top2", Top2.class); Table orders = tableEnv.from("Orders"); Table result = orders .groupBy($("key")) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/AggregatedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/AggregatedTable.java index 199fb5826b62e..88249a39b45ab 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/AggregatedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/AggregatedTable.java @@ -32,8 +32,7 @@ public interface AggregatedTable { *

Example: * *

{@code
-     * AggregateFunction aggFunc = new MyAggregateFunction();
-     * tableEnv.registerFunction("aggFunc", aggFunc);
+     * tableEnv.createTemporarySystemFunction("aggFunc", MyAggregateFunction.class);
      * table.groupBy($("key"))
      *   .aggregate(call("aggFunc", $("a"), $("b")).as("f0", "f1", "f2"))
      *   .select($("key"), $("f0"), $("f1"));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FlatAggregateTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FlatAggregateTable.java
index 1eeebfa9648fa..bd5b31ecf0d6c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FlatAggregateTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/FlatAggregateTable.java
@@ -38,8 +38,7 @@ public interface FlatAggregateTable {
      * 

Example: * *

{@code
-     * TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
-     * tableEnv.registerFunction("tableAggFunc", tableAggFunc);
+     * tableEnv.createTemporarySystemFunction("tableAggFunc", MyTableAggregateFunction.class);
      * tab.groupBy($("key"))
      *   .flatAggregate(call("tableAggFunc", $("a"), $("b")).as("x", "y", "z"))
      *   .select($("key"), $("x"), $("y"), $("z"));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupedTable.java
index b6e086414529c..d57682ac483aa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupedTable.java
@@ -51,8 +51,7 @@ public interface GroupedTable {
      * 

Example: * *

{@code
-     * AggregateFunction aggFunc = new MyAggregateFunction();
-     * tableEnv.registerFunction("aggFunc", aggFunc);
+     * tableEnv.createTemporarySystemFunction("aggFunc", MyAggregateFunction.class);
      * tab.groupBy($("key"))
      *   .aggregate(call("aggFunc", $("a"), $("b")).as("f0", "f1", "f2"))
      *   .select($("key"), $("f0"), $("f1"));
@@ -76,8 +75,7 @@ public interface GroupedTable {
      * 

Example: * *

{@code
-     * TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
-     * tableEnv.registerFunction("tableAggFunc", tableAggFunc);
+     * tableEnv.createTemporarySystemFunction("tableAggFunc", MyTableAggregateFunction.class);
      * tab.groupBy($("key"))
      *   .flatAggregate(call("tableAggFunc", $("a"), $("b")).as("x", "y", "z"))
      *   .select($("key"), $("x"), $("y"), $("z"));
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
index afb859791f809..3d4ff727976b9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
@@ -56,7 +56,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
   @TestTemplate
   def testTableAggregate(): Unit = {
     val top3 = new Top3WithMapView
-    tEnv.registerFunction("top3", top3)
+    tEnv.createTemporarySystemFunction("top3", top3)
     val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
     val resultTable = source
       .groupBy('a)
@@ -161,7 +161,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
   private def createTableAggregateWithRetract()
       : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], Array[LogicalType]) = {
     val top3 = new Top3WithRetractInput
-    tEnv.registerFunction("top3", top3)
+    tEnv.createTemporarySystemFunction("top3", top3)
     val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
     val resultTable = source
       .groupBy('a)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index cc9e3bbfbdf33..c2783437f069a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -452,7 +452,7 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State
 
     val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c)
     tEnv.createTemporaryView("T", t)
-    tEnv.createTemporarySystemFunction("CntNullNonNull", new CountNullNonNull)
+    tEnv.createTemporarySystemFunction("CntNullNonNull", classOf[CountNullNonNull])
     val t1 = tEnv.sqlQuery("SELECT b, count(*), CntNullNonNull(DISTINCT c)  FROM T GROUP BY b")
 
     val sink = new TestingRetractSink
@@ -965,7 +965,7 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State
 
     val t = failingDataSource(data).toTable(tEnv, 'a, 'b)
     tEnv.createTemporaryView("MyTable", t)
-    tEnv.registerFunction("pojoFunc", new MyPojoAggFunction)
+    tEnv.createTemporarySystemFunction("pojoFunc", classOf[MyPojoAggFunction])
     tEnv.createTemporarySystemFunction("pojoToInt", MyPojoFunc)
 
     val sql = "SELECT pojoToInt(pojoFunc(b)) FROM MyTable group by a"
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
index 9f92f12439cd3..db70538d02dc2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala
@@ -94,7 +94,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
 
   @TestTemplate
   def testEventTimeSlidingWindow(): Unit = {
-    tEnv.registerFunction("concat_distinct_agg", new ConcatDistinctAggFunction())
+    tEnv.createTemporarySystemFunction("concat_distinct_agg", classOf[ConcatDistinctAggFunction])
     val sql =
       """
         |SELECT
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
index 990904cf6d8f1..14086a0ef4c45 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.catalog.DataTypeFactory
 import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, ScalarFunction}
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg
@@ -30,6 +31,7 @@ import org.apache.flink.table.planner.runtime.utils.{StreamingWithStateTestBase,
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.planner.utils.TableTestUtil
+import org.apache.flink.table.types.inference.{TypeInference, TypeStrategies}
 import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
 import org.apache.flink.types.Row
 
@@ -727,7 +729,7 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
           BasicTypeInfo.INT_TYPE_INFO))
       .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
     tEnv.createTemporaryView("MyTable", t)
-    tEnv.registerFunction("weightedAvg", new WeightedAvg)
+    tEnv.createTemporarySystemFunction("weightedAvg", classOf[WeightedAvg])
 
     val sqlQuery =
       s"""
@@ -819,8 +821,8 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
       .fromCollection(data)
       .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
     tEnv.createTemporaryView("MyTable", t)
-    tEnv.createTemporarySystemFunction("prefix", new PrefixingScalarFunc)
-    tEnv.registerFunction("countFrom", new RichAggFunc)
+    tEnv.createTemporarySystemFunction("prefix", classOf[PrefixingScalarFunc])
+    tEnv.createTemporarySystemFunction("countFrom", classOf[RichAggFunc])
     val prefix = "PREF"
     val startFrom = 4
     UserDefinedFunctionTestUtils
@@ -896,4 +898,13 @@ private class RichAggFunc extends AggregateFunction[Long, CountAcc] {
   override def createAccumulator(): CountAcc = CountAcc(start)
 
   override def getValue(accumulator: CountAcc): Long = accumulator.count
+
+  override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
+    TypeInference.newBuilder
+      .typedArguments(DataTypes.BIGINT())
+      .accumulatorTypeStrategy(TypeStrategies.explicit(
+        DataTypes.STRUCTURED(classOf[CountAcc], DataTypes.FIELD("count", DataTypes.BIGINT()))))
+      .outputTypeStrategy(TypeStrategies.explicit(DataTypes.BIGINT()))
+      .build
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
index 3466df1072735..3c4767eb8800d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
@@ -485,7 +485,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTest
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.createTemporaryView("T1", t1)
-    tEnv.registerFunction("LTCNT", new LargerThanCount)
+    tEnv.createTemporarySystemFunction("LTCNT", classOf[LargerThanCount])
 
     val sqlQuery = "SELECT " +
       "  c, b, " +
@@ -557,7 +557,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTest
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.createTemporaryView("T1", t1)
-    tEnv.createTemporarySystemFunction("LTCNT", new LargerThanCount)
+    tEnv.createTemporarySystemFunction("LTCNT", classOf[LargerThanCount])
 
     val sqlQuery = "SELECT " +
       " c, a, " +
@@ -778,7 +778,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTest
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.createTemporaryView("T1", t1)
-    tEnv.registerFunction("LTCNT", new LargerThanCount)
+    tEnv.createTemporarySystemFunction("LTCNT", new LargerThanCount)
 
     val sink = new TestingAppendSink
     tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
@@ -846,7 +846,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTest
       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     tEnv.createTemporaryView("T1", t1)
-    tEnv.registerFunction("LTCNT", new LargerThanCount)
+    tEnv.createTemporarySystemFunction("LTCNT", classOf[LargerThanCount])
 
     val sink = new TestingAppendSink
     tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
@@ -1283,7 +1283,7 @@ class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTest
 
     val table = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'proctime.proctime)
     tEnv.createTemporaryView("MyTable", table)
-    tEnv.registerFunction("PairCount", new CountPairs)
+    tEnv.createTemporarySystemFunction("PairCount", classOf[CountPairs])
 
     val sqlQuery = "SELECT a, b, " +
       "  PairCount(a, b) OVER (ORDER BY proctime RANGE UNBOUNDED preceding), " +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
index a9213387a2ed4..bac0e9edfad1b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -178,6 +178,16 @@ object UserDefinedFunctionTestUtils {
     override def createAccumulator(): Tuple1[Long] = Tuple1.of(0L)
 
     override def getValue(acc: Tuple1[Long]): Long = acc.f0
+
+    override def getTypeInference(typeFactory: DataTypeFactory): TypeInference = {
+      TypeInference.newBuilder
+        .typedArguments(DataTypes.STRING(), DataTypes.STRING())
+        .accumulatorTypeStrategy(TypeStrategies.explicit(
+          DataTypes.STRUCTURED(classOf[Tuple1[Long]], DataTypes.FIELD("f0", DataTypes.BIGINT()))))
+        .outputTypeStrategy(TypeStrategies.explicit(DataTypes.BIGINT()))
+        .build
+    }
+
   }
 
   // ------------------------------------------------------------------------------------