Skip to content

Commit

Permalink
[FLINK-28743][table-planner] Supports validating the determinism for …
Browse files Browse the repository at this point in the history
…StreamPhysicalMatchRecognize

This closes apache#22981
  • Loading branch information
lincoln-lil authored Jul 14, 2023
1 parent e5324c0 commit 8829b5e
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -662,9 +662,18 @@ public StreamPhysicalRel visit(
// transit requireDeterminism transparently
return transmitDeterminismRequirement(rel, requireDeterminism);
} else if (rel instanceof StreamPhysicalMatch) {
// TODO to be supported in FLINK-28743
throw new TableException(
"Unsupported to resolve non-deterministic issue in match-recognize.");
StreamPhysicalMatch match = (StreamPhysicalMatch) rel;
if (inputInsertOnly(match)) {
// Similar to over aggregate: the output is insert-only when the input is insert-only, so
// the required determinism is always satisfied here.
return transmitDeterminismRequirement(match, NO_REQUIRED_DETERMINISM);
} else {
// The DEFINE and MEASURES clauses in match-recognize have similar meanings to the
// WHERE and SELECT clauses in SQL query, we should analyze and transmit the
// determinism requirement via the RexNodes in these two clauses.
throw new UnsupportedOperationException(
"Unsupported to resolve non-deterministic issue in match-recognize when input has updates.");
}
} else {
throw new UnsupportedOperationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3158,6 +3158,86 @@ Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
+- ChangelogNormalize(key=[a])
+- Exchange(distribution=[hash[a]])
+- TableSourceScan(table=[[default_catalog, default_database, upsert_src, project=[a, b, c], metadata=[]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testMatchRecognizeSinkWithPk[nonDeterministicUpdateStrategy=IGNORE]">
<Resource name="sql">
<![CDATA[
insert into sink_with_pk
SELECT T1.a, T1.b, cast(T1.matchProctime as varchar)
FROM v1
MATCH_RECOGNIZE (
PARTITION BY c
ORDER BY proctime
MEASURES
A.a as a,
A.b as b,
MATCH_PROCTIME() as matchProctime
ONE ROW PER MATCH
PATTERN (A)
DEFINE
A AS A.a > 1
) AS T1
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, EXPR$2])
+- LogicalProject(a=[$1], b=[$2], EXPR$2=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL])
+- LogicalMatch(partition=[[2]], order=[[4 ASC-nulls-first]], outputFields=[[c, a, b, matchProctime]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$0, 0), 1)]], inputFields=[[a, b, c, d, proctime]])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, EXPR$2])
+- Calc(select=[a, b, CAST(PROCTIME_MATERIALIZE(matchProctime) AS VARCHAR(2147483647)) AS EXPR$2])
+- Match(partitionBy=[c], orderBy=[proctime ASC], measures=[FINAL(A.a) AS a, FINAL(A.b) AS b, FINAL(MATCH_PROCTIME()) AS matchProctime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>(PREV(A.$0, 0), 1)}])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, b, c, d, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
<TestCase name="testMatchRecognizeSinkWithPk[nonDeterministicUpdateStrategy=TRY_RESOLVE]">
<Resource name="sql">
<![CDATA[
insert into sink_with_pk
SELECT T1.a, T1.b, cast(T1.matchProctime as varchar)
FROM v1
MATCH_RECOGNIZE (
PARTITION BY c
ORDER BY proctime
MEASURES
A.a as a,
A.b as b,
MATCH_PROCTIME() as matchProctime
ONE ROW PER MATCH
PATTERN (A)
DEFINE
A AS A.a > 1
) AS T1
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, EXPR$2])
+- LogicalProject(a=[$1], b=[$2], EXPR$2=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL])
+- LogicalMatch(partition=[[2]], order=[[4 ASC-nulls-first]], outputFields=[[c, a, b, matchProctime]], allRows=[false], after=[FLAG(SKIP TO NEXT ROW)], pattern=[_UTF-16LE'A'], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[>(PREV(A.$0, 0), 1)]], inputFields=[[a, b, c, d, proctime]])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, src]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, EXPR$2])
+- Calc(select=[a, b, CAST(PROCTIME_MATERIALIZE(matchProctime) AS VARCHAR(2147483647)) AS EXPR$2])
+- Match(partitionBy=[c], orderBy=[proctime ASC], measures=[FINAL(A.a) AS a, FINAL(A.b) AS b, FINAL(MATCH_PROCTIME()) AS matchProctime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>(PREV(A.$0, 0), 1)}])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[a, b, c, d, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,87 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
util.verifyExecPlan(stmtSet)
}

