Skip to content

Commit

Permalink
[FLINK-35963][table] Add the built-in function REGEXP_SUBSTR
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanhz authored and lincoln-lil committed Aug 21, 2024
1 parent cbbb1cd commit e8c1d1b
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 34 deletions.
8 changes: 8 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,14 @@ string:
`str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
Returns an `INTEGER` representation of the first matched substring index. `NULL` if any of the arguments are `NULL` or regex is invalid.
- sql: REGEXP_SUBSTR(str, regex)
table: str.regexpSubstr(regex)
description: |
Returns the first substring in str that matches regex.
`str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
Returns a `STRING` representation of the first matched substring. `NULL` if any of the arguments are `NULL` or regex is invalid or pattern is not found.
- sql: TRANSLATE(expr, fromStr, toStr)
table: expr.translate(fromStr, toStr)
description: |
Expand Down
8 changes: 8 additions & 0 deletions docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,14 @@ string:
`str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
返回一个 `INTEGER` 表示 str 中第一个匹配 regex 的子字符串索引。如果任何参数为 `NULL` 或 regex 非法,则返回 `NULL`。
- sql: REGEXP_SUBSTR(str, regex)
table: str.regexpSubstr(regex)
description: |
返回 str 中第一个匹配 regex 的子字符串。
`str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
返回一个 `STRING` 表示 str 中第一个匹配 regex 的子字符串。如果任何参数为 `NULL` 或 regex 非法或匹配失败,则返回 `NULL`。
- sql: TRANSLATE(expr, fromStr, toStr)
table: expr.translate(fromStr, toStr)
description: |
Expand Down
1 change: 1 addition & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ string functions
Expression.regexp_extract
Expression.regexp_extract_all
Expression.regexp_instr
Expression.regexp_substr
Expression.from_base64
Expression.to_base64
Expression.ascii
Expand Down
10 changes: 10 additions & 0 deletions flink-python/pyflink/table/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,16 @@ def regexp_instr(self, regex) -> 'Expression':
"""
return _binary_op("regexpInstr")(self, regex)

def regexp_substr(self, regex) -> 'Expression':
    """
    Extracts the first substring of this string expression that matches the given regex.

    The result is null if any of the arguments are null, if regex is invalid, or if the
    pattern is not found.

    :param regex: A STRING expression with a matching pattern.
    :return: A STRING representation of the first matched substring.
    """
    substr_op = _binary_op("regexpSubstr")
    return substr_op(self, regex)

@property
def from_base64(self) -> 'Expression[str]':
"""
Expand Down
1 change: 1 addition & 0 deletions flink-python/pyflink/table/tests/test_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def test_expression(self):
self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', str(expr1.regexp_extract_all(expr2, 3)))
self.assertEqual("regexpReplace(a, b, 'abc')", str(expr1.regexp_replace(expr2, 'abc')))
self.assertEqual("REGEXP_INSTR(a, b)", str(expr1.regexp_instr(expr2)))
self.assertEqual("REGEXP_SUBSTR(a, b)", str(expr1.regexp_substr(expr2)))

# temporal functions
self.assertEqual('cast(a, DATE)', str(expr1.to_date))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_INSTR;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_SUBSTR;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REVERSE;
Expand Down Expand Up @@ -1222,6 +1223,19 @@ public OutType regexpInstr(InType regex) {
unresolvedCall(REGEXP_INSTR, toExpr(), objectToExpression(regex)));
}

/**
 * Extracts the first substring of this string expression that matches {@code regex}.
 *
 * <p>The result is null if any of the arguments are null, if {@code regex} is invalid, or if
 * the pattern is not found.
 *
 * @param regex A STRING expression with a matching pattern.
 * @return A STRING representation of the first matched substring, or null as described above.
 */
public OutType regexpSubstr(InType regex) {
    return toApiSpecificExpression(
            unresolvedCall(
                    REGEXP_SUBSTR,
                    toExpr(),
                    objectToExpression(regex)));
}

/**
* Returns a string by quoting a string as a JSON value and wrapping it with double quote
* characters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.RegexpInstrFunction")
.build();

/**
 * Definition of the built-in scalar function {@code REGEXP_SUBSTR(str, regex)}.
 *
 * <p>It takes exactly two arguments named {@code str} and {@code regex}, both of which must
 * belong to the character-string type family, and its output type is always {@code STRING}.
 * The runtime implementation is referenced by its fully qualified class name.
 */
