Skip to content

Commit

Permalink
[FLINK-23054][table] Rank update optimization should be based on upsert key
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jun 29, 2021
1 parent e1117f0 commit 4642c1f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -135,17 +134,18 @@ public String toString() {
static List<RankProcessStrategy> analyzeRankProcessStrategies(
StreamPhysicalRel rank, ImmutableBitSet partitionKey, RelCollation orderKey) {

RelMetadataQuery mq = rank.getCluster().getMetadataQuery();
FlinkRelMetadataQuery mq = (FlinkRelMetadataQuery) rank.getCluster().getMetadataQuery();
List<RelFieldCollation> fieldCollations = orderKey.getFieldCollations();
boolean isUpdateStream = !ChangelogPlanUtils.inputInsertOnly(rank);
RelNode input = rank.getInput(0);

if (isUpdateStream) {
Set<ImmutableBitSet> uniqueKeys = mq.getUniqueKeys(input);
if (uniqueKeys == null
|| uniqueKeys.isEmpty()
// unique key should contain partition key
|| uniqueKeys.stream().noneMatch(k -> k.contains(partitionKey))) {
Set<ImmutableBitSet> upsertKeys =
mq.getUpsertKeysInKeyGroupRange(input, partitionKey.toArray());
if (upsertKeys == null
|| upsertKeys.isEmpty()
// upsert key should contain partition key
|| upsertKeys.stream().noneMatch(k -> k.contains(partitionKey))) {
// and we fall back to using retract rank
return Collections.singletonList(RETRACT_STRATEGY);
} else {
Expand Down Expand Up @@ -197,7 +197,7 @@ static List<RankProcessStrategy> analyzeRankProcessStrategies(
if (isMonotonic) {
// TODO: choose a set of primary key
return Arrays.asList(
new UpdateFastStrategy(uniqueKeys.iterator().next().toArray()),
new UpdateFastStrategy(upsertKeys.iterator().next().toArray()),
RETRACT_STRATEGY);
} else {
return Collections.singletonList(RETRACT_STRATEGY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.rules.logical

import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.logical._
import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, RankUtil}
import org.apache.flink.table.runtime.operators.rank.VariableRankRange
Expand Down Expand Up @@ -116,11 +117,12 @@ class CalcRankTransposeRule
private def getKeyFields(rank: FlinkLogicalRank): Array[Int] = {
val partitionKey = rank.partitionKey.toArray
val orderKey = rank.orderKey.getFieldCollations.map(_.getFieldIndex).toArray
val uniqueKeys = rank.getCluster.getMetadataQuery.getUniqueKeys(rank.getInput)
val keysInUniqueKeys = if (uniqueKeys == null || uniqueKeys.isEmpty) {
val upsertKeys = FlinkRelMetadataQuery.reuseOrCreate(rank.getCluster.getMetadataQuery)
.getUpsertKeysInKeyGroupRange(rank.getInput, partitionKey)
val keysInUniqueKeys = if (upsertKeys == null || upsertKeys.isEmpty) {
Array[Int]()
} else {
uniqueKeys.flatMap(_.toArray).toArray
upsertKeys.flatMap(_.toArray).toArray
}
val rankRangeKey = rank.rankRange match {
case v: VariableRankRange => Array(v.getRankEndIndex)
Expand Down

0 comments on commit 4642c1f

Please sign in to comment.