/**
 * Verifies the exec plan of a MATCH_RECOGNIZE query that reads from a view carrying a
 * PROCTIME() column and writes into `sink_with_pk`.
 */
@Test
def testMatchRecognizeSinkWithPk(): Unit = {
  // View over `src` that appends the processing-time attribute used by ORDER BY below.
  val createViewDdl =
    s"""
       |create temporary view v1 as
       |select *, PROCTIME() as proctime from src
       |""".stripMargin
  util.tableEnv.executeSql(createViewDdl)

  // Single-variable pattern with a simple predicate in DEFINE; MEASURES additionally emits
  // MATCH_PROCTIME(), which the outer SELECT casts to varchar.
  val insertSql =
    """
      |insert into sink_with_pk
      |SELECT T1.a, T1.b, cast(T1.matchProctime as varchar)
      |FROM v1
      |MATCH_RECOGNIZE (
      |PARTITION BY c
      |ORDER BY proctime
      |MEASURES
      | A.a as a,
      | A.b as b,
      | MATCH_PROCTIME() as matchProctime
      |ONE ROW PER MATCH
      |PATTERN (A)
      |DEFINE
      | A AS A.a > 1
      |) AS T1
      |""".stripMargin

  util.verifyExecPlanInsert(insertSql)
}

/**
 * MATCH_RECOGNIZE over `cdc_with_meta_and_wm` whose DEFINE clause compares against
 * CURRENT_TIMESTAMP. Planning is currently expected to fail with a TableException.
 */
@Test
def testMatchRecognizeWithNonDeterministicConditionOnCdcSinkWithPk(): Unit = {
  // TODO this should be updated after StreamPhysicalMatch supports consuming updates
  val insertSql =
    """
      |insert into sink_with_pk
      |SELECT T.a, T.b, cast(T.matchRowtime as varchar)
      |FROM cdc_with_meta_and_wm
      |MATCH_RECOGNIZE (
      |PARTITION BY c
      |ORDER BY op_ts
      |MEASURES
      | A.a as a,
      | A.b as b,
      | MATCH_ROWTIME(op_ts) as matchRowtime
      |ONE ROW PER MATCH
      |PATTERN (A)
      |DEFINE
      | A AS A.op_ts >= CURRENT_TIMESTAMP
      |) AS T
      |""".stripMargin

  // Both expectations must be registered before the statement that is expected to throw.
  thrown.expect(classOf[TableException])
  thrown.expectMessage("Match Recognize doesn't support consuming update and delete changes")

  util.verifyExecPlanInsert(insertSql)
}

/**
 * MATCH_RECOGNIZE over `cdc_with_meta_and_wm` that projects the `op_ts` column through
 * MEASURES into `sink_with_pk`. Planning is currently expected to fail with a TableException.
 */
@Test
def testMatchRecognizeOnCdcWithMetaDataSinkWithPk(): Unit = {
  // TODO this should be updated after StreamPhysicalMatch supports consuming updates
  val insertSql =
    """
      |insert into sink_with_pk
      |SELECT T.a, T.b, cast(T.ts as varchar)
      |FROM cdc_with_meta_and_wm
      |MATCH_RECOGNIZE (
      |PARTITION BY c
      |ORDER BY op_ts
      |MEASURES
      | A.a as a,
      | A.b as b,
      | A.op_ts as ts,
      | MATCH_ROWTIME(op_ts) as matchRowtime
      |ONE ROW PER MATCH
      |PATTERN (A)
      |DEFINE
      | A AS A.a > 0
      |) AS T
      |""".stripMargin

  // Both expectations must be registered before the statement that is expected to throw.
  thrown.expect(classOf[TableException])
  thrown.expectMessage("Match Recognize doesn't support consuming update and delete changes")

  util.verifyExecPlanInsert(insertSql)
}

/**
* This upsert test sink does support getting primary key from table schema. We defined a similar
* test sink here not using existing {@link TestingUpsertTableSink} in {@link StreamTestSink}
Expand Down

0 comments on commit 8829b5e

Please sign in to comment.