From 34d51002796afd800fc99d47e3dd06acbd403531 Mon Sep 17 00:00:00 2001 From: Jing Date: Mon, 2 Aug 2021 16:01:57 +0800 Subject: [PATCH] [FLINK-23544][table-planner] Window TVF Supports session window in plan This closes #16669 --- .../functions/sql/FlinkSqlOperatorTable.java | 1 + .../sql/SqlSessionTableFunction.java | 67 +++++ .../functions/sql/SqlWindowTableFunction.java | 3 + .../plan/logical/SessionWindowSpec.java | 74 ++++++ .../planner/plan/logical/WindowSpec.java | 3 +- .../stream/StreamExecWindowTableFunction.java | 5 + .../TwoStageOptimizedWindowAggregateRule.java | 7 + .../StreamPhysicalLocalWindowAggregate.scala | 13 +- .../StreamPhysicalWindowAggregate.scala | 64 +++-- .../rules/logical/SplitAggregateRule.scala | 10 +- .../table/planner/plan/utils/WindowUtil.scala | 4 + .../plan/stream/sql/WindowRankTest.xml | 54 ++++ .../stream/sql/agg/WindowAggregateTest.xml | 238 ++++++++++++++++++ .../plan/stream/sql/join/WindowJoinTest.xml | 143 +++++++++++ .../plan/stream/sql/WindowRankTest.scala | 79 +++++- .../stream/sql/agg/WindowAggregateTest.scala | 64 +++++ .../plan/stream/sql/join/WindowJoinTest.scala | 106 ++++++++ .../stream/sql/WindowAggregateITCase.scala | 87 +++++++ .../window/WindowTableFunctionOperator.java | 3 + 19 files changed, 1004 insertions(+), 21 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 07e0772319746..d030db9478481 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -1179,4 +1179,5 @@ public List getAuxiliaryFunctions() { public static final SqlFunction TUMBLE = new SqlTumbleTableFunction(); public static final SqlFunction HOP = new SqlHopTableFunction(); public static final SqlFunction CUMULATE = new SqlCumulateTableFunction(); + public static final SqlFunction SESSION = new SqlSessionTableFunction(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java new file mode 100644 index 0000000000000..629da14794865 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; + +/** + * SqlSessionTableFunction implements an operator for session. + * + *

It allows three parameters: + * + *

    + *
  1. a table + *
  2. a descriptor to provide a time attribute column name from the input table + *
  3. an interval parameter to specify an inactive activity gap to break sessions + *
