From d73b01b138dbc6dec078150d8cf6f9410fe68c7c Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Wed, 28 Nov 2018 13:31:59 +0100 Subject: [PATCH] [FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE Introduces support for WITHIN clause in MATCH_RECOGNIZE that allows adding a time constraint for a pattern. It reuses the within function of Pattern in the CEP library. The behavior is such that the difference between first row in a match and last row in a match must be smaller than the given period. The WITHIN clause accepts only a constant millisecond interval value. --- .../nodes/datastream/DataStreamMatch.scala | 41 +++++++++---- .../table/match/PatternTranslatorTest.scala | 59 +++++++++++++++++++ .../match/PatternTranslatorTestBase.scala | 17 +++--- 3 files changed, 98 insertions(+), 19 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala index 19b298cbf98e7..493c0923ed2ff 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala @@ -28,6 +28,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ import org.apache.calcite.sql.SqlKind import org.apache.calcite.sql.SqlMatchRecognize.AfterOption +import org.apache.calcite.sql.`type`.SqlTypeFamily import org.apache.calcite.sql.fun.SqlStdOperatorTable._ import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.api.common.typeinfo.TypeInformation @@ -39,6 +40,7 @@ import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty import org.apache.flink.cep.pattern.conditions.BooleanConditions import org.apache.flink.cep.{CEP, PatternStream} import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.windowing.time.Time import scala.collection.JavaConverters._ import org.apache.flink.table.api._ @@ -95,6 +97,31 @@ class DataStreamMatch( explainMatch(super.explainTerms(pw), logicalMatch, inputSchema.fieldNames, getExpressionString) } + private def translateTimeBound(interval: RexNode): Time = { + interval match { + case x: RexLiteral if x.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME => + Time.milliseconds(x.getValueAs(classOf[JLong])) + case _ => + throw new TableException("Only constant intervals with millisecond resolution " + + "are supported as time constraints of patterns.") + } + } + + @VisibleForTesting + private[flink] def translatePattern( + config: TableConfig, + inputTypeInfo: TypeInformation[Row] + ): (Pattern[Row, Row], Iterable[String]) = { + val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch) + val cepPattern = if (logicalMatch.interval != null) { + val interval = translateTimeBound(logicalMatch.interval) + logicalMatch.pattern.accept(patternVisitor).within(interval) + } else { + logicalMatch.pattern.accept(patternVisitor) + } + (cepPattern, patternVisitor.names) + } + override def translateToPlan( tableEnv: StreamTableEnvironment, queryConfig: StreamQueryConfig) @@ -120,9 +147,7 @@ class DataStreamMatch( crowInput, logicalMatch.orderKeys) - val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch) - val cepPattern = logicalMatch.pattern.accept(patternVisitor) - val patternNames = patternVisitor.names + val (cepPattern, patternNames) = translatePattern(config, inputTypeInfo) //TODO remove this once it is supported in CEP library if (NFACompiler.canProduceEmptyMatches(cepPattern)) { @@ -138,11 +163,6 @@ class DataStreamMatch( "pattern with either a simple variable or reluctant quantifier.") } - if (logicalMatch.interval != null) { - throw new TableException( - "WITHIN clause is not part of the SQL Standard, thus it is not supported.") - } - val inputDS: DataStream[Row] = timestampedInput .map(new CRowToRowMapFunction) .setParallelism(timestampedInput.getParallelism) @@ -232,12 +252,9 @@ class DataStreamMatch( inputDs } } - - @VisibleForTesting private[flink] def getLogicalMatch = logicalMatch } -@VisibleForTesting -private[flink] class PatternVisitor( +private class PatternVisitor( config: TableConfig, inputTypeInfo: TypeInformation[Row], logicalMatch: MatchRecognize) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala index 9bf651fc0b82d..53368c02b581c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala @@ -20,10 +20,12 @@ package org.apache.flink.table.`match` import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy._ import org.apache.flink.cep.pattern.Pattern +import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.TableException import org.junit.Test class PatternTranslatorTest extends PatternTranslatorTestBase { + @Test def simplePattern(): Unit = { verifyPattern( @@ -212,6 +214,63 @@ class PatternTranslatorTest extends PatternTranslatorTestBase { Pattern.begin("A\"", skipToNext()).optional().next("\u006C").next("C")) } + @Test + def testWithinClause(): Unit = { + verifyPattern( + """MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + | A.f0 AS aid + | PATTERN (A B) WITHIN INTERVAL '10 00:00:00.004' DAY TO SECOND + | DEFINE + | A as A.f0 = 1 + |) AS T + |""".stripMargin, + Pattern.begin("A", skipToNext()).next("B") + .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000 + 4))) + + verifyPattern( + """MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + | A.f0 AS aid + | PATTERN (A B) WITHIN INTERVAL '10 00' DAY TO HOUR + | DEFINE + | A as A.f0 = 1 + |) AS T + |""".stripMargin, + Pattern.begin("A", skipToNext()).next("B") + .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000))) + + verifyPattern( + """MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + | A.f0 AS aid + | PATTERN (A B) WITHIN INTERVAL '10' MINUTE + | DEFINE + | A as A.f0 = 1 + |) AS T + |""".stripMargin, + Pattern.begin("A", skipToNext()).next("B") + .within(Time.milliseconds(10 * 60 * 1000))) + } + + @Test(expected = classOf[TableException]) + def testWithinClauseWithYearMonthResolution(): Unit = { + verifyPattern( + """MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + | A.f0 AS aid + | PATTERN (A B) WITHIN INTERVAL '2-10' YEAR TO MONTH + | DEFINE + | A as A.f0 = 1 + |) AS T + |""".stripMargin, + null /* don't care */) + } + @Test(expected = classOf[TableException]) def testReluctantOptionalNotSupported(): Unit = { verifyPattern( diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala index 883ce0adf8102..34b528c4a55c0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala @@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableConfig, TableEnvironment} import org.apache.flink.table.calcite.FlinkPlannerImpl -import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan, PatternVisitor} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan} import org.apache.flink.types.Row import org.apache.flink.util.TestLogger import org.junit.Assert._ @@ -89,11 +89,8 @@ abstract class PatternTranslatorTestBase extends TestLogger{ fail("Expression is converted into more than a Match operation. Use a different test method.") } - val dataMatch = optimized - .asInstanceOf[DataStreamMatch] - - val pVisitor = new PatternVisitor(new TableConfig, testTableTypeInfo, dataMatch.getLogicalMatch) - val p = dataMatch.getLogicalMatch.pattern.accept(pVisitor) + val dataMatch = optimized.asInstanceOf[DataStreamMatch] + val p = dataMatch.translatePattern(new TableConfig, testTableTypeInfo)._1 compare(expected, p) } @@ -108,10 +105,16 @@ abstract class PatternTranslatorTestBase extends TestLogger{ val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy == currentRight.getAfterMatchSkipStrategy + val sameTimeWindow = if (currentLeft.getWindowTime != null && currentRight != null) { + currentLeft.getWindowTime.toMilliseconds == currentRight.getWindowTime.toMilliseconds + } else { + currentLeft.getWindowTime == null && currentRight.getWindowTime == null + } + currentLeft = currentLeft.getPrevious currentRight = currentRight.getPrevious - if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy) { + if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy || !sameTimeWindow) { throw new ComparisonFailure("Compiled different pattern.", expected.toString, actual.toString)