Skip to content

Commit

Permalink
[FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE
Browse files Browse the repository at this point in the history
Introduces support for WITHIN clause in MATCH_RECOGNIZE that
allows adding a time constraint for a pattern. It reuses the
within function of Pattern in the CEP library. The behavior
is such that the difference between first row in a match and
last row in a match must be smaller than the given period. The
WITHIN clause accepts only a constant millisecond interval value.
  • Loading branch information
dawidwys authored and twalthr committed Dec 3, 2018
1 parent 59ebdb2 commit d73b01b
Showing 3 changed files with 98 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -39,6 +40,7 @@ import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
import org.apache.flink.cep.pattern.conditions.BooleanConditions
import org.apache.flink.cep.{CEP, PatternStream}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.JavaConverters._
import org.apache.flink.table.api._
@@ -95,6 +97,31 @@ class DataStreamMatch(
explainMatch(super.explainTerms(pw), logicalMatch, inputSchema.fieldNames, getExpressionString)
}

private def translateTimeBound(interval: RexNode): Time = {
interval match {
case x: RexLiteral if x.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME =>
Time.milliseconds(x.getValueAs(classOf[JLong]))
case _ =>
throw new TableException("Only constant intervals with millisecond resolution " +
"are supported as time constraints of patterns.")
}
}

@VisibleForTesting
private[flink] def translatePattern(
config: TableConfig,
inputTypeInfo: TypeInformation[Row]
): (Pattern[Row, Row], Iterable[String]) = {
val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
val cepPattern = if (logicalMatch.interval != null) {
val interval = translateTimeBound(logicalMatch.interval)
logicalMatch.pattern.accept(patternVisitor).within(interval)
} else {
logicalMatch.pattern.accept(patternVisitor)
}
(cepPattern, patternVisitor.names)
}

override def translateToPlan(
tableEnv: StreamTableEnvironment,
queryConfig: StreamQueryConfig)
@@ -120,9 +147,7 @@ class DataStreamMatch(
crowInput,
logicalMatch.orderKeys)

val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
val cepPattern = logicalMatch.pattern.accept(patternVisitor)
val patternNames = patternVisitor.names
val (cepPattern, patternNames) = translatePattern(config, inputTypeInfo)

//TODO remove this once it is supported in CEP library
if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
@@ -138,11 +163,6 @@ class DataStreamMatch(
"pattern with either a simple variable or reluctant quantifier.")
}

if (logicalMatch.interval != null) {
throw new TableException(
"WITHIN clause is not part of the SQL Standard, thus it is not supported.")
}

val inputDS: DataStream[Row] = timestampedInput
.map(new CRowToRowMapFunction)
.setParallelism(timestampedInput.getParallelism)
@@ -232,12 +252,9 @@ class DataStreamMatch(
inputDs
}
}

@VisibleForTesting private[flink] def getLogicalMatch = logicalMatch
}

@VisibleForTesting
private[flink] class PatternVisitor(
private class PatternVisitor(
config: TableConfig,
inputTypeInfo: TypeInformation[Row],
logicalMatch: MatchRecognize)
Original file line number Diff line number Diff line change
@@ -20,10 +20,12 @@ package org.apache.flink.table.`match`

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy._
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.TableException
import org.junit.Test

class PatternTranslatorTest extends PatternTranslatorTestBase {

@Test
def simplePattern(): Unit = {
verifyPattern(
@@ -212,6 +214,63 @@ class PatternTranslatorTest extends PatternTranslatorTestBase {
Pattern.begin("A\"", skipToNext()).optional().next("\u006C").next("C"))
}

@Test
def testWithinClause(): Unit = {
verifyPattern(
"""MATCH_RECOGNIZE (
| ORDER BY proctime
| MEASURES
| A.f0 AS aid
| PATTERN (A B) WITHIN INTERVAL '10 00:00:00.004' DAY TO SECOND
| DEFINE
| A as A.f0 = 1
|) AS T
|""".stripMargin,
Pattern.begin("A", skipToNext()).next("B")
.within(Time.milliseconds(10 * 24 * 60 * 60 * 1000 + 4)))

verifyPattern(
"""MATCH_RECOGNIZE (
| ORDER BY proctime
| MEASURES
| A.f0 AS aid
| PATTERN (A B) WITHIN INTERVAL '10 00' DAY TO HOUR
| DEFINE
| A as A.f0 = 1
|) AS T
|""".stripMargin,
Pattern.begin("A", skipToNext()).next("B")
.within(Time.milliseconds(10 * 24 * 60 * 60 * 1000)))

verifyPattern(
"""MATCH_RECOGNIZE (
| ORDER BY proctime
| MEASURES
| A.f0 AS aid
| PATTERN (A B) WITHIN INTERVAL '10' MINUTE
| DEFINE
| A as A.f0 = 1
|) AS T
|""".stripMargin,
Pattern.begin("A", skipToNext()).next("B")
.within(Time.milliseconds(10 * 60 * 1000)))
}

@Test(expected = classOf[TableException])
def testWithinClauseWithYearMonthResolution(): Unit = {
verifyPattern(
"""MATCH_RECOGNIZE (
| ORDER BY proctime
| MEASURES
| A.f0 AS aid
| PATTERN (A B) WITHIN INTERVAL '2-10' YEAR TO MONTH
| DEFINE
| A as A.f0 = 1
|) AS T
|""".stripMargin,
null /* don't care */)
}

@Test(expected = classOf[TableException])
def testReluctantOptionalNotSupported(): Unit = {
verifyPattern(
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableConfig, TableEnvironment}
import org.apache.flink.table.calcite.FlinkPlannerImpl
import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan, PatternVisitor}
import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan}
import org.apache.flink.types.Row
import org.apache.flink.util.TestLogger
import org.junit.Assert._
@@ -89,11 +89,8 @@ abstract class PatternTranslatorTestBase extends TestLogger{
fail("Expression is converted into more than a Match operation. Use a different test method.")
}

val dataMatch = optimized
.asInstanceOf[DataStreamMatch]

val pVisitor = new PatternVisitor(new TableConfig, testTableTypeInfo, dataMatch.getLogicalMatch)
val p = dataMatch.getLogicalMatch.pattern.accept(pVisitor)
val dataMatch = optimized.asInstanceOf[DataStreamMatch]
val p = dataMatch.translatePattern(new TableConfig, testTableTypeInfo)._1

compare(expected, p)
}
@@ -108,10 +105,16 @@ abstract class PatternTranslatorTestBase extends TestLogger{
val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy ==
currentRight.getAfterMatchSkipStrategy

val sameTimeWindow = if (currentLeft.getWindowTime != null && currentRight != null) {
currentLeft.getWindowTime.toMilliseconds == currentRight.getWindowTime.toMilliseconds
} else {
currentLeft.getWindowTime == null && currentRight.getWindowTime == null
}

currentLeft = currentLeft.getPrevious
currentRight = currentRight.getPrevious

if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy) {
if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy || !sameTimeWindow) {
throw new ComparisonFailure("Compiled different pattern.",
expected.toString,
actual.toString)

0 comments on commit d73b01b

Please sign in to comment.