public static final BuiltInFunctionDefinition REGEXP_SUBSTR =
BuiltInFunctionDefinition.newBuilder()
.name("REGEXP_SUBSTR")
.kind(SCALAR)
.inputTypeStrategy(
sequence(
Arrays.asList("str", "regex"),
Arrays.asList(
logical(LogicalTypeFamily.CHARACTER_STRING),
logical(LogicalTypeFamily.CHARACTER_STRING))))
.outputTypeStrategy(explicit(DataTypes.STRING()))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.RegexpSubstrFunction")
.build();

public static final BuiltInFunctionDefinition JSON_QUOTE =
BuiltInFunctionDefinition.newBuilder()
.name("JSON_QUOTE")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ Stream<TestSetSpec> getTestSetSpecs() {
regexpCountTestCases(),
regexpExtractTestCases(),
regexpExtractAllTestCases(),
regexpInstrTestCases())
regexpInstrTestCases(),
regexpSubstrTestCases())
.flatMap(s -> s);
}

Expand Down Expand Up @@ -312,4 +313,78 @@ private Stream<TestSetSpec> regexpInstrTestCases() {
"Invalid input arguments. Expected signatures are:\n"
+ "REGEXP_INSTR(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)"));
}

/**
 * Test specs for {@code REGEXP_SUBSTR}.
 *
 * <p>The first spec checks results for null inputs, an invalid pattern, a pattern without a
 * match, anchored patterns and ordinary matches. The second spec checks that a non-string
 * argument is rejected with a validation error in both the Table API and SQL.
 */
