Skip to content

Commit

Permalink
[FLINK-4691] [table] Add group-windows for streaming tables to Table …
Browse files Browse the repository at this point in the history
…API.

This closes apache#2562.
  • Loading branch information
twalthr authored and fhueske committed Oct 26, 2016
1 parent baf057a commit 44f3977
Show file tree
Hide file tree
Showing 69 changed files with 3,996 additions and 421 deletions.
444 changes: 441 additions & 3 deletions docs/dev/table_api.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.table

import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}

/**
* Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
* windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
* elements in 5 minutes intervals.
*/
object Tumble {

/**
* Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
* windows of a specified fixed length. For example, a tumbling window with a size of 5 minutes
* groups elements in 5-minute intervals.
*
* The size is given as a String and handed unchanged to the [[TumblingWindow]] constructor.
*
* @param size the size of the window as time or row-count interval
* @return a [[TumblingWindow]] created from the given size
*/
def over(size: String): TumblingWindow = new TumblingWindow(size)
}

/**
* Helper class for creating a sliding window. Sliding windows have a fixed size and slide by
* a specified slide interval. If the slide interval is smaller than the window size, sliding
* windows are overlapping. Thus, an element can be assigned to multiple windows.
*
* For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
* of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
* window evaluations.
*/
object Slide {

/**
* Creates a sliding window. Sliding windows have a fixed size and slide by
* a specified slide interval. If the slide interval is smaller than the window size, sliding
* windows are overlapping. Thus, an element can be assigned to multiple windows.
*
* For example, a sliding window with a size of 15 minutes and a slide interval of 5 minutes
* groups elements of 15 minutes and is evaluated every 5 minutes. Each element is contained in
* three consecutive window evaluations.
*
* The size is given as a String and handed unchanged to the [[SlideWithSize]] constructor.
*
* @param size the size of the window as time or row-count interval
* @return a partially specified sliding window, i.e. a [[SlideWithSize]] that only carries the
*         size (presumably the slide interval is specified on the returned object -- confirm)
*/
def over(size: String): SlideWithSize = new SlideWithSize(size)
}

/**
* Helper class for creating a session window. The boundaries of session windows are defined by
* intervals of inactivity, i.e., a session window is closed if no event appears for a defined
* gap period.
*/
object Session {

/**
* Creates a session window. The boundaries of session windows are defined by
* intervals of inactivity, i.e., a session window is closed if no event appears for a defined
* gap period.
*
* The gap is given as a String and handed unchanged to the [[SessionWindow]] constructor.
*
* @param gap specifies how long (as interval of milliseconds) to wait for new data before
*            closing the session window
* @return a [[SessionWindow]] created from the given gap
*/
def withGap(gap: String): SessionWindow = new SessionWindow(gap)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.sql.{Date, Time, Timestamp}

import org.apache.calcite.avatica.util.DateTimeUtils._
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval}
import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval, toRowInterval}
import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
import org.apache.flink.api.table.expressions._

Expand Down Expand Up @@ -102,6 +102,16 @@ trait ImplicitExpressionOperations {
def asc = Asc(expr)
def desc = Desc(expr)

/**
* Returns the start time of a window when applied on a window reference.
*/
def start = WindowStart(expr)

/**
* Returns the end time of a window when applied on a window reference.
*/
def end = WindowEnd(expr)

/**
* Ternary conditional operator that decides which of two other expressions should be evaluated
* based on an evaluated boolean condition.
Expand Down Expand Up @@ -356,7 +366,7 @@ trait ImplicitExpressionOperations {
*/
def days = day

/**
/**
* Creates an interval of the given number of hours.
*
* @return interval of milliseconds
Expand All @@ -370,7 +380,7 @@ trait ImplicitExpressionOperations {
*/
def hours = hour

/**
/**
* Creates an interval of the given number of minutes.
*
* @return interval of milliseconds
Expand All @@ -384,7 +394,7 @@ trait ImplicitExpressionOperations {
*/
def minutes = minute

/**
/**
* Creates an interval of the given number of seconds.
*
* @return interval of milliseconds
Expand All @@ -398,7 +408,7 @@ trait ImplicitExpressionOperations {
*/
def seconds = second

/**
/**
* Creates an interval of the given number of milliseconds.
*
* @return interval of milliseconds
Expand All @@ -411,6 +421,16 @@ trait ImplicitExpressionOperations {
* @return interval of milliseconds
*/
def millis = milli

// row interval type

/**
* Creates an interval of rows.
*
* @return interval of rows
*/
def rows = toRowInterval(expr)

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.table

import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}

/**
* Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
* windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
* elements in 5 minutes intervals.
*/
object Tumble {

/**
* Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
* windows. For example, a tumbling window with a size of 5 minutes groups
* elements in 5-minute intervals.
*
* The size expression is handed unchanged to the [[TumblingWindow]] constructor.
*
* @param size the size of the window as time or row-count interval
* @return a [[TumblingWindow]] created from the given size
*/
def over(size: Expression): TumblingWindow = new TumblingWindow(size)
}

