Skip to content

Commit

Permalink
[FLINK-22901][table] Introduce getUpsertKeys in FlinkRelMetadataQuery
Browse files Browse the repository at this point in the history
This closes apache#16096
  • Loading branch information
JingsongLi authored Jun 21, 2021
1 parent 2c260b5 commit 5957812
Show file tree
Hide file tree
Showing 10 changed files with 893 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.calcite.util.ImmutableBitSet;

import java.lang.reflect.Method;
import java.util.Set;

/** Contains the interfaces for several specified metadata of flink. */
public abstract class FlinkMetadata {
Expand Down Expand Up @@ -238,4 +239,19 @@ interface Handler extends MetadataHandler<WindowProperties> {
RelWindowProperties getWindowProperties(RelNode r, RelMetadataQuery mq);
}
}

/** Metadata about which combinations of columns are upsert identifiers. */
public interface UpsertKeys extends Metadata {
Method METHOD = Types.lookupMethod(UpsertKeys.class, "getUpsertKeys");

MetadataDef<UpsertKeys> DEF =
MetadataDef.of(UpsertKeys.class, UpsertKeys.Handler.class, METHOD);

Set<ImmutableBitSet> getUpsertKeys();

/** Handler API. */
interface Handler extends MetadataHandler<UpsertKeys> {
Set<ImmutableBitSet> getUpsertKeys(RelNode r, RelMetadataQuery mq);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import org.apache.flink.util.Preconditions;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.ImmutableBitSet;

import java.util.Arrays;
import java.util.Set;

/**
* A RelMetadataQuery that defines extended metadata handler in Flink, e.g ColumnInterval,
* ColumnNullCount.
Expand All @@ -45,6 +49,7 @@ public class FlinkRelMetadataQuery extends RelMetadataQuery {
private FlinkMetadata.FlinkDistribution.Handler distributionHandler;
private FlinkMetadata.ModifiedMonotonicity.Handler modifiedMonotonicityHandler;
private FlinkMetadata.WindowProperties.Handler windowPropertiesHandler;
private FlinkMetadata.UpsertKeys.Handler upsertKeysHandler;

/**
* Returns an instance of FlinkRelMetadataQuery. It ensures that cycles do not occur while
Expand Down Expand Up @@ -79,6 +84,7 @@ private FlinkRelMetadataQuery() {
this.distributionHandler = HANDLERS.distributionHandler;
this.modifiedMonotonicityHandler = HANDLERS.modifiedMonotonicityHandler;
this.windowPropertiesHandler = HANDLERS.windowPropertiesHandler;
this.upsertKeysHandler = HANDLERS.upsertKeysHandler;
}

/** Extended handlers. */
Expand All @@ -99,6 +105,8 @@ private static class Handlers {
initialHandler(FlinkMetadata.ModifiedMonotonicity.Handler.class);
private FlinkMetadata.WindowProperties.Handler windowPropertiesHandler =
initialHandler(FlinkMetadata.WindowProperties.Handler.class);
private FlinkMetadata.UpsertKeys.Handler upsertKeysHandler =
initialHandler(FlinkMetadata.UpsertKeys.Handler.class);
}

/**
Expand Down Expand Up @@ -256,4 +264,48 @@ public RelWindowProperties getRelWindowProperties(RelNode rel) {
}
}
}

/**
* Determines the set of upsert minimal keys for this expression. A key is represented as an
* {@link org.apache.calcite.util.ImmutableBitSet}, where each bit position represents a 0-based
* output column ordinal.
*
* <p>Different from the unique keys: In distributed streaming computing, one record may be
* divided into RowKind.UPDATE_BEFORE and RowKind.UPDATE_AFTER. If a key changing join is
* connected downstream, the two records will be divided into different tasks, resulting in
* disorder. In this case, the downstream cannot rely on the order of the original key. So in
* this case, it has unique keys in the traditional sense, but it doesn't have upsert keys.
*
* @return set of keys, or null if this information cannot be determined (whereas empty set
* indicates definitely no keys at all)
*/
public Set<ImmutableBitSet> getUpsertKeys(RelNode rel) {
for (; ; ) {
try {
return upsertKeysHandler.getUpsertKeys(rel, this);
} catch (JaninoRelMetadataProvider.NoHandler e) {
upsertKeysHandler = revise(e.relClass, FlinkMetadata.UpsertKeys.DEF);
}
}
}

/**
* Determines the set of upsert minimal keys in a single key group range, which means can ignore
* exchange by partition keys.
*
* <p>Some optimizations can rely on this ability to do upsert in a single key group range.
*/
public Set<ImmutableBitSet> getUpsertKeysInKeyGroupRange(RelNode rel, int[] partitionKeys) {
if (rel instanceof Exchange) {
Exchange exchange = (Exchange) rel;
if (Arrays.equals(
exchange.getDistribution().getKeys().stream()
.mapToInt(Integer::intValue)
.toArray(),
partitionKeys)) {
rel = exchange.getInput();
}
}
return getUpsertKeys(rel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ object FlinkDefaultRelMetadataProvider {
FlinkRelMdPopulationSize.SOURCE,
FlinkRelMdColumnUniqueness.SOURCE,
FlinkRelMdUniqueKeys.SOURCE,
FlinkRelMdUpsertKeys.SOURCE,
FlinkRelMdUniqueGroups.SOURCE,
FlinkRelMdModifiedMonotonicity.SOURCE,
RelMdColumnOrigins.SOURCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import com.google.common.collect.ImmutableSet
import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.metadata._
import org.apache.calcite.rel.{RelNode, SingleRel}
Expand All @@ -45,6 +45,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}

import java.util
import java.util.Set

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -117,6 +118,18 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
input: RelNode,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
getProjectUniqueKeys(
projects,
input.getCluster.getTypeFactory,
() => mq.getUniqueKeys(input, ignoreNulls),
ignoreNulls)
}

def getProjectUniqueKeys(
projects: JList[RexNode],
typeFactory: RelDataTypeFactory,
getInputUniqueKeys :() => util.Set[ImmutableBitSet],
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
// LogicalProject maps a set of rows to a different set;
// Without knowledge of the mapping function(whether it
// preserves uniqueness), it is only safe to derive uniqueness
Expand Down Expand Up @@ -144,7 +157,6 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
val castOperand = a.getOperands.get(0)
castOperand match {
case castRef: RexInputRef =>
val typeFactory = input.getCluster.getTypeFactory
val castType = typeFactory.createTypeWithNullability(projExpr.getType, true)
val origType = typeFactory.createTypeWithNullability(castOperand.getType, true)
if (castType == origType) {
Expand All @@ -165,7 +177,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
return projUniqueKeySet
}

val childUniqueKeySet = mq.getUniqueKeys(input, ignoreNulls)
val childUniqueKeySet = getInputUniqueKeys()
if (childUniqueKeySet != null) {
// Now add to the projUniqueKeySet the child keys that are fully
// projected.
Expand Down Expand Up @@ -206,6 +218,11 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
rel: Expand,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
getExpandUniqueKeys(rel, () => mq.getUniqueKeys(rel.getInput, ignoreNulls))
}

def getExpandUniqueKeys(
rel: Expand, getInputUniqueKeys :() => util.Set[ImmutableBitSet]): JSet[ImmutableBitSet] = {
// mapping input column index to output index for non-null value columns
val mapInputToOutput = new JHashMap[Int, Int]()
(0 until rel.getRowType.getFieldCount).filter(_ != rel.expandIdIndex).foreach { column =>
Expand All @@ -219,7 +236,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
return null
}

val inputUniqueKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
val inputUniqueKeys = getInputUniqueKeys()
if (inputUniqueKeys == null || inputUniqueKeys.isEmpty) {
return inputUniqueKeys
}
Expand Down Expand Up @@ -256,15 +273,18 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
rel: Rank,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
val inputUniqueKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
getRankUniqueKeys(rel, mq.getUniqueKeys(rel.getInput, ignoreNulls))
}

def getRankUniqueKeys(rel: Rank, inputKeys: JSet[ImmutableBitSet]): JSet[ImmutableBitSet] = {
val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
// for Rank node that can convert to Deduplicate, unique key is partition key
val canConvertToDeduplicate: Boolean = {
val rankRange = rel.rankRange
val isRowNumberType = rel.rankType == RankType.ROW_NUMBER
val isLimit1 = rankRange match {
case rankRange: ConstantRankRange =>
rankRange.getRankStart() == 1 && rankRange.getRankEnd() == 1
rankRange.getRankStart == 1 && rankRange.getRankEnd == 1
case _ => false
}
isRowNumberType && isLimit1
Expand All @@ -276,16 +296,16 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
retSet
}
else if (rankFunColumnIndex < 0) {
inputUniqueKeys
inputKeys
} else {
val retSet = new JHashSet[ImmutableBitSet]
rel.rankType match {
case RankType.ROW_NUMBER =>
retSet.add(rel.partitionKey.union(ImmutableBitSet.of(rankFunColumnIndex)))
case _ => // do nothing
}
if (inputUniqueKeys != null && inputUniqueKeys.nonEmpty) {
inputUniqueKeys.foreach {
if (inputKeys != null && inputKeys.nonEmpty) {
inputKeys.foreach {
uniqueKey => retSet.add(uniqueKey)
}
}
Expand Down Expand Up @@ -323,15 +343,15 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
rel: Aggregate,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
getUniqueKeysOnAggregate(rel.getGroupSet.toArray, mq, ignoreNulls)
getUniqueKeysOnAggregate(rel.getGroupSet.toArray)
}

def getUniqueKeys(
rel: BatchPhysicalGroupAggregateBase,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
if (rel.isFinal) {
getUniqueKeysOnAggregate(rel.grouping, mq, ignoreNulls)
getUniqueKeysOnAggregate(rel.grouping)
} else {
null
}
Expand All @@ -341,7 +361,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
rel: StreamPhysicalGroupAggregate,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
getUniqueKeysOnAggregate(rel.grouping, mq, ignoreNulls)
getUniqueKeysOnAggregate(rel.grouping)
}

def getUniqueKeys(
Expand All @@ -353,13 +373,10 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
rel: StreamPhysicalGlobalGroupAggregate,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
getUniqueKeysOnAggregate(rel.grouping, mq, ignoreNulls)
getUniqueKeysOnAggregate(rel.grouping)
}

def getUniqueKeysOnAggregate(
grouping: Array[Int],
mq: RelMetadataQuery,
ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
def getUniqueKeysOnAggregate(grouping: Array[Int]): util.Set[ImmutableBitSet] = {
// group by keys form a unique key
ImmutableSet.of(ImmutableBitSet.of(grouping.indices: _*))
}
Expand All @@ -371,9 +388,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
getUniqueKeysOnWindowAgg(
rel.getRowType.getFieldCount,
rel.getNamedProperties,
rel.getGroupSet.toArray,
mq,
ignoreNulls)
rel.getGroupSet.toArray)
}

def getUniqueKeys(
Expand All @@ -384,9 +399,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
getUniqueKeysOnWindowAgg(
rel.getRowType.getFieldCount,
rel.namedWindowProperties,
rel.grouping,
mq,
ignoreNulls)
rel.grouping)
} else {
null
}
Expand All @@ -397,15 +410,13 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
mq: RelMetadataQuery,
ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
getUniqueKeysOnWindowAgg(
rel.getRowType.getFieldCount, rel.namedWindowProperties, rel.grouping, mq, ignoreNulls)
rel.getRowType.getFieldCount, rel.namedWindowProperties, rel.grouping)
}

private def getUniqueKeysOnWindowAgg(
def getUniqueKeysOnWindowAgg(
fieldCount: Int,
namedProperties: Seq[PlannerNamedWindowProperty],
grouping: Array[Int],
mq: RelMetadataQuery,
ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
grouping: Array[Int]): util.Set[ImmutableBitSet] = {
if (namedProperties.nonEmpty) {
val begin = fieldCount - namedProperties.size
val end = fieldCount - 1
Expand Down Expand Up @@ -478,11 +489,10 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
val leftType = left.getRowType
getJoinUniqueKeys(
join.joinInfo, join.joinType, leftType, leftUniqueKeys, null,
join.joinType, leftType, leftUniqueKeys, null,
mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
// TODO get uniqueKeys from TableSchema of TableSource
null,
mq)
null)
}

private def getJoinUniqueKeys(
Expand All @@ -495,21 +505,18 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
val rightUniqueKeys = mq.getUniqueKeys(right, ignoreNulls)
getJoinUniqueKeys(
joinInfo, joinRelType, left.getRowType, leftUniqueKeys, rightUniqueKeys,
joinRelType, left.getRowType, leftUniqueKeys, rightUniqueKeys,
mq.areColumnsUnique(left, joinInfo.leftSet, ignoreNulls),
mq.areColumnsUnique(right, joinInfo.rightSet, ignoreNulls),
mq)
mq.areColumnsUnique(right, joinInfo.rightSet, ignoreNulls))
}

private def getJoinUniqueKeys(
joinInfo: JoinInfo,
def getJoinUniqueKeys(
joinRelType: JoinRelType,
leftType: RelDataType,
leftUniqueKeys: JSet[ImmutableBitSet],
rightUniqueKeys: JSet[ImmutableBitSet],
isLeftUnique: JBoolean,
isRightUnique: JBoolean,
mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
isRightUnique: JBoolean): JSet[ImmutableBitSet] = {

// first add the different combinations of concatenated unique keys
// from the left and the right, adjusting the right hand side keys to
Expand Down Expand Up @@ -622,7 +629,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu

object FlinkRelMdUniqueKeys {

private val INSTANCE = new FlinkRelMdUniqueKeys
val INSTANCE = new FlinkRelMdUniqueKeys

val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.UNIQUE_KEYS.method, INSTANCE)
Expand Down
Loading

0 comments on commit 5957812

Please sign in to comment.