Skip to content

Commit

Permalink
[FLINK-32578][table-planner] Fix wrong plan when grouping by window tim…
Browse files Browse the repository at this point in the history
…e columns on a proctime window operator, which may result in the job hanging forever

This closes apache#23001
  • Loading branch information
lincoln-lil authored Jul 19, 2023
1 parent 5430912 commit 698b128
Show file tree
Hide file tree
Showing 6 changed files with 851 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.rules.physical.stream

import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalGroupAggregate
Expand Down Expand Up @@ -50,10 +49,7 @@ class StreamPhysicalGroupAggregateRule(config: Config) extends ConverterRule(con
}

// check not window aggregate
val fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery)
val windowProperties = fmq.getRelWindowProperties(agg.getInput)
val grouping = agg.getGroupSet
!WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties)
!WindowUtil.isValidWindowAggregate(agg)
}

override def convert(rel: RelNode): RelNode = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ class StreamPhysicalWindowAggregateRule(config: Config) extends ConverterRule(co
return false
}

val fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery)
val windowProperties = fmq.getRelWindowProperties(agg.getInput)
val grouping = agg.getGroupSet
WindowUtil.groupingContainsWindowStartEnd(grouping, windowProperties)
WindowUtil.isValidWindowAggregate(agg)
}

override def convert(rel: RelNode): RelNode = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlW
import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
import org.apache.flink.table.planner.plan.logical._
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalJoin, FlinkLogicalRank, FlinkLogicalTableFunctionScan}
import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
import org.apache.flink.table.planner.typeutils.RowTypeUtils
Expand All @@ -32,16 +33,19 @@ import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDat
import org.apache.flink.table.types.logical.TimestampType
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType

import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rel.core.{Aggregate, AggregateCall, Calc}
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.util.ImmutableBitSet
import org.apache.calcite.util.{ImmutableBitSet, Util}

import java.time.Duration
import java.util.Collections

import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -310,6 +314,21 @@ object WindowUtil {
groupingTypes ++ accTypes.map(fromDataTypeToLogicalType) ++ sliceEndType)
}

/**
 * Checks whether the given aggregate is a valid window aggregate.
 *
 * For a rowtime window it suffices that the grouping keys contain the window start and end
 * columns. For a proctime window we additionally require a neighbouring window table function
 * call on the input side, because a proctime attribute produced elsewhere cannot be used
 * directly for a proctime window aggregate.
 */
def isValidWindowAggregate(agg: FlinkLogicalAggregate): Boolean = {
  val input = agg.getInput
  val fmq = FlinkRelMetadataQuery.reuseOrCreate(agg.getCluster.getMetadataQuery)
  val windowProps = fmq.getRelWindowProperties(input)
  // Short-circuit: the rowtime / proctime distinction only matters once the grouping is
  // known to contain the window start and end columns.
  WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, windowProps) &&
  (windowProps.isRowtime || existNeighbourWindowTableFunc(input))
}

// ------------------------------------------------------------------------------------------
// Private Helpers
// ------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -343,4 +362,34 @@ object WindowUtil {
}
}

/**
 * Returns true if a window table function call is reachable from the given node by walking
 * down through [[RelSubset]]s and single-input operators, without crossing an aggregate,
 * rank or join (whose proctime output cannot feed a proctime window aggregate directly).
 */
private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {

  @tailrec
  def find(rel: RelNode): Unit = {
    rel match {
      case rss: RelSubset =>
        // Prefer the best plan of the subset; fall back to the original node.
        val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
        find(innerRel)

      case scan: FlinkLogicalTableFunctionScan =>
        if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
          // Calcite's Util.FoundOne is a lightweight ControlFlowException used to abort the
          // traversal early; its only constructor requires the found node.
          throw new Util.FoundOne(scan)
        } else if (!scan.getInputs.isEmpty) {
          // Guard against zero-input table function scans before descending.
          find(scan.getInput(0))
        }

      // The proctime attribute coming out of these operators cannot be used directly for a
      // proctime window aggregate, so further traversal of their children is unnecessary.
      case _: FlinkLogicalAggregate | _: FlinkLogicalRank | _: FlinkLogicalJoin =>

      case sr: SingleRel => find(sr.getInput)

      // Any other node (e.g. a leaf scan or an unlisted multi-input operator) terminates the
      // search with "not found". Without this default branch the match would throw a
      // MatchError on such nodes.
      case _ =>
    }
  }

  try {
    find(rel)
    false
  } catch {
    // FoundOne is thrown above exactly when a window table function call was found.
    case _: Util.FoundOne => true
  }
}
}
Loading

0 comments on commit 698b128

Please sign in to comment.