+ */ +public class SqlSessionTableFunction extends SqlWindowTableFunction { + + public SqlSessionTableFunction() { + super(SqlKind.SESSION.name(), new OperandMetadataImpl()); + } + + /** Operand type checker for SESSION. */ + private static class OperandMetadataImpl extends AbstractOperandMetadata { + OperandMetadataImpl() { + super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SESSION_GAP), 3); + } + + @Override + public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) { + if (!checkTableAndDescriptorOperands(callBinding, 1)) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } + if (!checkIntervalOperands(callBinding, 2)) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } + // check time attribute + return throwExceptionOrReturnFalse( + checkTimeColumnDescriptorOperand(callBinding, 1), throwOnFailure); + } + + @Override + public String getAllowedSignatures(SqlOperator op, String opName) { + return opName + "(TABLE table_name, DESCRIPTOR(timecol), datetime interval)"; + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java index 41926ffea80f3..3c23dde3bbb59 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java @@ -83,6 +83,9 @@ public class SqlWindowTableFunction extends SqlFunction implements SqlTableFunct /** The slide interval, only used for HOP window. */ protected static final String PARAM_STEP = "STEP"; + /** The session gap interval, only used for SESSION window. 
*/ + protected static final String PARAM_SESSION_GAP = "GAP"; + /** * Type-inference strategy whereby the row type of a table function call is a ROW, which is * combined from the row type of operand #0 (which is a TABLE) and two additional fields. The diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java new file mode 100644 index 0000000000000..962e5ba0b12c3 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.logical; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +import java.time.Duration; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.TimeUtils.formatWithHighestUnit; + +/** Logical representation of a session window specification. */ +@JsonTypeName("SessionWindow") +public class SessionWindowSpec implements WindowSpec { + public static final String FIELD_NAME_GAP = "gap"; + + @JsonProperty(FIELD_NAME_GAP) + private final Duration gap; + + @JsonCreator + public SessionWindowSpec(@JsonProperty(FIELD_NAME_GAP) Duration gap) { + this.gap = checkNotNull(gap); + } + + @Override + public String toSummaryString(String windowing) { + return String.format("SESSION(%s, gap=[%s])", windowing, formatWithHighestUnit(gap)); + } + + public Duration getGap() { + return gap; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SessionWindowSpec that = (SessionWindowSpec) o; + return gap.equals(that.gap); + } + + @Override + public int hashCode() { + return Objects.hash(SessionWindowSpec.class, gap); + } + + @Override + public String toString() { + return String.format("SESSION(gap=[%s])", formatWithHighestUnit(gap)); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java index bb3c261f78be9..fbde8a9f6056c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java +++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java @@ -26,7 +26,8 @@ @JsonSubTypes({ @JsonSubTypes.Type(value = TumblingWindowSpec.class), @JsonSubTypes.Type(value = HoppingWindowSpec.class), - @JsonSubTypes.Type(value = CumulativeWindowSpec.class) + @JsonSubTypes.Type(value = CumulativeWindowSpec.class), + @JsonSubTypes.Type(value = SessionWindowSpec.class) }) public interface WindowSpec { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java index 5fd2025f9a625..af0a6c52cc8fc 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java @@ -25,6 +25,7 @@ import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec; import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec; +import org.apache.flink.table.planner.plan.logical.SessionWindowSpec; import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy; import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec; import org.apache.flink.table.planner.plan.logical.WindowSpec; @@ -119,6 +120,10 @@ protected Transformation translateToPlanInternal(PlannerBase planner) { + "3. 
join with join condition contains window starts equality of input tables " + "and window ends equality of input tables.\n", windowSummary)); + } else if (windowingStrategy.getWindow() instanceof SessionWindowSpec) { + // WindowTableFunctionOperator is not suitable for Session Window because can't do + // state-less window assigning for input row per record for Session Window. + throw new TableException("Session Window TableFunction is not supported yet."); } else if (!windowingStrategy.isRowtime()) { throw new TableException("Processing time Window TableFunction is not supported yet."); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java index 8eeb79c192ccb..b956729563c9f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.plan.logical.SessionWindowSpec; import org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy; import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy; import org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy; @@ -95,6 +96,12 @@ public boolean matches(RelOptRuleCall call) { return false; } + // session window doesn't support two-phase, + // otherwise window assigner results may be different + if (windowing.getWindow() instanceof SessionWindowSpec) { + return false; + } + // all aggregate function 
should support merge() method if (!AggregateUtil.doAllSupportPartialMerge(windowAgg.aggInfoList().aggInfos())) { return false; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala index 2823aab9919ae..7035e99942c65 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty, PlannerSliceEnd, PlannerWindowReference} -import org.apache.flink.table.planner.plan.logical.{TimeAttributeWindowingStrategy, WindowAttachedWindowingStrategy, WindowingStrategy} +import org.apache.flink.table.planner.plan.logical.{SessionWindowSpec, TimeAttributeWindowingStrategy, WindowAttachedWindowingStrategy, WindowingStrategy} import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.rules.physical.stream.TwoStageOptimizedWindowAggregateRule @@ -69,8 +69,17 @@ class StreamPhysicalLocalWindowAggregate( override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = { windowing match { - case _: WindowAttachedWindowingStrategy | _: TimeAttributeWindowingStrategy => + case _: WindowAttachedWindowingStrategy => // pass + case tws: TimeAttributeWindowingStrategy => + tws.getWindow match { + case _: 
SessionWindowSpec => + return litmus.fail("StreamPhysicalLocalWindowAggregate should not accept " + + "TimeAttributeWindowingStrategy with Session window. " + + "This should never happen, please open an issue.") + case _ => + // pass + } case _ => return litmus.fail("StreamPhysicalLocalWindowAggregate should only accepts " + "WindowAttachedWindowingStrategy and TimeAttributeWindowingStrategy, " + diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala index eaa70e2abef3e..0837c1d83bf50 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala @@ -18,22 +18,23 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream +import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis +import org.apache.flink.table.expressions.FieldReferenceExpression import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty, PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart} -import org.apache.flink.table.planner.plan.logical.WindowingStrategy -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate +import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty, PlannerWindowReference} +import org.apache.flink.table.planner.plan.logical.{SessionGroupWindow, SessionWindowSpec, TimeAttributeWindowingStrategy, WindowingStrategy} +import org.apache.flink.table.planner.plan.nodes.exec.stream.{StreamExecGroupWindowAggregate, 
StreamExecWindowAggregate} import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.utils.{AggregateInfoList, AggregateUtil, FlinkRelOptUtil, RelExplainUtil, WindowUtil} import org.apache.flink.table.planner.plan.utils.WindowUtil.checkEmitConfiguration -import org.apache.flink.table.types.logical.utils.LogicalTypeUtils +import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType + import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.{Aggregate, AggregateCall} +import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.calcite.util.ImmutableBitSet import java.util -import java.util.Collections import scala.collection.JavaConverters._ @@ -104,14 +105,45 @@ class StreamPhysicalWindowAggregate( override def translateToExecNode(): ExecNode[_] = { checkEmitConfiguration(FlinkRelOptUtil.getTableConfigFromContext(this)) - new StreamExecWindowAggregate( - grouping, - aggCalls.toArray, - windowing, - namedWindowProperties.toArray, - InputProperty.DEFAULT, - FlinkTypeFactory.toLogicalRowType(getRowType), - getRelDetailedDescription - ) + windowing.getWindow match { + case windowSpec: SessionWindowSpec => + windowing match { + case timeWindowStrategy: TimeAttributeWindowingStrategy => + val timeAttributeFieldName = getInput.getRowType.getFieldNames.get( + timeWindowStrategy.getTimeAttributeIndex) + val timeAttributeType = windowing.getTimeAttributeType + val logicalWindow = SessionGroupWindow( + new PlannerWindowReference("w$", timeAttributeType), + new FieldReferenceExpression( + timeAttributeFieldName, + fromLogicalTypeToDataType(timeAttributeType), + 0, + timeWindowStrategy.getTimeAttributeIndex), + intervalOfMillis(windowSpec.getGap.toMillis) + ) + new StreamExecGroupWindowAggregate( + grouping, + 
aggCalls.toArray, + logicalWindow, + namedWindowProperties.toArray, + false, + InputProperty.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + case _ => + throw new UnsupportedOperationException(s"$windowing is not supported yet.") + } + case _ => + new StreamExecWindowAggregate( + grouping, + aggCalls.toArray, + windowing, + namedWindowProperties.toArray, + InputProperty.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala index 31d1f2527e7e8..bbe710d0c4716 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala @@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableException import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkLogicalRelFactories, FlinkRelBuilder} import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable, SqlFirstLastValueAggFunction} +import org.apache.flink.table.planner.plan.logical.SessionWindowSpec import org.apache.flink.table.planner.plan.PartialFinalType import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.FlinkRelNode @@ -138,11 +139,18 @@ class SplitAggregateRule extends RelOptRule( val windowProps = fmq.getRelWindowProperties(agg.getInput) val isWindowAgg = WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, windowProps) val isProctimeWindowAgg = isWindowAgg && !windowProps.isRowtime + + // disable distinct split for 
session window, + // otherwise window assigner results may be different + val isSessionWindowAgg = isWindowAgg && + windowProps.getWindowSpec.isInstanceOf[SessionWindowSpec] + // TableAggregate is not supported. see also FLINK-21923. val isTableAgg = AggregateUtil.isTableAggregate(agg.getAggCallList) agg.partialFinalType == PartialFinalType.NONE && agg.containsDistinctCall() && - splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg && !isTableAgg + splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg && + !isTableAgg && !isSessionWindowAgg } override def onMatch(call: RelOptRuleCall): Unit = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala index 37056b8f058ee..edd8f0e57d57b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala @@ -218,6 +218,10 @@ object WindowUtil { val step = getOperandAsLong(windowCall.operands(2)) val maxSize = getOperandAsLong(windowCall.operands(3)) new CumulativeWindowSpec(Duration.ofMillis(maxSize), Duration.ofMillis(step), offset) + + case FlinkSqlOperatorTable.SESSION => + val gap = getOperandAsLong(windowCall.operands(2)) + new SessionWindowSpec(Duration.ofMillis(gap)) } new TimeAttributeWindowingStrategy(windowSpec, timeAttributeType, timeIndex) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml index eb96034a22cd5..b54632f03dbd9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml 
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml @@ -127,6 +127,60 @@ Calc(select=[window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + + + + + 1000) as max_d, + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv + FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + GROUP BY a, window_start, window_end, window_time + ) +) +WHERE rownum <= 3 + ]]> + + + ($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + + + (b, 1000)) AS $f5, b, e, c, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index d92fc1b7d2bc4..215d3703b9b8c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -1638,6 +1638,244 @@ Sink(table=[default_catalog.default_database.s1], fields=[window_start, window_e +- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $slice_end]) +- Calc(select=[rowtime]) +- Reused(reference_id=[1]) +]]> + + + + + 1000), + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + + + ($1, 1000))], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + + + (b, 1000)) AS $f4, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + + + + + 1000), + count(distinct c) AS uv +FROM 
TABLE( + SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + + + ($1, 1000))], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + + + (b, 1000)) AS $f4, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + + + + + 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + + + ($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + + + (b, 1000)) AS $f4, b, e, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + + + + + 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + + + ($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + + + (b, 1000)) AS $f4, b, e, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + + + + + 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + + + ($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION($6, 
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + + + (b, 1000)) AS $f4, b, e, c, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + + + + + 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + + + ($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + + + (b, 1000)) AS $f4, b, e, c, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml index eb6af76120605..aa51eb2fc70b4 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml @@ -1214,6 +1214,149 @@ Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS w +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + + + + + + + + + + + + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala index b11a7f5defc32..fae3d5c5b448b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala @@ -82,7 +82,25 @@ class WindowRankTest extends TableTestBase { } @Test - def testUnsupportedWindowTVF_TumbleOnProctime(): Unit = { + def testUnsupportedWindowTVF_Session(): Unit = { + val sql = + """ + |SELECT window_start, window_end, window_time, a, b, c, d, e + |FROM ( + |SELECT *, + 
| ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY b DESC) as rownum + |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) + |) + |WHERE rownum <= 3 + """.stripMargin + + thrown.expectMessage("Session Window TableFunction is not supported yet.") + thrown.expect(classOf[TableException]) + util.verifyExplain(sql) + } + + @Test + def testUnsupportedWindowTVF_Proctime(): Unit = { val sql = """ |SELECT window_start, window_end, window_time, a, b, c, d, e @@ -390,6 +408,65 @@ class WindowRankTest extends TableTestBase { util.verifyExplain(sql) } + @Test + def testOnSessionWindowAggregate(): Unit = { + val sql = + """ + |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv + |FROM ( + |SELECT *, + | ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY cnt DESC) as rownum + |FROM ( + | SELECT + | a, + | window_start, + | window_end, + | window_time, + | count(*) as cnt, + | sum(d) as sum_d, + | max(d) filter (where b > 1000) as max_d, + | weightedAvg(b, e) AS wAvg, + | count(distinct c) AS uv + | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + | GROUP BY a, window_start, window_end, window_time + | ) + |) + |WHERE rownum <= 3 + """.stripMargin + util.verifyRelPlan(sql) + } + + @Test + def testOnSessionWindowAggregateOnProctime(): Unit = { + val sql = + """ + |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv + |FROM ( + |SELECT *, + | ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER BY cnt DESC) as rownum + |FROM ( + | SELECT + | a, + | window_start, + | window_end, + | window_time, + | count(*) as cnt, + | sum(d) as sum_d, + | max(d) filter (where b > 1000) as max_d, + | weightedAvg(b, e) AS wAvg, + | count(distinct c) AS uv + | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) + | GROUP BY a, window_start, window_end, window_time + | ) + |) + |WHERE rownum <= 3 + """.stripMargin + + 
thrown.expect(classOf[TableException]) + thrown.expectMessage("Processing time Window TopN is not supported yet.") + util.verifyExplain(sql) + } + // ---------------------------------------------------------------------------------------- // Tests for queries window rank could propagate time attribute // ---------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala index 90a6810d40634..8588f56e55881 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala @@ -425,6 +425,70 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl util.verifyRelPlan(sql) } + @Test + def testSession_OnRowtime(): Unit = { + // Session window does not support two-phase optimization + val sql = + """ + |SELECT + | a, + | window_start, + | window_end, + | count(*), + | sum(d), + | max(d) filter (where b > 1000), + | weightedAvg(b, e) AS wAvg, + | count(distinct c) AS uv + |FROM TABLE( + | SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) + |GROUP BY a, window_start, window_end + """.stripMargin + util.verifyRelPlan(sql) + } + + @Test + def testSession_OnProctime(): Unit = { + // Session window does not support two-phase optimization + val sql = + """ + |SELECT + | a, + | window_start, + | window_end, + | count(*), + | sum(d), + | max(d) filter (where b > 1000), + | weightedAvg(b, e) AS wAvg, + | count(distinct c) AS uv + |FROM TABLE( + | SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) + |GROUP BY a, window_start, window_end + """.stripMargin + 
util.verifyRelPlan(sql) + } + + @Test + def testSession_DistinctSplitEnabled(): Unit = { + // Session window does not support split-distinct optimization + util.tableEnv.getConfig.getConfiguration.setBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) + val sql = + """ + |SELECT + | a, + | window_start, + | window_end, + | count(*), + | sum(d), + | max(d) filter (where b > 1000), + | count(distinct c) AS uv + |FROM TABLE( + | SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) + |GROUP BY a, window_start, window_end + """.stripMargin + util.verifyRelPlan(sql) + } + @Test def testCumulate_DistinctSplitEnabled(): Unit = { util.tableEnv.getConfig.getConfiguration.setBoolean( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala index 08c6139655ef3..c7035c296e462 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala @@ -372,6 +372,48 @@ class WindowJoinTest extends TableTestBase { util.verifyExplain(sql) } + @Test + def testUnsupportedWindowTVF_SessionOnRowtime(): Unit = { + val sql = + """ + |SELECT * + |FROM ( + | SELECT * + | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + |) L + |JOIN ( + | SELECT * + | FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + """.stripMargin + + thrown.expectMessage("Session Window TableFunction is not supported yet.") + thrown.expect(classOf[TableException]) + util.verifyExplain(sql) + } + + @Test + def testUnsupportedWindowTVF_SessionOnProctime(): 
Unit = { + val sql = + """ + |SELECT L.a, L.b, L.c, R.a, R.b, R.c + |FROM ( + | SELECT * + | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) + |) L + |JOIN ( + | SELECT * + | FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + """.stripMargin + + thrown.expectMessage("Processing time Window Join is not supported yet.") + thrown.expect(classOf[TableException]) + util.verifyExplain(sql) + } + // ---------------------------------------------------------------------------------------- // Tests for invalid queries Join on window Aggregate // because left window strategy is not equals to right window strategy. @@ -884,6 +926,70 @@ class WindowJoinTest extends TableTestBase { util.verifyRelPlan(sql) } + @Test + def testOnSessionWindowAggregate(): Unit = { + val sql = + """ + |SELECT L.*, R.* + |FROM ( + | SELECT + | a, + | window_start, + | window_end, + | window_time, + | count(*) as cnt, + | count(distinct c) AS uv + | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + | GROUP BY a, window_start, window_end, window_time + |) L + |JOIN ( + | SELECT + | a, + | window_start, + | window_end, + | window_time, + | count(*) as cnt, + | count(distinct c) AS uv + | FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + | GROUP BY a, window_start, window_end, window_time + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + """.stripMargin + util.verifyRelPlan(sql) + } + + @Test + def testOnSessionWindowAggregateOnProctime(): Unit = { + val sql = + """ + |SELECT L.*, R.* + |FROM ( + | SELECT + | a, + | window_start, + | window_end, + | window_time, + | count(*) as cnt, + | count(distinct c) AS uv + | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) + | GROUP BY a, window_start, window_end, window_time + |) L 
+ |JOIN ( + | SELECT + | a, + | window_start, + | window_end, + | window_time, + | count(*) as cnt, + | count(distinct c) AS uv + | FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) + | GROUP BY a, window_start, window_end, window_time + |) R + |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + """.stripMargin + util.verifyRelPlan(sql) + } + @Test def testWindowJoinWithNonEqui(): Unit = { val sql = diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index 3e79333f722f9..b02258f443ec1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.scala._ +import org.apache.flink.table.api._ import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.api.config.OptimizerConfigOptions @@ -27,6 +28,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink} +import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset import 
org.apache.flink.table.planner.utils.AggregatePhaseStrategy import org.apache.flink.table.planner.utils.AggregatePhaseStrategy._ import org.apache.flink.types.Row @@ -900,6 +902,91 @@ class WindowAggregateITCase( CumulateWindowRollupExpectedData.sorted.mkString("\n"), sink.getAppendResults.sorted.mkString("\n")) } + + @Test + def testEventTimeSessionWindow(): Unit = { + // To verify the "merge" functionality, we create this test with the following characteristics: + // 1. set the Parallelism to 1, and have the test data out of order + // 2. create a watermark with 10ms offset to delay the window emission by 10ms + val sessionData = List( + (1L, 1, "Hello", "a"), + (2L, 2, "Hello", "b"), + (8L, 8, "Hello", "a"), + (9L, 9, "Hello World", "b"), + (4L, 4, "Hello", "c"), + (16L, 16, "Hello", "d")) + + val stream = failingDataSource(sessionData) + .assignTimestampsAndWatermarks( + new TimestampAndWatermarkWithOffset[(Long, Int, String, String)](10L)) + val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'string, 'name) + tEnv.registerTable("T", table) + + val sql = + """ + |SELECT + | `string`, + | window_start, + | window_time, + | COUNT(1), + | SUM(1), + | COUNT(`int`), + | SUM(`int`), + | COUNT(DISTINCT name) + |FROM TABLE( + | SESSION(TABLE T, DESCRIPTOR(rowtime), INTERVAL '0.005' SECOND)) + |GROUP BY `string`, window_start, window_end, window_time + """.stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "Hello World,1970-01-01T00:00:00.009,1970-01-01T00:00:00.013,1,1,1,9,1", + "Hello,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,16,1", + "Hello,1970-01-01T00:00:00.001,1970-01-01T00:00:00.012,4,4,4,15,3") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = { + // create a watermark with 10ms offset to delay the window emission by 10ms to verify merge + val 
sessionWindowTestData = List( + (1L, 2, "Hello"), // (1, Hello) - window + (2L, 2, "Hello"), // (1, Hello) - window, deduped + (8L, 2, "Hello"), // (2, Hello) - window, deduped during merge + (10L, 3, "Hello"), // (2, Hello) - window, forwarded during merge + (9L, 9, "Hello World"), // (1, Hello World) - window + (4L, 1, "Hello"), // (1, Hello) - window, triggering merge + (16L, 16, "Hello")) // (3, Hello) - window (not merged) + + val stream = failingDataSource(sessionWindowTestData) + .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L)) + val table = stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) + tEnv.registerTable("MyTable", table) + + val sqlQuery = + """ + |SELECT c, + | COUNT(DISTINCT b), + | window_end + |FROM TABLE( + | SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '0.005' SECOND)) + |GROUP BY c, window_start, window_end + """.stripMargin + val sink = new TestingAppendSink + tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink) + env.execute() + + val expected = Seq( + "Hello World,1,1970-01-01T00:00:00.014", // window starts at [9L] till {14L} + "Hello,1,1970-01-01T00:00:00.021", // window starts at [16L] till {21L}, not merged + "Hello,3,1970-01-01T00:00:00.015" // window starts at [1L,2L], + // merged with [8L,10L], by [4L], till {15L} + ) + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } } object WindowAggregateITCase { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java index 22911d7420237..51dc68e9f4287 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java +++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java @@ -43,6 +43,9 @@ * *

Note: The operator only works for row-time. * + *

Note: The operator is not suitable for Session Window because it can't do stateless window + * assignment per input record for Session Window. + *

Note: The operator emits per record instead of at the end of window. */ public class WindowTableFunctionOperator extends TableStreamOperator