Skip to content

Commit

Permalink
[FLINK-24168][table-planner] Update MATCH_ROWTIME function which coul…
Browse files Browse the repository at this point in the history
…d receive 0 argument or 1 argument

This closes apache#17205
  • Loading branch information
beyond1920 authored and godfreyhe committed Sep 18, 2021
1 parent aa1cdd0 commit 475a662
Show file tree
Hide file tree
Showing 10 changed files with 599 additions and 249 deletions.
5 changes: 3 additions & 2 deletions docs/content.zh/docs/dev/table/sql/queries/match_recognize.md
Original file line number Diff line number Diff line change
Expand Up @@ -930,11 +930,12 @@ FROM Ticker
<tbody>
<tr>
<td>
<code>MATCH_ROWTIME()</code><br/>
<code>MATCH_ROWTIME([rowtime_field])</code><br/>
</td>
<td>
<p>返回映射到给定模式的最后一行的时间戳。</p>
<p>结果属性是<a href="{{< ref "docs/dev/table/concepts/time_attributes" >}}">行时间属性</a>,可用于后续基于时间的操作,例如 <a href="{{< ref "docs/dev/table/sql/queries/joins" >}}#interval-joins">interval joins</a> 和 <a href="#aggregations">group window or over window aggregations</a>。</p>
<p>函数可以没有入参,这种情况下函数返回结果是 TIMESTAMP 类型且具有事件时间属性;也可以有一个入参,这个参数值必须是 TIMESTAMP 类型或者 TIMESTAMP_LTZ 类型,且必须有事件时间属性,这种情况下函数返回结果的数据类型和输入参数的一致,且必须有事件时间属性。</p>
<p>结果属性是<a href="{{< ref "docs/dev/table/concepts/time_attributes" >}}">事件时间属性</a>,可用于后续基于时间的操作,例如 <a href="{{< ref "docs/dev/table/sql/queries/joins" >}}#interval-joins">interval joins</a> 和 <a href="#aggregations">group window or over window aggregations</a>。</p>
</td>
</tr>
<tr>
Expand Down
3 changes: 2 additions & 1 deletion docs/content/docs/dev/table/sql/queries/match_recognize.md
Original file line number Diff line number Diff line change
Expand Up @@ -1039,9 +1039,10 @@ use [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" >}}). To
<tbody>
<tr>
<td>
<code>MATCH_ROWTIME()</code><br/>
<code>MATCH_ROWTIME([rowtime_field])</code><br/>
</td>
<td><p>Returns the timestamp of the last row that was mapped to the given pattern.</p>
<p>The function accepts zero or one operand which is a field reference with rowtime attribute. If there is no operand, the function will return rowtime attribute with TIMESTAMP type. Otherwise, the return type will be same with the operand type.</p>
<p>The resulting attribute is a <a href="{{< ref "docs/dev/table/concepts/time_attributes" >}}">rowtime attribute</a>
that can be used in subsequent time-based operations such as
<a href="{{< ref "docs/dev/table/sql/queries/joins" >}}#interval-joins">interval joins</a> and <a href="#aggregations">group window or over
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,9 @@ public static synchronized FlinkSqlOperatorTable instance() {
return instance;
}

private static final SqlReturnTypeInference ROWTIME_TYPE_INFERENCE =
createTimeIndicatorReturnType(true, false);
private static final SqlReturnTypeInference PROCTIME_TYPE_INFERENCE =
createTimeIndicatorReturnType(false, true);

private static SqlReturnTypeInference createTimeIndicatorReturnType(
boolean isRowTime, boolean isTimestampLtz) {
return ReturnTypes.explicit(
factory -> {
if (isRowTime) {
return ((FlinkTypeFactory) factory)
.createRowtimeIndicatorType(false, isTimestampLtz);
} else {
return ((FlinkTypeFactory) factory).createProctimeIndicatorType(false);
}
});
}
ReturnTypes.explicit(
factory -> ((FlinkTypeFactory) factory).createProctimeIndicatorType(false));

@Override
public void lookupOperatorOverloads(
Expand Down Expand Up @@ -120,19 +106,10 @@ public void lookupOperatorOverloads(
false);

/**
* Function used to access a event time attribute with TIMESTAMP or TIMESTAMP_LTZ type from
* MATCH_RECOGNIZE, for TIMESTAMP_LTZ type, we rewrite the return type in
* [org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter].
* Function used to access an event time attribute with TIMESTAMP or TIMESTAMP_LTZ type from
* MATCH_RECOGNIZE.
*/
public static final SqlFunction MATCH_ROWTIME =
new CalciteSqlFunction(
"MATCH_ROWTIME",
SqlKind.OTHER_FUNCTION,
ROWTIME_TYPE_INFERENCE,
null,
OperandTypes.NILADIC,
SqlFunctionCategory.MATCH_RECOGNIZE,
true);
public static final SqlFunction MATCH_ROWTIME = new MatchRowTimeFunction();

/** Function used to access a processing time attribute from MATCH_RECOGNIZE. */
public static final SqlFunction MATCH_PROCTIME =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.functions.sql;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperandCountRange;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.type.SqlOperandCountRanges;

import java.util.List;

/**
* The function used to access a rowtime attribute with TIMESTAMP or TIMESTAMP_LTZ type from
* MATCH_RECOGNIZE clause. The function accepts zero or one operand which is a field reference with
* rowtime attribute. If there is no operand, the function will return rowtime attribute with
* TIMESTAMP type. Otherwise, the return type will be same with the operand type.
*/
public class MatchRowTimeFunction extends SqlFunction {

public MatchRowTimeFunction() {
super(
"MATCH_ROWTIME",
SqlKind.OTHER_FUNCTION,
null,
null,
null,
SqlFunctionCategory.MATCH_RECOGNIZE);
}

@Override
public String getAllowedSignatures(String opNameToUse) {
return "MATCH_ROWTIME([rowtime_field])";
}

@Override
public SqlOperandCountRange getOperandCountRange() {
return SqlOperandCountRanges.between(0, 1);
}

public String getSignatureTemplate(final int operandsCount) {
switch (operandsCount) {
case 0:
return "{}";
case 1:
return "{0})";
default:
throw new AssertionError();
}
}

@Override
public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
List<SqlNode> operands = callBinding.operands();
int n = operands.size();
assert n == 0 || n == 1;
if (n == 0) {
return true;
} else {
SqlNode operand = callBinding.operand(0);
if (operand.getKind() != SqlKind.IDENTIFIER) {
if (throwOnFailure) {
throw new ValidationException(
String.format(
"The function %s requires a field reference as argument, but actual argument is not a simple field reference.",
callBinding.getOperator().getName()));
} else {
return false;
}
}
RelDataType operandType = callBinding.getOperandType(0);
if (FlinkTypeFactory.isRowtimeIndicatorType(operandType)) {
return true;
} else {
if (throwOnFailure) {
throw new ValidationException(
String.format(
"The function %s requires argument to be a row time attribute type, but is '%s'.",
callBinding.getOperator().getName(), operandType));
} else {
return false;
}
}
}
}

@Override
public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
// Returns rowtime if there is no argument
if (opBinding.getOperandCount() == 0) {
final FlinkTypeFactory typeFactory = (FlinkTypeFactory) opBinding.getTypeFactory();
return typeFactory.createRowtimeIndicatorType(false, false);
}
return opBinding.getOperandType(0);
}

@Override
public boolean isDeterministic() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.flink.table.planner.plan.nodes.logical

import org.apache.flink.table.planner.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, isTimestampLtzIndicatorType}
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.utils.MatchUtil

