Skip to content

Commit

Permalink
[FLINK-22038][table-planner-blink] Update TopN to be without rowNumbe…
Browse files Browse the repository at this point in the history
…r if rowNumber field is never used by the successor Calc

This closes apache#16024
  • Loading branch information
beyond1920 authored and godfreyhe committed Jun 3, 2021
1 parent a364daa commit 362aadc
Show file tree
Hide file tree
Showing 13 changed files with 363 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;

import java.util.List;
import java.util.stream.Collectors;

/**
* Planner rule that removes the output column of rank number iff the rank number column is not used
* by successor calc.
*/
public class RedundantRankNumberColumnRemoveRule extends RelOptRule {
public static final RedundantRankNumberColumnRemoveRule INSTANCE =
new RedundantRankNumberColumnRemoveRule();

public RedundantRankNumberColumnRemoveRule() {
super(
operand(FlinkLogicalCalc.class, operand(FlinkLogicalRank.class, any())),
"RedundantRankNumberColumnRemoveRule");
}

@Override
public boolean matches(RelOptRuleCall call) {
FlinkLogicalCalc calc = call.rel(0);
ImmutableBitSet usedFields = getUsedFields(calc.getProgram());
FlinkLogicalRank rank = call.rel(1);
return rank.outputRankNumber() && !usedFields.get(rank.getRowType().getFieldCount() - 1);
}

@Override
public void onMatch(RelOptRuleCall call) {
FlinkLogicalCalc calc = call.rel(0);
FlinkLogicalRank rank = call.rel(1);
FlinkLogicalRank newRank =
new FlinkLogicalRank(
rank.getCluster(),
rank.getTraitSet(),
rank.getInput(),
rank.partitionKey(),
rank.orderKey(),
rank.rankType(),
rank.rankRange(),
rank.rankNumberType(),
false);
RexProgram oldProgram = calc.getProgram();
Pair<List<RexNode>, RexNode> projectsAndCondition = getProjectsAndCondition(oldProgram);
RexProgram newProgram =
RexProgram.create(
newRank.getRowType(),
projectsAndCondition.left,
projectsAndCondition.right,
oldProgram.getOutputRowType(),
rank.getCluster().getRexBuilder());
FlinkLogicalCalc newCalc = FlinkLogicalCalc.create(newRank, newProgram);
call.transformTo(newCalc);
}

private ImmutableBitSet getUsedFields(RexProgram program) {
Pair<List<RexNode>, RexNode> projectsAndCondition = getProjectsAndCondition(program);
return RelOptUtil.InputFinder.bits(projectsAndCondition.left, projectsAndCondition.right);
}

private Pair<List<RexNode>, RexNode> getProjectsAndCondition(RexProgram program) {
List<RexNode> projects =
program.getProjectList().stream()
.map(program::expandLocalRef)
.collect(Collectors.toList());
RexNode condition = null;
if (program.getCondition() != null) {
condition = program.expandLocalRef(program.getCondition());
}
return Pair.of(projects, condition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ object FlinkBatchRuleSets {
// transpose calc past rank to reduce rank input fields
CalcRankTransposeRule.INSTANCE,
// remove output of rank number when it is a constant
RankNumberColumnRemoveRule.INSTANCE,
ConstantRankNumberColumnRemoveRule.INSTANCE,

// calc rules
CoreRules.FILTER_CALC_MERGE,
Expand Down Expand Up @@ -383,7 +383,9 @@ object FlinkBatchRuleSets {
PythonCalcSplitRule.EXPAND_PROJECT,
PythonCalcSplitRule.PUSH_CONDITION,
PythonCalcSplitRule.REWRITE_PROJECT,
PythonMapMergeRule.INSTANCE
PythonMapMergeRule.INSTANCE,
// remove output of rank number when it is not used by successor calc
RedundantRankNumberColumnRemoveRule.INSTANCE
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ object FlinkStreamRuleSets {
// transpose calc past rank to reduce rank input fields
CalcRankTransposeRule.INSTANCE,
// remove output of rank number when it is a constant
RankNumberColumnRemoveRule.INSTANCE,
ConstantRankNumberColumnRemoveRule.INSTANCE,
// split distinct aggregate to reduce data skew
SplitAggregateRule.INSTANCE,
// transpose calc past snapshot
Expand All @@ -374,6 +374,8 @@ object FlinkStreamRuleSets {
PythonCorrelateSplitRule.INSTANCE,
// merge calc after calc transpose
FlinkCalcMergeRule.INSTANCE,
// remove output of rank number when it is not used by successor calc
RedundantRankNumberColumnRemoveRule.INSTANCE,
// remove the trivial calc that is produced by PushWatermarkIntoTableSourceScanAcrossCalcRule.
// because [[PushWatermarkIntoTableSourceScanAcrossCalcRule]] will push the rowtime computed
// column into the source. After FlinkCalcMergeRule applies, it may produces a trivial calc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import java.math.{BigDecimal => JBigDecimal}
* Planner rule that removes the output column of rank number
* iff there is a equality condition for the rank column.
*/
class RankNumberColumnRemoveRule
class ConstantRankNumberColumnRemoveRule
extends RelOptRule(
operand(classOf[FlinkLogicalRank], any()),
"RankFunctionColumnRemoveRule") {
"ConstantRankNumberColumnRemoveRule") {

override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
Expand Down Expand Up @@ -79,6 +79,6 @@ class RankNumberColumnRemoveRule
}
}

object RankNumberColumnRemoveRule {
val INSTANCE = new RankNumberColumnRemoveRule
object ConstantRankNumberColumnRemoveRule {
val INSTANCE = new ConstantRankNumberColumnRemoveRule
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b, 2:BIGINT AS $2])
+- Rank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], global=[true], select=[a, b, c, w0$o0])
+- Rank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], global=[true], select=[a, b, c])
+- Sort(orderBy=[b ASC, a ASC, c ASC])
+- Exchange(distribution=[hash[b]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], global=[false], select=[a, b, c])
Expand Down Expand Up @@ -280,6 +280,80 @@ Calc(select=[CAST(rna) AS rn1, CAST(w0$o0) AS rn2], where=[(w0$o0 <= 200)])
+- Sort(orderBy=[a ASC, c ASC, b DESC])
+- Exchange(distribution=[hash[a, c]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testRedundantRankNumberColumnRemove">
<Resource name="sql">
<![CDATA[
SELECT
CONCAT('http://txmov2.a.yximgs.com', uri) AS url,
reqcount AS download_count,
start_time AS `timestamp`
FROM
(
SELECT
uri,
reqcount,
rownum_2,
start_time
FROM
(
SELECT
uri,
reqcount,
start_time,
RANK() OVER (
PARTITION BY start_time
ORDER BY
reqcount DESC
) AS rownum_2
FROM
(
SELECT
uri,
reqcount,
start_time,
RANK() OVER (
PARTITION BY start_time, bucket_id
ORDER BY
reqcount DESC
) AS rownum_1
FROM MyTable1
)
WHERE
rownum_1 <= 100000
)
WHERE
rownum_2 <= 100000
)
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(url=[CONCAT(_UTF-16LE'http://txmov2.a.yximgs.com', $0)], download_count=[$1], timestamp=[$2])
+- LogicalFilter(condition=[<=($3, 100000)])
+- LogicalProject(uri=[$0], reqcount=[$1], start_time=[$2], rownum_2=[RANK() OVER (PARTITION BY $2 ORDER BY $1 DESC NULLS LAST)])
+- LogicalFilter(condition=[<=($3, 100000)])
+- LogicalProject(uri=[$0], reqcount=[$1], start_time=[$2], rownum_1=[RANK() OVER (PARTITION BY $2, $3 ORDER BY $1 DESC NULLS LAST)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[CONCAT(_UTF-16LE'http://txmov2.a.yximgs.com', uri) AS url, reqcount AS download_count, start_time AS timestamp])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time], orderBy=[reqcount DESC], global=[true], select=[uri, reqcount, start_time])
+- Sort(orderBy=[start_time ASC, reqcount DESC])
+- Exchange(distribution=[hash[start_time]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time], orderBy=[reqcount DESC], global=[false], select=[uri, reqcount, start_time])
+- Sort(orderBy=[start_time ASC, reqcount DESC])
+- Calc(select=[uri, reqcount, start_time])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time, bucket_id], orderBy=[reqcount DESC], global=[true], select=[uri, reqcount, start_time, bucket_id])
+- Sort(orderBy=[start_time ASC, bucket_id ASC, reqcount DESC])
+- Exchange(distribution=[hash[start_time, bucket_id]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time, bucket_id], orderBy=[reqcount DESC], global=[false], select=[uri, reqcount, start_time, bucket_id])
+- Sort(orderBy=[start_time ASC, bucket_id ASC, reqcount DESC])
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[uri, reqcount, start_time, bucket_id])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[a], select=[a, SUM(b) AS EXPR$1])
+- Calc(select=[a, b])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[true], select=[a, b, c, w0$o0])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[true], select=[a, b, c])
+- Sort(orderBy=[a ASC, c ASC, b ASC])
+- Exchange(distribution=[hash[a]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[false], select=[a, b, c])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalCalc(select=[a, b, 2:BIGINT AS $2])
+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], select=[a, b, c, w0$o0])
+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], select=[a, b, c])
+- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[a], select=[a, SUM(b) AS EXPR$1])
+- Calc(select=[a, b])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[true], select=[a, b, c, w0$o0])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[true], select=[a, b, c])
+- Sort(orderBy=[a ASC, c ASC, b ASC])
+- Exchange(distribution=[hash[a]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[false], select=[a, b, c])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ LogicalProject(a=[$0])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[$f1 ASC], global=[true], select=[a, $f1, w0$o0])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[$f1 ASC], global=[true], select=[a, $f1])
+- Sort(orderBy=[a ASC, $f1 ASC])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS $f1])
+- Exchange(distribution=[hash[a]])
Expand Down Expand Up @@ -68,7 +68,7 @@ LogicalProject(a=[$0])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a])
+- Rank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=5], partitionBy=[a], orderBy=[$f1 ASC], global=[true], select=[a, $f1, w0$o0])
+- Rank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=5], partitionBy=[a], orderBy=[$f1 ASC], global=[true], select=[a, $f1])
+- Sort(orderBy=[a ASC, $f1 ASC])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS $f1])
+- Exchange(distribution=[hash[a]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,74 @@ Calc(select=[a, b, c])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testRedundantRankNumberColumnRemove">
<Resource name="sql">
<![CDATA[
SELECT
CONCAT('http://txmov2.a.yximgs.com', uri) AS url,
reqcount AS download_count,
start_time AS `timestamp`
FROM
(
SELECT
uri,
reqcount,
rownum_2,
start_time
FROM
(
SELECT
uri,
reqcount,
start_time,
ROW_NUMBER() OVER (
PARTITION BY start_time
ORDER BY
reqcount DESC
) AS rownum_2
FROM
(
SELECT
uri,
reqcount,
start_time,
ROW_NUMBER() OVER (
PARTITION BY start_time, bucket_id
ORDER BY
reqcount DESC
) AS rownum_1
FROM MyTable1
)
WHERE
rownum_1 <= 100000
)
WHERE
rownum_2 <= 100000
)
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(url=[CONCAT(_UTF-16LE'http://txmov2.a.yximgs.com', $0)], download_count=[$1], timestamp=[$2])
+- LogicalFilter(condition=[<=($3, 100000)])
+- LogicalProject(uri=[$0], reqcount=[$1], start_time=[$2], rownum_2=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $1 DESC NULLS LAST)])
+- LogicalFilter(condition=[<=($3, 100000)])
+- LogicalProject(uri=[$0], reqcount=[$1], start_time=[$2], rownum_1=[ROW_NUMBER() OVER (PARTITION BY $2, $3 ORDER BY $1 DESC NULLS LAST)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[CONCAT(_UTF-16LE'http://txmov2.a.yximgs.com', uri) AS url, reqcount AS download_count, start_time AS timestamp])
+- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time], orderBy=[reqcount DESC], select=[uri, reqcount, start_time])
+- Exchange(distribution=[hash[start_time]])
+- Calc(select=[uri, reqcount, start_time])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time, bucket_id], orderBy=[reqcount DESC], select=[uri, reqcount, start_time, bucket_id])
+- Exchange(distribution=[hash[start_time, bucket_id]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[uri, reqcount, start_time, bucket_id])
]]>
</Resource>
</TestCase>
Expand Down
Loading

0 comments on commit 362aadc

Please sign in to comment.