Skip to content

Commit

Permalink
[FLINK-28509][table][python] Support REVERSE built-in function in Tab…
Browse files Browse the repository at this point in the history
…le API (apache#20278)
  • Loading branch information
a49a authored Jul 18, 2022
1 parent 6eee0c6 commit e1d9356
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ string:
table: STRING1.regexp(STRING2)
description: Returns TRUE if any (possibly empty) substring of string1 matches the Java regular expression string2, otherwise FALSE. Returns NULL if any of arguments is NULL.
- sql: REVERSE(string)
table: STRING.reverse()
description: Returns the reversed string. Returns NULL if string is NULL.
- sql: SPLIT_INDEX(string1, string2, integer1)
table: STRING1.splitIndex(STRING2, INTEGER1)
Expand Down
1 change: 1 addition & 0 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ string:
如果 string1 的任何(可能为空)子字符串与 Java 正则表达式 string2 匹配,则返回 TRUE,否则返回 FALSE。
如果有任一参数为 `NULL`,则返回 `NULL`。
- sql: REVERSE(string)
table: STRING.reverse()
description: 返回反转的字符串。如果字符串为 `NULL`,则返回 `NULL`。
- sql: SPLIT_INDEX(string1, string2, integer1)
table: STRING1.splitIndex(STRING2, INTEGER1)
Expand Down
7 changes: 7 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,13 @@ def over(self, alias) -> 'Expression':
"""
return _binary_op("over")(self, alias)

@property
def reverse(self) -> 'Expression[str]':
"""
Reverse each character in current string.
"""
return _unary_op("reverse")(self)

def split_index(self, separator: Union[str, 'Expression[str]'],
index: Union[int, 'Expression[int]']) -> 'Expression[str]':
"""
Expand Down
1 change: 1 addition & 0 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def test_expression(self):
self.assertEqual('rtrim(a)', str(expr1.rtrim))
self.assertEqual('repeat(a, 3)', str(expr1.repeat(3)))
self.assertEqual("over(a, 'w')", str(expr1.over('w')))
self.assertEqual('reverse(a)', str(expr1.reverse))
self.assertEqual("splitIndex(a, ',', 3)", str(expr1.split_index(',', 3)))
self.assertEqual("strToMap(a)", str(expr1.str_to_map()))
self.assertEqual("strToMap(a, ';', ':')", str(expr1.str_to_map(';', ':')))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REVERSE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RIGHT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ROUND;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ROWTIME;
Expand Down Expand Up @@ -1151,6 +1152,15 @@ public OutType repeat(InType n) {
return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), objectToExpression(n)));
}

/**
* Reverse each character in current string.
*
* @return a new string which character order is reverse to current string.
*/
public OutType reverse() {
return toApiSpecificExpression(unresolvedCall(REVERSE, toExpr()));
}

/**
* Split target string with custom separator and pick the index-th(start with 0) result.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,14 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.build();

public static final BuiltInFunctionDefinition REVERSE =
BuiltInFunctionDefinition.newBuilder()
.name("reverse")
.kind(SCALAR)
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.build();

public static final BuiltInFunctionDefinition SPLIT_INDEX =
BuiltInFunctionDefinition.newBuilder()
.name("splitIndex")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ public class DirectConvertRule implements CallExpressionConvertRule {
BuiltInFunctionDefinitions.REGEXP, FlinkSqlOperatorTable.REGEXP);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.REGEXP_REPLACE, FlinkSqlOperatorTable.REGEXP_REPLACE);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.REVERSE, FlinkSqlOperatorTable.REVERSE);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.SPLIT_INDEX, FlinkSqlOperatorTable.SPLIT_INDEX);
DEFINITION_OPERATOR_MAP.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,8 +850,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {

@Test
def testReverse(): Unit = {
testSqlApi("reverse(f38)", "==ABDIQA")
testSqlApi("reverse(f40)", "NULL")
testAllApis('f38.reverse(), "reverse(f38)", "==ABDIQA")
testAllApis('f40.reverse(), "reverse(f40)", "NULL")
testSqlApi("reverse('hi')", "ih")
testSqlApi("reverse('hhhi')", "ihhh")
testSqlApi("reverse(CAST(null as VARCHAR))", "NULL")
Expand Down

0 comments on commit e1d9356

Please sign in to comment.