Skip to content

Commit

Permalink
[FLINK-28508][table][python] Support SPLIT_INDEX and STR_TO_MAP built…
Browse files Browse the repository at this point in the history
…-in function in Table API (apache#20250)
  • Loading branch information
a49a authored Jul 15, 2022
1 parent ffb6b43 commit e85c303
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 6 deletions.
2 changes: 2 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,10 @@ string:
- sql: REVERSE(string)
description: Returns the reversed string. Returns NULL if string is NULL.
- sql: SPLIT_INDEX(string1, string2, integer1)
table: STRING1.splitIndex(STRING2, INTEGER1)
description: Splits string1 by the delimiter string2, returns the integerth (zero-based) string of the split strings. Returns NULL if integer is negative. Returns NULL if any of arguments is NULL.
- sql: STR_TO_MAP(string1[, string2, string3])
table: STRING1.strToMap([STRING2, STRING3])
description: |
Returns a map after splitting the string1 into key/value pairs using delimiters. string2 is the pair delimiter, default is ','. And string3 is the key-value delimiter, default is '='.
Both pair delimiter and key-value delimiter are treated as regular expressions. So special characters (e.g. `<([{\^-=$!|]})?*+.>`) need to be properly escaped before using as a delimiter literally.
Expand Down
4 changes: 3 additions & 1 deletion docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,12 @@ string:
- sql: REVERSE(string)
description: 返回反转的字符串。如果字符串为 `NULL`,则返回 `NULL`。
- sql: SPLIT_INDEX(string1, string2, integer1)
table: STRING1.splitIndex(STRING2, INTEGER1)
description: |
通过分隔符 string2 拆分 string1,返回拆分字符串的第 integer(从零开始)个字符串。如果整数为负,则返回 `NULL`。
通过分隔符 string2 拆分 string1,返回分隔后这组字符串的第 integer(从零开始)个字符串。如果整数为负,则返回 `NULL`。
如果有任一参数为 `NULL`,则返回 `NULL`。
- sql: STR_TO_MAP(string1[, string2, string3])
table: STRING1.strToMap([STRING2, STRING3])
description: |
使用分隔符将 string1 拆分为键值对后返回一个 map。string2 是 pair 分隔符,默认为 ','。string3 是键值分隔符,默认为 '='。
pair 分隔符与键值分隔符均为正则表达式,当使用特殊字符作为分隔符时请提前进行转义,例如 `<([{\^-=$!|]})?*+.>`。
Expand Down
23 changes: 21 additions & 2 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
'JsonQueryOnEmptyOrError'
]


_aggregation_doc = """
{op_desc}
Expand Down Expand Up @@ -161,7 +160,7 @@ def _make_aggregation_doc():
Expression.sum: "Returns the sum of the numeric field across all input values. "
"If all values are null, null is returned.",
Expression.sum0: "Returns the sum of the numeric field across all input values. "
"If all values are null, 0 is returned.",
"If all values are null, 0 is returned.",
Expression.min: "Returns the minimum value of field across all input values.",
Expression.max: "Returns the maximum value of field across all input values.",
Expression.count: "Returns the number of input rows for which the field is not null.",
Expand Down Expand Up @@ -1308,6 +1307,26 @@ def over(self, alias) -> 'Expression':
"""
return _binary_op("over")(self, alias)

def split_index(self, separator: Union[str, 'Expression[str]'],
index: Union[int, 'Expression[int]']) -> 'Expression[str]':
"""
Split target string with custom separator and pick the index-th(start with 0) result.
"""
return _ternary_op("splitIndex")(self, separator, index)

def str_to_map(self, list_delimiter: Union[str, 'Expression[str]'] = None,
key_value_delimiter: Union[str, 'Expression[str]'] = None) -> 'Expression[dict]':
"""
Creates a map by parsing text. Split text into key-value pairs using two delimiters. The
first delimiter separates pairs, and the second delimiter separates key and value. Both
list_delimiter and key_value_delimiter are treated as regular expressions.
Default delimiters are used: ',' as list_delimiter and '=' as key_value_delimiter.
"""
if list_delimiter is None or key_value_delimiter is None:
return _unary_op("strToMap")(self)
else:
return _ternary_op("strToMap")(self, list_delimiter, key_value_delimiter)

# ---------------------------- temporal functions ----------------------------------

@property
Expand Down
3 changes: 3 additions & 0 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ def test_expression(self):
self.assertEqual('rtrim(a)', str(expr1.rtrim))
self.assertEqual('repeat(a, 3)', str(expr1.repeat(3)))
self.assertEqual("over(a, 'w')", str(expr1.over('w')))
self.assertEqual("splitIndex(a, ',', 3)", str(expr1.split_index(',', 3)))
self.assertEqual("strToMap(a)", str(expr1.str_to_map()))
self.assertEqual("strToMap(a, ';', ':')", str(expr1.str_to_map(';', ':')))