import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
Expand All @@ -27,10 +29,12 @@ import org.apache.calcite.rel.core.Match
import org.apache.calcite.rel.logical.LogicalMatch
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.util.ImmutableBitSet
import org.apache.calcite.util.{ImmutableBitSet, Litmus}

import java.util

import scala.collection.JavaConversions._

class FlinkLogicalMatch(
cluster: RelOptCluster,
traitSet: RelTraitSet,
Expand Down Expand Up @@ -65,6 +69,28 @@ class FlinkLogicalMatch(
interval)
with FlinkLogicalRel {

override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = {
val inputContainsRowTimeLtz = input.getRowType.getFieldList.exists { field =>
isRowtimeIndicatorType(field.getType) && isTimestampLtzIndicatorType(field.getType)
}
if (inputContainsRowTimeLtz) {
val containMatchRowTimeWithoutArgs = getMeasures.values().exists(
MatchUtil.isFinalOnMatchRowTimeWithoutArgs)
if (containMatchRowTimeWithoutArgs) {
litmus.fail(
"MATCH_ROWTIME(rowtimeField) should be used when input stream contains " +
"rowtime attribute with TIMESTAMP_LTZ type.\n" +
"Please pass rowtime attribute field as input argument of " +
"MATCH_ROWTIME(rowtimeField) function.")
} else {
litmus.succeed()
}
} else {
litmus.succeed()
}
}


override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new FlinkLogicalMatch(
cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.flink.table.planner.codegen.MatchCodeGenerator.ALL_PATTERN_VAR
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.plan.logical.MatchRecognize
import org.apache.flink.table.planner.plan.nodes.exec.spec.{MatchSpec, PartitionSpec}
import org.apache.flink.table.planner.calcite.FlinkTypeFactory.isRowtimeIndicatorType

import org.apache.calcite.rex.{RexCall, RexNode, RexPatternFieldRef}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
Expand Down Expand Up @@ -85,23 +84,35 @@ object MatchUtil {
}
}

def isFinalOnRowTimeIndicator(rex: RexNode): Boolean = {
def isFinalOnMatchTimeIndicator(rex: RexNode): Boolean = {
rex match {
case call: RexCall =>
call.getOperator match {
case SqlStdOperatorTable.FINAL =>
call.getOperands.size == 1 && isRowtimeIndicatorType(call.getOperands.head.getType)
call.getOperands.size == 1 && isMatchTimeIndicator(call.getOperands.head)
case _ => false
}
case _ => false
}
}

def isMatchRowTimeIndicator(rex: RexNode): Boolean = {
def isMatchRowTimeWithoutArgs(rex: RexNode): Boolean = {
rex match {
case call: RexCall =>
call.getOperator match {
case FlinkSqlOperatorTable.MATCH_ROWTIME => true
case FlinkSqlOperatorTable.MATCH_ROWTIME => call.getOperands.isEmpty
case _ => false
}
case _ => false
}
}

def isFinalOnMatchRowTimeWithoutArgs(rex: RexNode): Boolean = {
rex match {
case call: RexCall =>
call.getOperator match {
case SqlStdOperatorTable.FINAL =>
call.getOperands.size == 1 && isMatchRowTimeWithoutArgs(call.getOperands.head)
case _ => false
}
case _ => false
Expand Down
Loading

0 comments on commit 475a662

Please sign in to comment.