Skip to content

Commit

Permalink
[FLINK-6261] [table] Support TUMBLE, HOP, SESSION group window functi…
Browse files Browse the repository at this point in the history
…ons for SQL queries on batch tables.

- Drop support for group window translation of "GROUP BY FLOOR/CEIL".

This closes apache#3675.
  • Loading branch information
fhueske committed Apr 6, 2017
1 parent e2a4f47 commit 6353947
Show file tree
Hide file tree
Showing 9 changed files with 762 additions and 327 deletions.
90 changes: 74 additions & 16 deletions docs/dev/table_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1418,45 +1418,103 @@ val result2 = tableEnv.sql(
</div>
</div>

#### Group windows
#### Limitations

Joins, set operations, and non-windowed aggregations are not supported yet.

{% top %}

### Group Windows

Streaming SQL supports aggregation on group windows by specifying the windows in the `GROUP BY` clause. The following table describes the syntax of the group windows:
Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables.

<table class="table table-bordered">
<thead>
<tr>
<th><code>GROUP BY</code> clause</th>
<th class="text-left" style="width: 30%">Group Window Function</th>
<th class="text-left">Description</th>
</tr>
</thead>

<tbody>
<tr>
<td><code>TUMBLE(mode, interval)</code></td>
<td>A tumbling window over the time period specified by <code>interval</code>.</td>
<td><code>TUMBLE(time_attr, interval)</code></td>
<td>Defines are tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
</tr>
<tr>
<td><code>HOP(mode, slide, size)</code></td>
<td>A sliding window with the length of <code>size</code> and moves every <code>slide</code>.</td>
<td><code>HOP(time_attr, interval, interval)</code></td>
<td>Defines a hopping time window (called sliding window in the Table API). A hopping time window has a fixed duration (second <code>interval</code> parameter) and hops by a specified hop interval (first <code>interval</code> parameter). If the hop interval is smaller than the window size, hopping windows are overlapping. Thus, rows can be assigned to multiple windows. For example, a hopping window of 15 minutes size and 5 minute hop interval assigns each row to 3 different windows of 15 minute size, which are evaluated in an interval of 5 minutes. Hopping windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
</tr>
<tr>
<td><code>SESSION(mode, gap)</code></td>
<td>A session window that has <code>gap</code> as the gap between two windows.</td>
<td><code>SESSION(time_attr, interval)</code></td>
<td>Defines a session time window. Session time windows do not have a fixed duration but their bounds are defined by a time <code>interval</code> of inactivity, i.e., a session window is closed if no event appears for a defined gap period. For example a session window with a 30 minute gap starts when a row is observed after 30 minutes inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time (stream + batch) or processing-time (stream).</td>
</tr>
</tbody>
</table>

The parameters `interval`, `slide`, `size`, `gap` must be constant time intervals. The `mode` can be either `proctime()` or `rowtime()`, which specifies the window is over the processing time or the event time.
For SQL queries on streaming tables, the `time_attr` argument of the group window function must be one of the `rowtime()` or `proctime()` time-indicators, which distinguish between event or processing time, respectively. For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.

As an example, the following SQL computes the total number of records over a 15 minute tumbling window over processing time:
The following examples show how to specify SQL queries with group windows on streaming tables.

```
SELECT COUNT(*) FROM $table GROUP BY TUMBLE(proctime(), INTERVAL '15' MINUTE)
```
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

#### Limitations
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");

// compute SUM(amount) per day (in event-time)
Table result1 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");

// compute SUM(amount) per day (in processing-time)
Table result2 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime(), INTERVAL '1' DAY), user");

// compute every hour the SUM(amount) of the last 24 hours in event-time
Table result3 = tableEnv.sql(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime(), INTERVAL '1' HOUR, INTERVAL '1' DAY), product");

The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses. Aggregations or joins are not fully supported yet.
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
Table result4 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// read a DataStream from an external source
val ds: DataStream[(Long, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)

// compute SUM(amount) per day (in event-time)
val result1 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user")

// compute SUM(amount) per day (in processing-time)
val result2 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime(), INTERVAL '1' DAY), user")

// compute every hour the SUM(amount) of the last 24 hours in event-time
val result3 = tableEnv.sql(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime(), INTERVAL '1' HOUR, INTERVAL '1' DAY), product")

// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
val result4 = tableEnv.sql(
"SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user")

{% endhighlight %}
</div>
</div>