private Stream<TestSetSpec> regexpSubstrTestCases() {
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_SUBSTR)
// three STRING fields, referenced below as f0 (null), f1 and f2
.onFieldsWithData(null, "abcdeabde", "100-200, 300-400")
.andDataTypes(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING())
// null input: a null str or a null regex both yield null
.testResult(
$("f0").regexpSubstr($("f1")),
"REGEXP_SUBSTR(f0, f1)",
null,
DataTypes.STRING())
.testResult(
$("f1").regexpSubstr($("f0")),
"REGEXP_SUBSTR(f1, f0)",
null,
DataTypes.STRING())
// invalid regexp: an unbalanced parenthesis yields null
.testResult(
$("f1").regexpSubstr("("),
"REGEXP_SUBSTR(f1, '(')",
null,
DataTypes.STRING())
// not found: f2 contains no lowercase letter, so the result is null
.testResult(
$("f2").regexpSubstr("[a-z]"),
"REGEXP_SUBSTR(f2, '[a-z]')",
null,
DataTypes.STRING())
// border chars: word-boundary and end-of-input anchors
.testResult(
lit("Helloworld! Hello everyone!").regexpSubstr("\\bHello\\b"),
"REGEXP_SUBSTR('Helloworld! Hello everyone!', '\\bHello\\b')",
"Hello",
DataTypes.STRING())
.testResult(
$("f2").regexpSubstr("(\\d+)-(\\d+)$"),
"REGEXP_SUBSTR(f2, '(\\d+)-(\\d+)$')",
"300-400",
DataTypes.STRING())
// normal cases: only the first match is returned
.testResult(
lit("hello world! Hello everyone!").regexpSubstr("Hello"),
"REGEXP_SUBSTR('hello world! Hello everyone!', 'Hello')",
"Hello",
DataTypes.STRING())
.testResult(
lit("a.b.c.d").regexpSubstr("\\."),
"REGEXP_SUBSTR('a.b.c.d', '\\.')",
".",
DataTypes.STRING())
.testResult(
lit("abc123xyz456").regexpSubstr("\\d"),
"REGEXP_SUBSTR('abc123xyz456', '\\d')",
"1",
DataTypes.STRING())
.testResult(
$("f2").regexpSubstr("(\\d+)-(\\d+)"),
"REGEXP_SUBSTR(f2, '(\\d+)-(\\d+)')",
"100-200",
DataTypes.STRING()),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.REGEXP_SUBSTR, "Validation Error")
// a single INT field: a non-string str argument must fail validation
.onFieldsWithData(1024)
.andDataTypes(DataTypes.INT())
.testTableApiValidationError(
$("f0").regexpSubstr("1024"),
"Invalid input arguments. Expected signatures are:\n"
+ "REGEXP_SUBSTR(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)")
.testSqlValidationError(
"REGEXP_SUBSTR(f0, '1024')",
"Invalid input arguments. Expected signatures are:\n"
+ "REGEXP_SUBSTR(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.DecimalDataUtils;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.binary.BinaryStringDataUtil;
import org.apache.flink.table.utils.EncodingUtils;
Expand All @@ -30,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -46,6 +49,7 @@
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import static org.apache.flink.table.data.DecimalDataUtils.castFrom;
import static org.apache.flink.table.data.DecimalDataUtils.castToIntegral;
Expand All @@ -64,7 +68,7 @@ public class SqlFunctionUtils {

private static final Logger LOG = LoggerFactory.getLogger(SqlFunctionUtils.class);

public static final ThreadLocalCache<String, Pattern> REGEXP_PATTERN_CACHE =
private static final ThreadLocalCache<String, Pattern> REGEXP_PATTERN_CACHE =
new ThreadLocalCache<String, Pattern>() {
@Override
public Pattern getNewInstance(String regex) {
Expand Down Expand Up @@ -472,6 +476,21 @@ public static String regexpExtract(String str, String regex) {
return regexpExtract(str, regex, 0);
}

/**
 * Returns a Matcher object that represents the result of matching given StringData against a
 * specified regular expression pattern.
 *
 * <p>The compiled pattern is looked up in {@code REGEXP_PATTERN_CACHE}, keyed by the regex
 * string. The returned matcher has not been advanced; callers drive it themselves.
 *
 * @param str the input to match against, may be {@code null}
 * @param regex the regular expression, may be {@code null}
 * @return a matcher of {@code regex} over {@code str}, or {@code null} if either argument is
 *     {@code null} or {@code regex} is not a valid regular expression
 */
public static @Nullable Matcher getRegexpMatcher(
        @Nullable StringData str, @Nullable StringData regex) {
    if (str == null || regex == null) {
        return null;
    }
    try {
        return REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString());
    } catch (PatternSyntaxException ignored) {
        // Deliberately not rethrown: an invalid regex is reported as null so that the
        // calling functions can return NULL instead of failing.
        return null;
    }
}

/**
* Parse string as key-value string and return the value matches key name. example:
* keyvalue('k1=v1;k2=v2', ';', '=', 'k2') = 'v2' keyvalue('k1:v1,k2:v2', ',', ':', 'k3') = NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
import javax.annotation.Nullable;

import java.util.regex.Matcher;
import java.util.regex.PatternSyntaxException;

import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE;
import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.getRegexpMatcher;

/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_COUNT}. */
@Internal
Expand All @@ -39,22 +38,15 @@ public RegexpCountFunction(SpecializedFunction.SpecializedContext context) {
}

public @Nullable Integer eval(@Nullable StringData str, @Nullable StringData regex) {
if (str == null || regex == null) {
return null;
}

Matcher matcher;
try {
matcher = REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString());
} catch (PatternSyntaxException e) {
Matcher matcher = getRegexpMatcher(str, regex);
if (matcher == null) {
return null;
}

int count = 0;
while (matcher.find()) {
count++;
}

return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.PatternSyntaxException;

import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE;
import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.getRegexpMatcher;

/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT_ALL}. */
@Internal
Expand All @@ -49,25 +48,22 @@ public RegexpExtractAllFunction(SpecializedFunction.SpecializedContext context)

public @Nullable ArrayData eval(
@Nullable StringData str, @Nullable StringData regex, @Nullable Number extractIndex) {
if (str == null || regex == null || extractIndex == null) {
if (extractIndex == null || extractIndex.longValue() < 0) {
return null;
}

Matcher matcher;
try {
matcher = REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString());
} catch (PatternSyntaxException e) {
Matcher matcher = getRegexpMatcher(str, regex);
if (matcher == null) {
return null;
}

long groupIndex = extractIndex.longValue();
if (groupIndex < 0 || matcher.groupCount() < groupIndex) {
if (matcher.groupCount() < extractIndex.longValue()) {
return null;
}

List<StringData> list = new ArrayList<>();
while (matcher.find()) {
list.add(BinaryStringData.fromString(matcher.group((int) groupIndex)));
list.add(BinaryStringData.fromString(matcher.group(extractIndex.intValue())));
}

return new GenericArrayData(list.toArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
import javax.annotation.Nullable;

import java.util.regex.Matcher;
import java.util.regex.PatternSyntaxException;

import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE;
import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.getRegexpMatcher;

/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_INSTR}. */
@Internal
Expand All @@ -39,17 +38,10 @@ public RegexpInstrFunction(SpecializedContext context) {
}

public @Nullable Integer eval(@Nullable StringData str, @Nullable StringData regex) {
if (str == null || regex == null) {
Matcher matcher = getRegexpMatcher(str, regex);
if (matcher == null) {
return null;
}

Matcher matcher;
try {
matcher = REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString());
} catch (PatternSyntaxException e) {
return null;
}

return matcher.find() ? matcher.start() + 1 : 0;
}
}
Loading

0 comments on commit e8c1d1b

Please sign in to comment.