From e1d93566365e6fe6a8f780c88fc73df2b4466c29 Mon Sep 17 00:00:00 2001 From: "Luning (Lucas) Wang" Date: Mon, 18 Jul 2022 13:40:09 +0800 Subject: [PATCH] [FLINK-28509][table][python] Support REVERSE built-in function in Table API (#20278) --- docs/data/sql_functions.yml | 1 + docs/data/sql_functions_zh.yml | 1 + flink-python/pyflink/table/expression.py | 7 +++++++ flink-python/pyflink/table/tests/test_expression.py | 1 + .../flink/table/api/internal/BaseExpressions.java | 10 ++++++++++ .../table/functions/BuiltInFunctionDefinitions.java | 8 ++++++++ .../expressions/converter/DirectConvertRule.java | 2 ++ .../planner/expressions/ScalarFunctionsTest.scala | 4 ++-- 8 files changed, 32 insertions(+), 2 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 35ab90cec496a..9589f76edec11 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -364,6 +364,7 @@ string: table: STRING1.regexp(STRING2) description: Returns TRUE if any (possibly empty) substring of string1 matches the Java regular expression string2, otherwise FALSE. Returns NULL if any of arguments is NULL. - sql: REVERSE(string) + table: STRING.reverse() description: Returns the reversed string. Returns NULL if string is NULL. - sql: SPLIT_INDEX(string1, string2, integer1) table: STRING1.splitIndex(STRING2, INTEGER1) diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index ce0f95494599d..2f63d985857bd 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -462,6 +462,7 @@ string: 如果 string1 的任何(可能为空)子字符串与 Java 正则表达式 string2 匹配,则返回 TRUE,否则返回 FALSE。 如果有任一参数为 `NULL`,则返回 `NULL`。 - sql: REVERSE(string) + table: STRING.reverse() description: 返回反转的字符串。如果字符串为 `NULL`,则返回 `NULL`。 - sql: SPLIT_INDEX(string1, string2, integer1) table: STRING1.splitIndex(STRING2, INTEGER1) diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index fe48674f5b035..db1cc1f2d78e7 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1307,6 +1307,13 @@ def over(self, alias) -> 'Expression': """ return _binary_op("over")(self, alias) + @property + def reverse(self) -> 'Expression[str]': + """ + Reverse each character in current string. + """ + return _unary_op("reverse")(self) + def split_index(self, separator: Union[str, 'Expression[str]'], index: Union[int, 'Expression[int]']) -> 'Expression[str]': """ diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index bb201dcf22c77..07c5f2b694322 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -162,6 +162,7 @@ def test_expression(self): self.assertEqual('rtrim(a)', str(expr1.rtrim)) self.assertEqual('repeat(a, 3)', str(expr1.repeat(3))) self.assertEqual("over(a, 'w')", str(expr1.over('w'))) + self.assertEqual('reverse(a)', str(expr1.reverse)) self.assertEqual("splitIndex(a, ',', 3)", str(expr1.split_index(',', 3))) self.assertEqual("strToMap(a)", str(expr1.str_to_map())) self.assertEqual("strToMap(a, ';', ':')", str(expr1.str_to_map(';', ':'))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index bcd69ba1ce5bf..125f19c36337c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -140,6 +140,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REVERSE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RIGHT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ROUND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ROWTIME; @@ -1151,6 +1152,15 @@ public OutType repeat(InType n) { return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), objectToExpression(n))); } + /** + * Reverse each character in current string. + * + * @return a new string which character order is reverse to current string. + */ + public OutType reverse() { + return toApiSpecificExpression(unresolvedCall(REVERSE, toExpr())); + } + /** * Split target string with custom separator and pick the index-th(start with 0) result. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index b2c00f432c5cf..5703e268ebf8f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -910,6 +910,14 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING()))) .build(); + public static final BuiltInFunctionDefinition REVERSE = + BuiltInFunctionDefinition.newBuilder() + .name("reverse") + .kind(SCALAR) + .inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING()))) + .build(); + public static final BuiltInFunctionDefinition SPLIT_INDEX = BuiltInFunctionDefinition.newBuilder() .name("splitIndex") diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java index ba62cca026120..47aad8acb16f2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java @@ -134,6 +134,8 @@ public class DirectConvertRule implements CallExpressionConvertRule { BuiltInFunctionDefinitions.REGEXP, FlinkSqlOperatorTable.REGEXP); DEFINITION_OPERATOR_MAP.put( BuiltInFunctionDefinitions.REGEXP_REPLACE, FlinkSqlOperatorTable.REGEXP_REPLACE); + DEFINITION_OPERATOR_MAP.put( + BuiltInFunctionDefinitions.REVERSE, FlinkSqlOperatorTable.REVERSE); DEFINITION_OPERATOR_MAP.put( BuiltInFunctionDefinitions.SPLIT_INDEX, FlinkSqlOperatorTable.SPLIT_INDEX); DEFINITION_OPERATOR_MAP.put( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala index 761996663a99b..7edb65f439816 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala @@ -850,8 +850,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { @Test def testReverse(): Unit = { - testSqlApi("reverse(f38)", "==ABDIQA") - testSqlApi("reverse(f40)", "NULL") + testAllApis('f38.reverse(), "reverse(f38)", "==ABDIQA") + testAllApis('f40.reverse(), "reverse(f40)", "NULL") testSqlApi("reverse('hi')", "ih") testSqlApi("reverse('hhhi')", "ihhh") testSqlApi("reverse(CAST(null as VARCHAR))", "NULL")