Skip to content

Commit

Permalink
[FLINK-23014][table-planner] Support streaming window Deduplicate in …
Browse files Browse the repository at this point in the history
…planner

This closes apache#17632
  • Loading branch information
beyond1920 authored and godfreyhe committed Nov 11, 2021
1 parent 384af4a commit 9699484
Show file tree
Hide file tree
Showing 18 changed files with 2,064 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.deduplicate.window.RowTimeWindowDeduplicateOperatorBuilder;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.time.ZoneId;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Stream {@link ExecNode} for Window Deduplicate.
 *
 * <p>Translation produces a one-input transformation that runs the operator assembled by {@link
 * RowTimeWindowDeduplicateOperatorBuilder}, with state keyed by the partition keys. Only row-time
 * windowing expressed as a {@link WindowAttachedWindowingStrategy} can be translated; every other
 * configuration is rejected when the plan is translated.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamExecWindowDeduplicate extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {

    // Handed to ExecNodeUtil.createOneInputTransformation as its last argument.
    // NOTE(review): presumably a managed-memory weight; confirm against ExecNodeUtil.
    private static final long WINDOW_RANK_MEMORY_RATIO = 100;

    public static final String FIELD_NAME_PARTITION_KEYS = "partitionKeys";
    public static final String FIELD_NAME_ORDER_KEY = "orderKey";
    public static final String FIELD_NAME_KEEP_LAST_ROW = "keepLastRow";
    public static final String FIELD_NAME_WINDOWING = "windowing";

    /** Input field indices used to build the state key selector. */
    @JsonProperty(FIELD_NAME_PARTITION_KEYS)
    private final int[] partitionKeys;

    /** Input field index passed to the operator builder as the rowtime index. */
    @JsonProperty(FIELD_NAME_ORDER_KEY)
    private final int orderKey;

    /** Forwarded unchanged to the operator builder's {@code keepLastRow} setting. */
    @JsonProperty(FIELD_NAME_KEEP_LAST_ROW)
    private final boolean keepLastRow;

    /** Windowing strategy; supplies the time attribute type and the window-end index. */
    @JsonProperty(FIELD_NAME_WINDOWING)
    private final WindowingStrategy windowing;

    /**
     * Creates a node with a freshly allocated node id and exactly one input.
     *
     * @param partitionKeys indices of the partition key fields, must not be null
     * @param orderKey index of the field used as rowtime index by the operator
     * @param keepLastRow forwarded to the operator builder
     * @param windowing windowing strategy, must not be null
     * @param inputProperty property of the single input
     * @param outputType row type produced by this node
     * @param description description of this node
     */
    public StreamExecWindowDeduplicate(
            int[] partitionKeys,
            int orderKey,
            boolean keepLastRow,
            WindowingStrategy windowing,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                partitionKeys,
                orderKey,
                keepLastRow,
                windowing,
                getNewNodeId(),
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Creator used for JSON deserialization; also the target of the convenience constructor.
     *
     * @throws IllegalArgumentException if {@code inputProperties} does not hold exactly one entry
     * @throws NullPointerException if {@code partitionKeys} or {@code windowing} is null
     */
    @JsonCreator
    public StreamExecWindowDeduplicate(
            @JsonProperty(FIELD_NAME_PARTITION_KEYS) int[] partitionKeys,
            @JsonProperty(FIELD_NAME_ORDER_KEY) int orderKey,
            @JsonProperty(FIELD_NAME_KEEP_LAST_ROW) boolean keepLastRow,
            @JsonProperty(FIELD_NAME_WINDOWING) WindowingStrategy windowing,
            @JsonProperty(FIELD_NAME_ID) int id,
            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
        super(id, inputProperties, outputType, description);
        checkArgument(inputProperties.size() == 1);
        this.partitionKeys = checkNotNull(partitionKeys);
        this.windowing = checkNotNull(windowing);
        this.orderKey = orderKey;
        this.keepLastRow = keepLastRow;
    }

    /**
     * Builds the window deduplicate transformation on top of the translated input.
     *
     * @param planner planner used to translate the input edge and to read the table config
     * @return the transformation, with state key selector and state key type already set
     * @throws TableException if the windowing strategy is not row-time
     * @throws UnsupportedOperationException if the windowing strategy is not a {@link
     *     WindowAttachedWindowingStrategy}
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        // Validate first, so that nothing upstream is translated for an unsupported strategy.
        final int windowEndIndex = resolveWindowEndIndex();

        final ExecEdge edge = getInputEdges().get(0);
        final Transformation<RowData> upstream =
                (Transformation<RowData>) edge.translateToPlan(planner);

        final TableConfig config = planner.getTableConfig();
        final ZoneId shiftTimeZone =
                TimeWindowUtil.getShiftTimeZone(windowing.getTimeAttributeType(), config);

        final RowType inputRowType = (RowType) edge.getOutputType();
        final RowDataKeySelector keySelector =
                KeySelectorUtil.getRowDataSelector(
                        partitionKeys, InternalTypeInfo.of(inputRowType));

        final OneInputStreamOperator<RowData, RowData> dedupOperator =
                RowTimeWindowDeduplicateOperatorBuilder.builder()
                        .inputSerializer(new RowDataSerializer(inputRowType))
                        .shiftTimeZone(shiftTimeZone)
                        .keySerializer(
                                (PagedTypeSerializer<RowData>)
                                        keySelector.getProducedType().toSerializer())
                        .keepLastRow(keepLastRow)
                        .rowtimeIndex(orderKey)
                        .windowEndIndex(windowEndIndex)
                        .build();

        // The transformation reuses the parallelism of its input.
        final OneInputTransformation<RowData, RowData> result =
                ExecNodeUtil.createOneInputTransformation(
                        upstream,
                        getDescription(),
                        SimpleOperatorFactory.of(dedupOperator),
                        InternalTypeInfo.of(getOutputType()),
                        upstream.getParallelism(),
                        WINDOW_RANK_MEMORY_RATIO);

        // State is keyed by the partition keys.
        result.setStateKeySelector(keySelector);
        result.setStateKeyType(keySelector.getProducedType());
        return result;
    }

    /**
     * Checks that the windowing strategy can be translated and returns its window-end index.
     *
     * <p>The row-time check runs before the strategy-type check.
     */
    private int resolveWindowEndIndex() {
        if (!windowing.isRowtime()) {
            throw new TableException("Processing time Window Deduplication is not supported yet.");
        }
        if (!(windowing instanceof WindowAttachedWindowingStrategy)) {
            throw new UnsupportedOperationException(
                    windowing.getClass().getName() + " is not supported yet.");
        }
        return ((WindowAttachedWindowingStrategy) windowing).getWindowEnd();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowDeduplicate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowJoin;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowRank;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowTableFunction;
Expand All @@ -42,6 +43,10 @@ public interface SimplifyWindowTableFunctionRules {
new SimplifyWindowTableFunctionWithWindowRankRule();
SimplifyWindowTableFunctionWithCalcWindowRankRule WITH_CALC_WINDOW_RANK =
new SimplifyWindowTableFunctionWithCalcWindowRankRule();
SimplifyWindowTableFunctionWithWindowDeduplicateRule WITH_WINDOW_DEDUPLICATE =
new SimplifyWindowTableFunctionWithWindowDeduplicateRule();
SimplifyWindowTableFunctionWithCalcWindowDeduplicateRule WITH_CALC_WINDOW_DEDUPLICATE =
new SimplifyWindowTableFunctionWithCalcWindowDeduplicateRule();
SimplifyWindowTableFunctionRuleWithWindowJoinRule WITH_WINDOW_JOIN =
new SimplifyWindowTableFunctionRuleWithWindowJoinRule();
SimplifyWindowTableFunctionRuleWithLeftCalcWindowJoinRule WITH_LEFT_CALC_WINDOW_JOIN =
Expand Down Expand Up @@ -137,6 +142,36 @@ class SimplifyWindowTableFunctionWithCalcWindowRankRule
}
}

/**
 * Rule whose operand pattern is a {@link StreamPhysicalWindowDeduplicate} on top of a {@link
 * StreamPhysicalExchange} on top of a {@link StreamPhysicalWindowTableFunction} with any inputs.
 *
 * <p>NOTE(review): the rewrite logic is inherited from {@link
 * SimplifyWindowTableFunctionWithWindowRankRuleBase}, which is not shown here; presumably the
 * window table function is simplified the same way as for window rank. Confirm against the base
 * class.
 */
class SimplifyWindowTableFunctionWithWindowDeduplicateRule
extends SimplifyWindowTableFunctionWithWindowRankRuleBase {

SimplifyWindowTableFunctionWithWindowDeduplicateRule() {
super(
// Matched top-down: WindowDeduplicate <- Exchange <- WindowTableFunction.
operand(
StreamPhysicalWindowDeduplicate.class,
operand(
StreamPhysicalExchange.class,
operand(StreamPhysicalWindowTableFunction.class, any()))),
// Second argument is the rule description; it repeats the class name.
"SimplifyWindowTableFunctionWithWindowDeduplicateRule");
}
}

/**
 * Rule whose operand pattern is a {@link StreamPhysicalWindowDeduplicate} on top of a {@link
 * StreamPhysicalExchange} on top of a {@link StreamPhysicalCalc} on top of a {@link
 * StreamPhysicalWindowTableFunction} with any inputs.
 *
 * <p>NOTE(review): the rewrite logic is inherited from {@link
 * SimplifyWindowTableFunctionWithWindowRankRuleBase}, which is not shown here; presumably it
 * handles the intermediate Calc the same way as the window rank variant. Confirm against the
 * base class.
 */
class SimplifyWindowTableFunctionWithCalcWindowDeduplicateRule
extends SimplifyWindowTableFunctionWithWindowRankRuleBase {

SimplifyWindowTableFunctionWithCalcWindowDeduplicateRule() {
super(
// Matched top-down: WindowDeduplicate <- Exchange <- Calc <- WindowTableFunction.
operand(
StreamPhysicalWindowDeduplicate.class,
operand(
StreamPhysicalExchange.class,
operand(
StreamPhysicalCalc.class,
operand(StreamPhysicalWindowTableFunction.class, any())))),
// Second argument is the rule description; it repeats the class name.
"SimplifyWindowTableFunctionWithCalcWindowDeduplicateRule");
}
}

abstract class SimplifyWindowTableFunctionWithWindowJoinRuleBase
extends SimplifyWindowTableFunctionRuleBase {
SimplifyWindowTableFunctionWithWindowJoinRuleBase(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream

import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.logical.WindowingStrategy
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowDeduplicate
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.utils._

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataType

import java.util

import scala.collection.JavaConverters._

/**
 * Stream physical RelNode that deduplicates rows on the partition keys and keeps only the first
 * or the last row for each window.
 *
 * This node is an optimization of [[StreamPhysicalWindowRank]]: compared to
 * [[StreamPhysicalWindowRank]], it could access/write state with higher performance. The RelNode
 * also requires that the PARTITION BY clause contains the start and end columns of the
 * windowing TVF.
 */
class StreamPhysicalWindowDeduplicate(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputRel: RelNode,
    partitionKeys: Array[Int],
    orderKey: Int,
    keepLastRow: Boolean,
    windowing: WindowingStrategy)
  extends SingleRel(cluster, traitSet, inputRel)
  with StreamPhysicalRel {

  /** A watermark is required exactly when the windowing strategy is row-time. */
  override def requireWatermark: Boolean = windowing.isRowtime

  /** The output row type is the row type of the input. */
  override def deriveRowType(): RelDataType = getInput.getRowType

  /** Creates a copy on the given traits and first input; all other settings are carried over. */
  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
    val newInput = inputs.get(0)
    new StreamPhysicalWindowDeduplicate(
      cluster, traitSet, newInput, partitionKeys, orderKey, keepLastRow, windowing)
  }

  /** Writes the input, window summary, keep strategy, partition keys, order key and order kind. */
  override def explainTerms(pw: RelWriter): RelWriter = {
    val rowType = inputRel.getRowType
    val fieldNames = rowType.getFieldNames.asScala.toArray
    val keepStrategy = keepLastRow match {
      case true => "LastRow"
      case false => "FirstRow"
    }
    val timeKind = windowing.isRowtime match {
      case true => "ROWTIME"
      case false => "PROCTIME"
    }

    val writer = pw.input("input", getInput)
    writer.item("window", windowing.toSummaryString(fieldNames))
    writer.item("keep", keepStrategy)
    writer.item("partitionKeys", RelExplainUtil.fieldToString(partitionKeys, rowType))
    writer.item("orderKey", fieldNames(orderKey))
    writer.item("order", timeKind)
  }

  /** Builds the [[StreamExecWindowDeduplicate]] exec node with the default input property. */
  override def translateToExecNode(): ExecNode[_] = {
    val outputRowType = FlinkTypeFactory.toLogicalRowType(getRowType)
    val description = getRelDetailedDescription
    new StreamExecWindowDeduplicate(
      partitionKeys,
      orderKey,
      keepLastRow,
      windowing,
      InputProperty.DEFAULT,
      outputRowType,
      description)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val providedTrait = new ModifyKindSetTrait(builder.build())
createNewNode(window, children, providedTrait, requiredTrait, requester)

case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank =>
// WindowAggregate and WindowRank support insert-only in input
case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
_: StreamPhysicalWindowDeduplicate =>
// WindowAggregate, WindowRank, WindowDeduplicate support insert-only in input
val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
val providedTrait = ModifyKindSetTrait.INSERT_ONLY
createNewNode(rel, children, providedTrait, requiredTrait, requester)
Expand Down Expand Up @@ -480,11 +481,13 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
createNewNode(rel, children, requiredTrait)

case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
_: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalWindowDeduplicate | _: StreamPhysicalDeduplicate |
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
// WindowAggregate, WindowTableAggregate, Deduplicate, TemporalSort, CEP,
// OverAggregate, and IntervalJoin, WindowJoin require nothing about UpdateKind.
// WindowAggregate, WindowTableAggregate, WindowRank, WindowDeduplicate, Deduplicate,
// TemporalSort, CEP, OverAggregate, and IntervalJoin, WindowJoin require nothing about
// UpdateKind.
val children = visitChildren(rel, UpdateKindTrait.NONE)
createNewNode(rel, children, requiredTrait)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ object FlinkStreamRuleSets {
StreamPhysicalTemporalSortRule.INSTANCE,
// rank
StreamPhysicalRankRule.INSTANCE,
StreamPhysicalDeduplicateRule.RANK_INSTANCE,
StreamPhysicalDeduplicateRule.INSTANCE,
// expand
StreamPhysicalExpandRule.INSTANCE,
// group agg
Expand All @@ -460,6 +460,7 @@ object FlinkStreamRuleSets {
PullUpWindowTableFunctionIntoWindowAggregateRule.INSTANCE,
ExpandWindowTableFunctionTransposeRule.INSTANCE,
StreamPhysicalWindowRankRule.INSTANCE,
StreamPhysicalWindowDeduplicateRule.INSTANCE,
// join
StreamPhysicalJoinRule.INSTANCE,
StreamPhysicalIntervalJoinRule.INSTANCE,
Expand Down Expand Up @@ -507,6 +508,8 @@ object FlinkStreamRuleSets {
// simplify window tvf
SimplifyWindowTableFunctionRules.WITH_CALC_WINDOW_RANK,
SimplifyWindowTableFunctionRules.WITH_WINDOW_RANK,
SimplifyWindowTableFunctionRules.WITH_CALC_WINDOW_DEDUPLICATE,
SimplifyWindowTableFunctionRules.WITH_WINDOW_DEDUPLICATE,
SimplifyWindowTableFunctionRules.WITH_LEFT_RIGHT_CALC_WINDOW_JOIN,
SimplifyWindowTableFunctionRules.WITH_LEFT_CALC_WINDOW_JOIN,
SimplifyWindowTableFunctionRules.WITH_RIGHT_CALC_WINDOW_JOIN,
Expand Down
Loading

0 comments on commit 9699484

Please sign in to comment.