Skip to content

Commit

Permalink
[FLINK-35932][table] Add the built-in function REGEXP_COUNT
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanhz authored and lincoln-lil committed Aug 21, 2024
1 parent 7fa4eba commit 8189bba
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 15 deletions.
22 changes: 15 additions & 7 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,14 @@ string:
- sql: REPLACE(string1, string2, string3)
table: STRING1.replace(STRING2, STRING3)
description: Returns a new string which replaces all the occurrences of STRING2 with STRING3 (non-overlapping) from STRING1. E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns 'zab'.
- sql: TRANSLATE(expr, fromStr, toStr)
table: expr.translate(fromStr, toStr)
- sql: REGEXP_COUNT(str, regex)
table: str.regexpCount(regex)
description: |
Translate an expr where all characters in fromStr have been replaced with those in toStr.
Returns the number of times str matches the regex pattern. regex must be a Java regular expression.
If toStr has a shorter length than fromStr, unmatched characters are removed.
`str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>
Returns a STRING of translated expr.
Returns an `INTEGER` representation of the number of matches. `NULL` if any of the arguments are `NULL` or regex is invalid.
- sql: REGEXP_EXTRACT(string1, string2[, integer])
table: STRING1.regexpExtract(STRING2[, INTEGER1])
description: |
Expand All @@ -368,6 +366,16 @@ string:
`str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>`
Returns an `ARRAY<STRING>` representation of all the matched substrings. `NULL` if any of the arguments are `NULL` or invalid.
- sql: TRANSLATE(expr, fromStr, toStr)
table: expr.translate(fromStr, toStr)
description: |
Translate an expr where all characters in fromStr have been replaced with those in toStr.
If toStr has a shorter length than fromStr, unmatched characters are removed.
`expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>`
Returns a `STRING` of translated expr.
- sql: INITCAP(string)
table: STRING.initCap()
description: Returns a new form of STRING with the first character of each word converted to uppercase and the rest characters to lowercase. Here a word means a sequences of alphanumeric characters.
Expand Down
22 changes: 15 additions & 7 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -409,16 +409,14 @@ string:
返回一个新字符串,它用 STRING3(非重叠)替换 STRING1 中所有出现的 STRING2。
例如 `'hello world'.replace('world', 'flink')` 返回 `'hello flink'`;
`'ababab'.replace('abab', 'z')` 返回 `'zab'`。
- sql: TRANSLATE(expr, fromStr, toStr)
table: expr.translate(fromStr, toStr)
- sql: REGEXP_COUNT(str, regex)
table: str.regexpCount(regex)
description: |
将 expr 中所有出现在 fromStr 之中的字符替换为 toStr 中的相应字符
返回字符串 str 匹配正则表达式模式 regex 的次数。regex 必须是一个 Java 正则表达式
如果 toStr 的长度短于 fromStr,则未匹配的字符将被移除。
`str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>
返回 STRING 格式的转换结果。
返回一个 `INTEGER` 表示匹配成功的次数。如果任何参数为 `NULL` 或 regex 非法,则返回 `NULL`。
- sql: REGEXP_EXTRACT(string1, string2[, integer])
table: STRING1.regexpExtract(STRING2[, INTEGER1])
description: |
Expand All @@ -435,6 +433,16 @@ string:
`str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>`
返回一个 `ARRAY<STRING>`,表示所有匹配的子串。如果任何参数为 `NULL`或非法,则返回 `NULL`。
- sql: TRANSLATE(expr, fromStr, toStr)
table: expr.translate(fromStr, toStr)
description: |
将 expr 中所有出现在 fromStr 之中的字符替换为 toStr 中的相应字符。
如果 toStr 的长度短于 fromStr,则未匹配的字符将被移除。
`expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>`
返回 `STRING` 格式的转换结果。
- sql: INITCAP(string)
table: STRING.initCap()
description: |
Expand Down
1 change: 1 addition & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ string functions
Expression.rpad
Expression.overlay
Expression.regexp
Expression.regexp_count
Expression.regexp_replace
Expression.regexp_extract
Expression.regexp_extract_all
Expand Down
11 changes: 11 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,17 @@ def regexp(self, regex: Union[str, 'Expression[str]']) -> 'Expression[str]':
"""
return _binary_op("regexp")(self, regex)

def regexp_count(self, regex) -> 'Expression':
"""
Returns the number of times str matches the regex pattern.
regex must be a Java regular expression.
null if any of the arguments are null or regex is invalid.
:param regex: A STRING expression with a matching pattern.
:return: An INTEGER representation of the number of matches.
"""
return _binary_op("regexpCount")(self, regex)

def regexp_replace(self,
regex: Union[str, 'Expression[str]'],
replacement: Union[str, 'Expression[str]']) -> 'Expression[str]':
Expand Down
1 change: 1 addition & 0 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def test_expression(self):

# regexp functions
self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))
self.assertEqual('regexpExtract(a, b, 3)', str(expr1.regexp_extract(expr2, 3)))
self.assertEqual('REGEXP_EXTRACT_ALL(a, b)', str(expr1.regexp_extract_all(expr2)))
self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', str(expr1.regexp_extract_all(expr2, 3)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.PROCTIME;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RADIANS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_COUNT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE;
Expand Down Expand Up @@ -1136,6 +1137,19 @@ public OutType regexp(InType regex) {
return toApiSpecificExpression(unresolvedCall(REGEXP, toExpr(), objectToExpression(regex)));
}

/**
* Returns the number of times {@code str} matches the {@code regex} pattern. {@code regex} must
* be a Java regular expression.
*
* @param regex A STRING expression with a matching pattern.
* @return An INTEGER representation of the number of matches. <br>
* null if any of the arguments are null or {@code regex} is invalid.
*/
public OutType regexpCount(InType regex) {
return toApiSpecificExpression(
unresolvedCall(REGEXP_COUNT, toExpr(), objectToExpression(regex)));
}

/**
* Returns a string with all substrings that match the regular expression consecutively being
* replaced.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.build();

public static final BuiltInFunctionDefinition REGEXP_COUNT =
BuiltInFunctionDefinition.newBuilder()
.name("REGEXP_COUNT")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
Arrays.asList("str", "regex"),
Arrays.asList(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING))))
.outputTypeStrategy(explicit(DataTypes.INT()))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.RegexpCountFunction")
.build();

public static final BuiltInFunctionDefinition REGEXP_EXTRACT =
BuiltInFunctionDefinition.newBuilder()
.name("regexpExtract")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,94 @@

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.lit;

/** Test Regexp functions correct behaviour. */
class RegexpFunctionsITCase extends BuiltInFunctionTestBase {

@Override
Stream<TestSetSpec> getTestSetSpecs() {
return Stream.of(regexpExtractTestCases(), regexpExtractAllTestCases()).flatMap(s -> s);
return Stream.of(
regexpCountTestCases(),
regexpExtractTestCases(),
regexpExtractAllTestCases())
.flatMap(s -> s);
}

private Stream<TestSetSpec> regexpCountTestCases() {
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_COUNT)
.onFieldsWithData(null, "abcdeabde")
.andDataTypes(DataTypes.STRING(), DataTypes.STRING())
// null input
.testResult(
$("f0").regexpCount($("f1")),
"REGEXP_COUNT(f0, f1)",
null,
DataTypes.INT())
.testResult(
$("f1").regexpCount($("f0")),
"REGEXP_COUNT(f1, f0)",
null,
DataTypes.INT())
// invalid regexp
.testResult(
$("f1").regexpCount("("),
"REGEXP_COUNT(f1, '(')",
null,
DataTypes.INT())
// normal cases
.testResult(
lit("hello world! Hello everyone!").regexpCount("Hello"),
"REGEXP_COUNT('hello world! Hello everyone!', 'Hello')",
1,
DataTypes.INT())
.testResult(
lit("abcabcabc").regexpCount("abcab"),
"REGEXP_COUNT('abcabcabc', 'abcab')",
1,
DataTypes.INT())
.testResult(
lit("abcd").regexpCount("z"),
"REGEXP_COUNT('abcd', 'z')",
0,
DataTypes.INT())
.testResult(
lit("^abc").regexpCount("\\^abc"),
"REGEXP_COUNT('^abc', '\\^abc')",
1,
DataTypes.INT())
.testResult(
lit("a.b.c.d").regexpCount("\\."),
"REGEXP_COUNT('a.b.c.d', '\\.')",
3,
DataTypes.INT())
.testResult(
lit("a*b*c*d").regexpCount("\\*"),
"REGEXP_COUNT('a*b*c*d', '\\*')",
3,
DataTypes.INT())
.testResult(
lit("abc123xyz456").regexpCount("\\d"),
"REGEXP_COUNT('abc123xyz456', '\\d')",
6,
DataTypes.INT())
.testResult(
lit("Helloworld! Hello everyone!").regexpCount("\\bHello\\b"),
"REGEXP_COUNT('Helloworld! Hello everyone!', '\\bHello\\b')",
1,
DataTypes.INT()),
TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_COUNT, "Validation Error")
.onFieldsWithData(1024)
.andDataTypes(DataTypes.INT())
.testTableApiValidationError(
$("f0").regexpCount("1024"),
"Invalid input arguments. Expected signatures are:\n"
+ "REGEXP_COUNT(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)")
.testSqlValidationError(
"REGEXP_COUNT(f0, '1024')",
"Invalid input arguments. Expected signatures are:\n"
+ "REGEXP_COUNT(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)"));
}

private Stream<TestSetSpec> regexpExtractTestCases() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.functions.scalar;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.SpecializedFunction;

import javax.annotation.Nullable;

import java.util.regex.Matcher;
import java.util.regex.PatternSyntaxException;

import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE;

/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_COUNT}. */
@Internal
public class RegexpCountFunction extends BuiltInScalarFunction {

public RegexpCountFunction(SpecializedFunction.SpecializedContext context) {
super(BuiltInFunctionDefinitions.REGEXP_COUNT, context);
}

public @Nullable Integer eval(@Nullable StringData str, @Nullable StringData regex) {
if (str == null || regex == null) {
return null;
}

Matcher matcher;
try {
matcher = REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString());
} catch (PatternSyntaxException e) {
return null;
}

int count = 0;
while (matcher.find()) {
count++;
}

return count;
}
}

0 comments on commit 8189bba

Please sign in to comment.