# temporal functions
self.assertEqual('cast(a, DATE)', str(expr1.to_date))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,11 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIMILAR;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SINH;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT_INDEX;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SQRT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_POP;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_SAMP;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STR_TO_MAP;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUBSTR;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUBSTRING;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUM;
Expand Down Expand Up @@ -1149,6 +1151,52 @@ public OutType repeat(InType n) {
return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), objectToExpression(n)));
}

/**
* Split target string with custom separator and pick the index-th(start with 0) result.
*
* @param separator custom separator.
* @param index index of the result which you want.
* @return the string at the index of split results.
*/
public OutType splitIndex(InType separator, InType index) {
return toApiSpecificExpression(
unresolvedCall(
SPLIT_INDEX,
toExpr(),
objectToExpression(separator),
objectToExpression(index)));
}

/**
* Creates a map by parsing text. Split text into key-value pairs using two delimiters. The
* first delimiter separates pairs, and the second delimiter separates key and value. If only
* one parameter is given, default delimiters are used: ',' as delimiter1 and '=' as delimiter2.
* Both delimiters are treated as regular expressions.
*
* @return the map
*/
public OutType strToMap() {
return toApiSpecificExpression(unresolvedCall(STR_TO_MAP, toExpr()));
}

/**
* Creates a map by parsing text. Split text into key-value pairs using two delimiters. The
* first delimiter separates pairs, and the second delimiter separates key and value. Both
* {@code listDelimiter} and {@code keyValueDelimiter} are treated as regular expressions.
*
* @param listDelimiter the delimiter to separates pairs
* @param keyValueDelimiter the delimiter to separates key and value
* @return the map
*/
public OutType strToMap(InType listDelimiter, InType keyValueDelimiter) {
return toApiSpecificExpression(
unresolvedCall(
STR_TO_MAP,
toExpr(),
objectToExpression(listDelimiter),
objectToExpression(keyValueDelimiter)));
}

// Temporal operations

/** Parses a date string in the form "yyyy-MM-dd" to a SQL Date. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,35 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.build();

public static final BuiltInFunctionDefinition SPLIT_INDEX =
BuiltInFunctionDefinition.newBuilder()
.name("splitIndex")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeRoot.INTEGER)))
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.build();

public static final BuiltInFunctionDefinition STR_TO_MAP =
BuiltInFunctionDefinition.newBuilder()
.name("strToMap")
.kind(SCALAR)
.inputTypeStrategy(
or(
sequence(logical(LogicalTypeFamily.CHARACTER_STRING)),
sequence(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING))))
.outputTypeStrategy(
nullableIfArgs(
explicit(
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))))
.build();

// --------------------------------------------------------------------------------------------
// Math functions
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public class DirectConvertRule implements CallExpressionConvertRule {
BuiltInFunctionDefinitions.REGEXP, FlinkSqlOperatorTable.REGEXP);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.REGEXP_REPLACE, FlinkSqlOperatorTable.REGEXP_REPLACE);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.SPLIT_INDEX, FlinkSqlOperatorTable.SPLIT_INDEX);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.STR_TO_MAP, FlinkSqlOperatorTable.STR_TO_MAP);

// math functions
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.MINUS, FlinkSqlOperatorTable.MINUS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {

@Test
def testSplitIndex(): Unit = {
testSqlApi("split_index(f38, 'I', 0)", "AQ")
testAllApis('f38.splitIndex("I", 0), "split_index(f38, 'I', 0)", "AQ")
testSqlApi("split_index(f38, 'I', 2)", "NULL")
testSqlApi("split_index(f38, 'I', -1)", "NULL")
testSqlApi("split_index(f38, CAST(null as VARCHAR), 0)", "NULL")
Expand Down Expand Up @@ -2589,8 +2589,11 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {

@Test
def testStringToMap(): Unit = {
testSqlApi("STR_TO_MAP('k1=v1,k2=v2')", "{k1=v1, k2=v2}")
testSqlApi("STR_TO_MAP('k1:v1;k2: v2', ';', ':')", "{k1=v1, k2= v2}")
testAllApis("k1=v1,k2=v2".strToMap(), "STR_TO_MAP('k1=v1,k2=v2')", "{k1=v1, k2=v2}")
testAllApis(
"k1:v1;k2: v2".strToMap(";", ":"),
"STR_TO_MAP('k1:v1;k2: v2', ';', ':')",
"{k1=v1, k2= v2}")
testSqlApi("STR_TO_MAP('k1$$v1|k2$$ v2', '\\|', '\\$\\$')", "{k1=v1, k2= v2}")

// test empty
Expand Down

0 comments on commit e85c303

Please sign in to comment.