/**
* Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
* a specified slide interval. If the slide interval is smaller than the window size, sliding
* windows are overlapping. Thus, an element can be assigned to multiple windows.
*
* For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
* of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
* window evaluations.
*/
object Slide {

/**
* Creates a sliding window. Sliding windows have a fixed size and slide by
* a specified slide interval. If the slide interval is smaller than the window size, sliding
* windows are overlapping. Thus, an element can be assigned to multiple windows.
*
* For example, a sliding window with a size of 15 minutes and a slide interval of 5 minutes
* groups elements of 15 minutes and is evaluated every 5 minutes. Each element is contained in
* three consecutive window evaluations.
*
* The size expression is handed unchanged to the [[SlideWithSize]] constructor.
*
* @param size the size of the window as time or row-count interval
* @return a partially specified sliding window, i.e. a [[SlideWithSize]] that only carries the
*         size (presumably the slide interval is specified on the returned object -- confirm)
*/
def over(size: Expression): SlideWithSize = new SlideWithSize(size)
}

/**
* Helper object for creating a session window. The boundaries of session windows are defined by
* intervals of inactivity, i.e., a session window is closed if no event appears for a defined
* gap period.
*/
object Session {

/**
* Creates a session window. The boundaries of session windows are defined by
* intervals of inactivity, i.e., a session window is closed if no event appears for a defined
* gap period.
*
* The gap expression is handed unchanged to the [[SessionWindow]] constructor.
*
* @param gap specifies how long (as interval of milliseconds) to wait for new data before
*            closing the session window
* @return a [[SessionWindow]] created from the given gap
*/
def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ abstract class BatchTableEnvironment(
val m = internalNamePattern.findFirstIn(name)
m match {
case Some(_) =>
throw new ValidationException(s"Illegal Table name. " +
throw new TableException(s"Illegal Table name. " +
s"Please choose a name that does not contain the pattern $internalNamePattern")
case None =>
}
Expand All @@ -96,7 +96,7 @@ abstract class BatchTableEnvironment(
if (isRegistered(tableName)) {
new Table(this, CatalogNode(tableName, getRowType(tableName)))
} else {
throw new ValidationException(s"Table \'$tableName\' was not found in the registry.")
throw new TableException(s"Table \'$tableName\' was not found in the registry.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@ package org.apache.flink.api.table

import java.util.Collections

import org.apache.calcite.plan.volcano.VolcanoPlanner
import java.lang.Iterable

import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan._
import org.apache.calcite.plan.volcano.VolcanoPlanner
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.logical.LogicalAggregate
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.api.table.expressions.WindowProperty
import org.apache.flink.api.table.plan.logical.LogicalWindow
import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate

/**
* Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
Expand All @@ -41,10 +49,25 @@ class FlinkRelBuilder(

def getPlanner: RelOptPlanner = cluster.getPlanner

def getCluster = cluster
def getCluster: RelOptCluster = relOptCluster

override def getTypeFactory: FlinkTypeFactory =
super.getTypeFactory.asInstanceOf[FlinkTypeFactory]

/**
* Builds a window aggregate and pushes it onto this builder.
*
* A regular aggregate is first created through the inherited `aggregate(groupKey, aggCalls)`
* and materialized with `build()`. The result is then combined with the window and its named
* properties by [[LogicalWindowAggregate.create]], and the created node is pushed.
*
* @param window the logical window handed to [[LogicalWindowAggregate.create]]
* @param groupKey the group key forwarded to the inherited aggregate call
* @param namedProperties the named window properties handed to [[LogicalWindowAggregate.create]]
* @param aggCalls the aggregate calls forwarded to the inherited aggregate call
* @return this builder, so that calls can be chained
*/
def aggregate(
window: LogicalWindow,
groupKey: GroupKey,
namedProperties: Seq[NamedWindowProperty],
aggCalls: Iterable[AggCall])
: RelBuilder = {
// build logical aggregate
// NOTE(review): the cast assumes the inherited aggregate always yields a LogicalAggregate;
// any other node type would fail here with a ClassCastException -- confirm
val aggregate = super.aggregate(groupKey, aggCalls).build().asInstanceOf[LogicalAggregate]

// build logical window aggregate from it
push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
this
}

}

object FlinkRelBuilder {
Expand All @@ -69,4 +92,11 @@ object FlinkRelBuilder {
new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
}

/**
* Information necessary to create a window aggregate.
*
* Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
*
* @param name the name paired with the window property (presumably the name under which the
*             property is exposed in the result -- confirm)
* @param property the [[WindowProperty]] that is paired with `name`
*/
case class NamedWindowProperty(name: String, property: WindowProperty)

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
import org.apache.flink.api.table.plan.schema.GenericRelDataType
import org.apache.flink.api.table.typeutils.IntervalTypeInfo
import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple

import scala.collection.mutable
Expand Down Expand Up @@ -106,8 +106,8 @@ object FlinkTypeFactory {
case SqlTimeTypeInfo.DATE => DATE
case SqlTimeTypeInfo.TIME => TIME
case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
case IntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND

case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
throw TableException("Character type is not supported.")
Expand All @@ -131,8 +131,8 @@ object FlinkTypeFactory {
case DATE => SqlTimeTypeInfo.DATE
case TIME => SqlTimeTypeInfo.TIME
case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MONTHS
case typeName if DAY_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MILLIS
case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MONTHS
case typeName if DAY_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MILLIS

case NULL =>
throw TableException("Type NULL is not supported. " +
Expand Down
Loading

0 comments on commit 44f3977

Please sign in to comment.