{% top %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ object FlinkRuleSets {
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.CALC_INSTANCE,
ReduceExpressionsRule.JOIN_INSTANCE,
ProjectToWindowRule.PROJECT
ProjectToWindowRule.PROJECT,

// Transform window to LogicalWindowAggregate
DataSetLogicalWindowAggregateRule.INSTANCE
)

/**
Expand Down Expand Up @@ -132,7 +135,7 @@ object FlinkRuleSets {
*/
val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
// Transform window to LogicalWindowAggregate
LogicalWindowAggregateRule.INSTANCE,
DataStreamLogicalWindowAggregateRule.INSTANCE,

// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.rules.common

import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.plan.hep.HepRelVertex
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
import org.apache.calcite.rex._
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate

import _root_.scala.collection.JavaConversions._

abstract class LogicalWindowAggregateRule(ruleName: String)
extends RelOptRule(
RelOptRule.operand(classOf[LogicalAggregate],
RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())),
ruleName) {

override def matches(call: RelOptRuleCall): Boolean = {
val agg = call.rel(0).asInstanceOf[LogicalAggregate]

val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet

val windowExpressions = getWindowExpressions(agg)
if (windowExpressions.length > 1) {
throw new TableException("Only a single window group function may be used in GROUP BY")
}

!distinctAggs && !groupSets && !agg.indicator && windowExpressions.nonEmpty
}

/**
* Transform LogicalAggregate with windowing expression to LogicalProject
* + LogicalWindowAggregate + LogicalProject.
*
* The transformation adds an additional LogicalProject at the top to ensure
* that the types are equivalent.
*/
override def onMatch(call: RelOptRuleCall): Unit = {
val agg = call.rel[LogicalAggregate](0)
val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]

val (windowExpr, windowExprIdx) = getWindowExpressions(agg).head
val window = translateWindowExpression(windowExpr, project.getInput.getRowType)

val builder = call.builder()
val rexBuilder = builder.getRexBuilder

val inAggGroupExpression = getInAggregateGroupExpression(rexBuilder, windowExpr)
val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
val newAgg = builder
.push(project.getInput)
.project(project.getChildExps.updated(windowExprIdx, inAggGroupExpression))
.aggregate(builder.groupKey(
newGroupSet,
agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
.build().asInstanceOf[LogicalAggregate]

// Create an additional project to conform with types
val outAggGroupExpression = getOutAggregateGroupExpression(rexBuilder, windowExpr)
val transformed = call.builder()
transformed.push(LogicalWindowAggregate.create(
window.toLogicalWindow,
Seq[NamedWindowProperty](),
newAgg))
.project(transformed.fields().patch(windowExprIdx, Seq(outAggGroupExpression), 0))

call.transformTo(transformed.build())
}

private[table] def getWindowExpressions(agg: LogicalAggregate): Seq[(RexCall, Int)] = {

val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
val groupKeys = agg.getGroupSet

// get grouping expressions
val groupExpr = project.getProjects.zipWithIndex.filter(p => groupKeys.get(p._2))

// filter grouping expressions for window expressions
groupExpr.filter { g =>
g._1 match {
case call: RexCall =>
call.getOperator match {
case SqlStdOperatorTable.TUMBLE =>
if (call.getOperands.size() == 2) {
true
} else {
throw TableException("TUMBLE window with alignment is not supported yet.")
}
case SqlStdOperatorTable.HOP =>
if (call.getOperands.size() == 3) {
true
} else {
throw TableException("HOP window with alignment is not supported yet.")
}
case SqlStdOperatorTable.SESSION =>
if (call.getOperands.size() == 2) {
true
} else {
throw TableException("SESSION window with alignment is not supported yet.")
}
case _ => false
}
case _ => false
}
}.map(w => (w._1.asInstanceOf[RexCall], w._2))
}

/** Returns the expression that replaces the window expression before the aggregation. */
private[table] def getInAggregateGroupExpression(
rexBuilder: RexBuilder,
windowExpression: RexCall): RexNode

/** Returns the expression that replaces the window expression after the aggregation. */
private[table] def getOutAggregateGroupExpression(
rexBuilder: RexBuilder,
windowExpression: RexCall): RexNode

/** translate the group window expression in to a Flink Table window. */
private[table] def translateWindowExpression(windowExpr: RexCall, rowType: RelDataType): Window

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan.rules.dataSet

import java.math.BigDecimal
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
import org.apache.flink.table.api.{TableException, Window}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo

class DataSetLogicalWindowAggregateRule
extends LogicalWindowAggregateRule("DataSetLogicalWindowAggregateRule") {

/** Returns the operand of the group window function. */
override private[table] def getInAggregateGroupExpression(
rexBuilder: RexBuilder,
windowExpression: RexCall): RexNode = windowExpression.getOperands.get(0)

/** Returns a zero literal of the correct type. */
override private[table] def getOutAggregateGroupExpression(
rexBuilder: RexBuilder,
windowExpression: RexCall): RexNode = {

val literalType = windowExpression.getOperands.get(0).getType
rexBuilder.makeZeroLiteral(literalType)
}

override private[table] def translateWindowExpression(
windowExpr: RexCall,
rowType: RelDataType): Window = {

def getOperandAsLong(call: RexCall, idx: Int): Long =
call.getOperands.get(idx) match {
case v: RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
case _ => throw new TableException("Only constant window descriptors are supported")
}

def getFieldReference(operand: RexNode): Expression = {
operand match {
case ref: RexInputRef =>
// resolve field name of window attribute
val fieldName = rowType.getFieldList.get(ref.getIndex).getName
val fieldType = rowType.getFieldList.get(ref.getIndex).getType
ResolvedFieldReference(fieldName, FlinkTypeFactory.toTypeInfo(fieldType))
}
}

windowExpr.getOperator match {
case SqlStdOperatorTable.TUMBLE =>
val interval = getOperandAsLong(windowExpr, 1)
val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")

case SqlStdOperatorTable.HOP =>
val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
val w = Slide
.over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
.every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")

case SqlStdOperatorTable.SESSION =>
val gap = getOperandAsLong(windowExpr, 1)
val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
}
}
}

object DataSetLogicalWindowAggregateRule {
val INSTANCE = new DataSetLogicalWindowAggregateRule
}
Loading

0 comments on commit 6353947

Please sign in to comment.