Skip to content

Commit

Permalink
[FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdColumnUniqueness
Browse files Browse the repository at this point in the history
  • Loading branch information
lincoln-lil committed Sep 25, 2024
1 parent 00625b0 commit 9df9dce
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,26 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBoolean = {
val input = rank.getInput
val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
if (rankFunColumnIndex < 0) {
mq.areColumnsUnique(input, columns, ignoreNulls)
if (RankUtil.isDeduplication(rank)) {
columns != null && util.Arrays.equals(columns.toArray, rank.partitionKey.toArray)
} else {
val childColumns = columns.clear(rankFunColumnIndex)
val isChildColumnsUnique = mq.areColumnsUnique(input, childColumns, ignoreNulls)
if (isChildColumnsUnique != null && isChildColumnsUnique) {
true
val input = rank.getInput

val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
if (rankFunColumnIndex < 0) {
mq.areColumnsUnique(input, columns, ignoreNulls)
} else {
rank.rankType match {
case RankType.ROW_NUMBER =>
val fields = columns.toArray
(rank.partitionKey.toArray :+ rankFunColumnIndex).forall(fields.contains(_))
case _ => false
val childColumns = columns.clear(rankFunColumnIndex)
val isChildColumnsUnique = mq.areColumnsUnique(input, childColumns, ignoreNulls)
if (isChildColumnsUnique != null && isChildColumnsUnique) {
true
} else {
rank.rankType match {
case RankType.ROW_NUMBER =>
val fields = columns.toArray
(rank.partitionKey.toArray :+ rankFunColumnIndex).forall(fields.contains(_))
case _ => false
}
}
}
}
Expand All @@ -277,14 +282,6 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBoolean = mq.areColumnsUnique(rel.getInput, columns, ignoreNulls)

/**
 * Determines column uniqueness for [[StreamPhysicalDeduplicate]].
 *
 * The deduplicate node keeps exactly one row per unique-key combination, so a column set is
 * unique if and only if it is exactly the node's unique-key set (same indices, same order).
 * A null column set is never unique.
 */
def areColumnsUnique(
    rel: StreamPhysicalDeduplicate,
    mq: RelMetadataQuery,
    columns: ImmutableBitSet,
    ignoreNulls: Boolean): JBoolean = {
  // Option(columns) maps a null column set to false; otherwise compare the
  // requested indices element-wise against the dedup unique keys.
  Option(columns).exists(cols => cols.toArray.sameElements(rel.getUniqueKeys))
}

def areColumnsUnique(
rel: StreamPhysicalChangelogNormalize,
mq: RelMetadataQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ class FlinkRelMdHandlerTestBase {
new RelDataTypeFieldImpl("rk", 7, longType),
outputRankNumber = true,
RankProcessStrategy.UNDEFINED_STRATEGY,
sortOnRowtime = false
sortOnRowTime = false
)

(logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank)
Expand Down Expand Up @@ -755,7 +755,7 @@ class FlinkRelMdHandlerTestBase {
new RelDataTypeFieldImpl("rk", 7, longType),
outputRankNumber = true,
RankProcessStrategy.UNDEFINED_STRATEGY,
sortOnRowtime = false
sortOnRowTime = false
)

(logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank)
Expand Down Expand Up @@ -808,7 +808,7 @@ class FlinkRelMdHandlerTestBase {
new RelDataTypeFieldImpl("rn", 7, longType),
outputRankNumber = true,
RankProcessStrategy.UNDEFINED_STRATEGY,
sortOnRowtime = false
sortOnRowTime = false
)

(logicalRowNumber, flinkLogicalRowNumber, streamRowNumber)
Expand Down Expand Up @@ -838,6 +838,7 @@ class FlinkRelMdHandlerTestBase {
// select a, b, c, rowtime
// ROW_NUMBER() over (partition by b, c order by rowtime desc) rn from TemporalTable3
// ) t where rn <= 1
// can be merged into rank
protected lazy val (streamRowTimeDeduplicateFirstRow, streamRowTimeDeduplicateLastRow) = {
buildFirstRowAndLastRowDeduplicateNode(true)
}
Expand All @@ -848,13 +849,18 @@ class FlinkRelMdHandlerTestBase {
val hash1 = FlinkRelDistribution.hash(Array(1), requireStrict = true)
val streamExchange1 =
new StreamPhysicalExchange(cluster, scan.getTraitSet.replace(hash1), scan, hash1)
val firstRow = new StreamPhysicalDeduplicate(
val firstRow = new StreamPhysicalRank(
cluster,
streamPhysicalTraits,
streamExchange1,
Array(1),
isRowtime,
keepLastRow = false
ImmutableBitSet.of(1),
RelCollations.of(3),
RankType.ROW_NUMBER,
new ConstantRankRange(1, 1),
new RelDataTypeFieldImpl("rn", 7, longType),
outputRankNumber = false,
RankProcessStrategy.UNDEFINED_STRATEGY,
sortOnRowTime = false
)

val builder = typeFactory.builder()
Expand All @@ -877,13 +883,22 @@ class FlinkRelMdHandlerTestBase {
val hash12 = FlinkRelDistribution.hash(Array(1, 2), requireStrict = true)
val streamExchange2 =
new BatchPhysicalExchange(cluster, scan.getTraitSet.replace(hash12), scan, hash12)
val lastRow = new StreamPhysicalDeduplicate(
val lastRow = new StreamPhysicalRank(
cluster,
streamPhysicalTraits,
streamExchange2,
Array(1, 2),
isRowtime,
keepLastRow = true
ImmutableBitSet.of(1, 2),
RelCollations.of(
new RelFieldCollation(
3,
RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)),
RankType.ROW_NUMBER,
new ConstantRankRange(1, 1),
new RelDataTypeFieldImpl("rn", 7, longType),
outputRankNumber = false,
RankProcessStrategy.UNDEFINED_STRATEGY,
sortOnRowTime = false
)
val calcOfLastRow = new StreamPhysicalCalc(
cluster,
Expand Down Expand Up @@ -966,7 +981,7 @@ class FlinkRelMdHandlerTestBase {
new RelDataTypeFieldImpl("rk", 7, longType),
outputRankNumber = true,
RankProcessStrategy.UNDEFINED_STRATEGY,
sortOnRowtime = false
sortOnRowTime = false
)

(logicalRankWithVariableRange, flinkLogicalRankWithVariableRange, streamRankWithVariableRange)
Expand Down

0 comments on commit 9df9dce

Please sign in to comment.