Skip to content

Commit

Permalink
[FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from Fl…
Browse files Browse the repository at this point in the history
…inkRelMdModifiedMonotonicity
  • Loading branch information
lincoln-lil committed Sep 25, 2024
1 parent e81b108 commit 026f233
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCo
import org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable, TableSourceTable}
import org.apache.flink.table.planner.plan.stats.{WithLower, WithUpper}
import org.apache.flink.table.planner.plan.utils.RankUtil
import org.apache.flink.types.RowKind

import org.apache.calcite.plan.hep.HepRelVertex
Expand Down Expand Up @@ -186,70 +187,78 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
}

def getRelModifiedMonotonicity(rel: Rank, mq: RelMetadataQuery): RelModifiedMonotonicity = {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
val inputMonotonicity = fmq.getRelModifiedMonotonicity(rel.getInput)
rel match {
case physicalRank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
getPhysicalRankModifiedMonotonicity(physicalRank, mq)

// If child monotonicity is null, we should return early.
if (inputMonotonicity == null) {
return null
}
case _ =>
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
val inputMonotonicity = fmq.getRelModifiedMonotonicity(rel.getInput)

// if partitionBy a update field or partitionBy a field whose mono is null, just return null
if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
return null
}
// If child monotonicity is null, we should return early.
if (inputMonotonicity == null) {
return null
}

val fieldCount = rel.getRowType.getFieldCount
// if partitionBy a update field or partitionBy a field whose mono is null, just return null
if (rel.partitionKey.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) {
return null
}

// init current mono
val currentMonotonicity = notMonotonic(fieldCount)
// 1. partitionBy field is CONSTANT
rel.partitionKey.foreach(e => currentMonotonicity.fieldMonotonicities(e) = CONSTANT)
// 2. row number filed is CONSTANT
if (rel.outputRankNumber) {
currentMonotonicity.fieldMonotonicities(fieldCount - 1) = CONSTANT
}
// 3. time attribute field is increasing
(0 until fieldCount).foreach(
e => {
if (FlinkTypeFactory.isTimeIndicatorType(rel.getRowType.getFieldList.get(e).getType)) {
inputMonotonicity.fieldMonotonicities(e) = INCREASING
val fieldCount = rel.getRowType.getFieldCount

// init current mono
val currentMonotonicity = notMonotonic(fieldCount)
// 1. partitionBy field is CONSTANT
rel.partitionKey.foreach(e => currentMonotonicity.fieldMonotonicities(e) = CONSTANT)
// 2. row number filed is CONSTANT
if (rel.outputRankNumber) {
currentMonotonicity.fieldMonotonicities(fieldCount - 1) = CONSTANT
}
// 3. time attribute field is increasing
(0 until fieldCount).foreach(
e => {
if (FlinkTypeFactory.isTimeIndicatorType(rel.getRowType.getFieldList.get(e).getType)) {
inputMonotonicity.fieldMonotonicities(e) = INCREASING
}
})
val fieldCollations = rel.orderKey.getFieldCollations
if (fieldCollations.nonEmpty) {
// 4. process the first collation field, we can only deduce the first collation field
val firstCollation = fieldCollations.get(0)
// Collation field index in child node will be same with Rank node,
// see ProjectToLogicalProjectAndWindowRule for details.
val fieldMonotonicity =
inputMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex)
val result = fieldMonotonicity match {
case SqlMonotonicity.INCREASING | SqlMonotonicity.CONSTANT
if firstCollation.direction == RelFieldCollation.Direction.DESCENDING =>
INCREASING
case SqlMonotonicity.DECREASING | SqlMonotonicity.CONSTANT
if firstCollation.direction == RelFieldCollation.Direction.ASCENDING =>
DECREASING
case _ => NOT_MONOTONIC
}
currentMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex) = result
}
})
val fieldCollations = rel.orderKey.getFieldCollations
if (fieldCollations.nonEmpty) {
// 4. process the first collation field, we can only deduce the first collation field
val firstCollation = fieldCollations.get(0)
// Collation field index in child node will be same with Rank node,
// see ProjectToLogicalProjectAndWindowRule for details.
val fieldMonotonicity = inputMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex)
val result = fieldMonotonicity match {
case SqlMonotonicity.INCREASING | SqlMonotonicity.CONSTANT
if firstCollation.direction == RelFieldCollation.Direction.DESCENDING =>
INCREASING
case SqlMonotonicity.DECREASING | SqlMonotonicity.CONSTANT
if firstCollation.direction == RelFieldCollation.Direction.ASCENDING =>
DECREASING
case _ => NOT_MONOTONIC
}
currentMonotonicity.fieldMonotonicities(firstCollation.getFieldIndex) = result
}

currentMonotonicity
currentMonotonicity
}
}

def getRelModifiedMonotonicity(
rel: StreamPhysicalDeduplicate,
private def getPhysicalRankModifiedMonotonicity(
rank: StreamPhysicalRank,
mq: RelMetadataQuery): RelModifiedMonotonicity = {
if (allAppend(mq, rel.getInput)) {
if (rel.keepLastRow || rel.isRowtime) {
// Can't use RankUtil.canConvertToDeduplicate directly because modifyKindSetTrait is undefined.
if (allAppend(mq, rank.getInput)) {
if (RankUtil.keepLastDeduplicateRow(rank.orderKey) || rank.sortOnRowTime) {
val mono = new RelModifiedMonotonicity(
Array.fill(rel.getRowType.getFieldCount)(NOT_MONOTONIC))
rel.getUniqueKeys.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
Array.fill(rank.getRowType.getFieldCount)(NOT_MONOTONIC))
rank.partitionKey.toArray.foreach(e => mono.fieldMonotonicities(e) = CONSTANT)
mono
} else {
// FirstRow do not generate updates.
new RelModifiedMonotonicity(Array.fill(rel.getRowType.getFieldCount)(CONSTANT))
new RelModifiedMonotonicity(Array.fill(rank.getRowType.getFieldCount)(CONSTANT))
}
} else {
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class StreamPhysicalRank(
val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)

if (RankUtil.canConvertToDeduplicate(this)) {
val keepLastRow = RankUtil.keepLastRow(orderKey)
val keepLastRow = RankUtil.keepLastDeduplicateRow(orderKey)

new StreamExecDeduplicate(
unwrapTableConfig(this),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,10 @@ object RankUtil {
isDeduplication(rank) && inputInsertOnly && sortOnTimeAttributeOnly
}

/** Determines if the given order key indicates that the last row should be kept. */
def keepLastRow(orderKey: RelCollation): Boolean = {
/**
* Determines if the given order key indicates that the last row should be kept for deduplication.
*/
def keepLastDeduplicateRow(orderKey: RelCollation): Boolean = {
// order by timeIndicator desc ==> lastRow, otherwise is firstRow
if (orderKey.getFieldCollations.size() != 1) {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ class FlinkRelMdHandlerTestBase {
new RelDataTypeFieldImpl("rn", 7, longType),
outputRankNumber = false,
RankProcessStrategy.UNDEFINED_STRATEGY,
sortOnRowTime = false
sortOnRowTime = isRowtime
)

val builder = typeFactory.builder()
Expand Down

0 comments on commit 026f233

Please sign in to comment.