Skip to content

Commit

Permalink
[FLINK-11918][table] Deprecated some Window APIs and Rename Window to…
Browse files Browse the repository at this point in the history
… GroupWindow

This closes apache#7985
  • Loading branch information
hequn8128 authored and sunjincheng121 committed Mar 15, 2019
1 parent c02eec9 commit 231e4a5
Show file tree
Hide file tree
Showing 30 changed files with 358 additions and 208 deletions.
14 changes: 7 additions & 7 deletions docs/dev/table/tableApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -1271,14 +1271,14 @@ orders.insertInto("OutOrders")

Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.

Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute.
Windows are defined using the `window(w: GroupWindow)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute.
The following example shows how to define a window aggregation on a table.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Table table = input
.window([Window w].as("w")) // define window with alias w
.window([GroupWindow w].as("w")) // define window with alias w
.groupBy("w") // group the table by window w
.select("b.sum"); // aggregate
{% endhighlight %}
Expand All @@ -1287,7 +1287,7 @@ Table table = input
<div data-lang="scala" markdown="1">
{% highlight scala %}
val table = input
.window([w: Window] as 'w) // define window with alias w
.window([w: GroupWindow] as 'w) // define window with alias w
.groupBy('w) // group the table by window w
.select('b.sum) // aggregate
{% endhighlight %}
Expand All @@ -1301,7 +1301,7 @@ The following example shows how to define a window aggregation with additional g
<div data-lang="java" markdown="1">
{% highlight java %}
Table table = input
.window([Window w].as("w")) // define window with alias w
.window([GroupWindow w].as("w")) // define window with alias w
.groupBy("w, a") // group the table by attribute a and window w
.select("a, b.sum"); // aggregate
{% endhighlight %}
Expand All @@ -1310,7 +1310,7 @@ Table table = input
<div data-lang="scala" markdown="1">
{% highlight scala %}
val table = input
.window([w: Window] as 'w) // define window with alias w
.window([w: GroupWindow] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'b.sum) // aggregate
{% endhighlight %}
Expand All @@ -1323,7 +1323,7 @@ Window properties such as the start, end, or rowtime timestamp of a time window
<div data-lang="java" markdown="1">
{% highlight java %}
Table table = input
.window([Window w].as("w")) // define window with alias w
.window([GroupWindow w].as("w")) // define window with alias w
.groupBy("w, a") // group the table by attribute a and window w
.select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
{% endhighlight %}
Expand All @@ -1332,7 +1332,7 @@ Table table = input
<div data-lang="scala" markdown="1">
{% highlight scala %}
val table = input
.window([w: Window] as 'w) // define window with alias w
.window([w: GroupWindow] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps
{% endhighlight %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,16 @@
package org.apache.flink.table.api.java

import org.apache.flink.table.api._
import org.apache.flink.table.expressions.ExpressionParser

/**
* Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
* windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
* elements in 5 minutes intervals.
*
* @deprecated This class will be replaced by [[org.apache.flink.table.api.Tumble]].
*/
object Tumble {

/**
* Creates a tumbling window. Tumbling windows are consecutive, non-overlapping
* windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
* elements in 5 minutes intervals.
*
* @param size the size of the window as time or row-count interval.
* @return a partially defined tumbling window
*/
def over(size: String): TumbleWithSize = new TumbleWithSize(size)
}
@Deprecated
object Tumble extends TumbleBase

/**
* Helper class for creating a sliding window. Sliding windows have a fixed size and slide by
Expand All @@ -47,74 +38,27 @@ object Tumble {
* For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
* of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
* window evaluations.
*
* @deprecated This class will be replaced by [[org.apache.flink.table.api.Slide]].
*/
object Slide {

/**
* Creates a sliding window. Sliding windows have a fixed size and slide by
* a specified slide interval. If the slide interval is smaller than the window size, sliding
* windows are overlapping. Thus, an element can be assigned to multiple windows.
*
* For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
* elements of 15 minutes and evaluates every five minutes. Each element is contained in three
* consecutive window evaluations.
*
* @param size the size of the window as time or row-count interval
* @return a partially specified sliding window
*/
def over(size: String): SlideWithSize = new SlideWithSize(size)
}
@Deprecated
object Slide extends SlideBase

/**
* Helper class for creating a session window. The boundary of session windows are defined by
* intervals of inactivity, i.e., a session window is closes if no event appears for a defined
* gap period.
*
* @deprecated This class will be replaced by [[org.apache.flink.table.api.Session]].
*/
object Session {

/**
* Creates a session window. The boundary of session windows are defined by
* intervals of inactivity, i.e., a session window is closes if no event appears for a defined
* gap period.
*
* @param gap specifies how long (as interval of milliseconds) to wait for new data before
* closing the session window.
* @return a partially defined session window
*/
def withGap(gap: String): SessionWithGap = new SessionWithGap(gap)
}
@Deprecated
object Session extends SessionBase

/**
* Helper class for creating an over window. Similar to SQL, over window aggregates compute an
* aggregate for each input row over a range of its neighboring rows.
*
* @deprecated This class will be replaced by [[org.apache.flink.table.api.Over]].
*/
object Over {

/**
* Specifies the time attribute on which rows are ordered.
*
* For streaming tables, reference a rowtime or proctime time attribute here
* to specify the time mode.
*
* For batch tables, refer to a timestamp or long attribute.
*
* @param orderBy field reference
* @return an over window with defined order
*/
def orderBy(orderBy: String): OverWindowPartitionedOrdered = {
new OverWindowPartitionedOrdered(Seq(), ExpressionParser.parseExpression(orderBy))
}

/**
* Partitions the elements on some partition keys.
*
* Each partition is individually sorted and aggregate functions are applied to each
* partition separately.
*
* @param partitionBy list of field references
* @return an over window with defined partitioning
*/
def partitionBy(partitionBy: String): OverWindowPartitioned = {
new OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy))
}
}
@Deprecated
object Over extends OverBase
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,17 @@
package org.apache.flink.table.api.scala

import org.apache.flink.table.api._
import org.apache.flink.table.expressions.Expression

/**
* Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
* windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups
* elements in 5 minutes intervals.
*
* @deprecated This class will be replaced by [[org.apache.flink.table.api.Tumble]].
*/
object Tumble {

/**
* Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
* windows. For example, a tumbling window of 5 minutes size groups
* elements in 5 minutes intervals.
*
* @param size the size of the window as time or row-count interval.
* @return a partially defined tumbling window
*/
def over(size: Expression): TumbleWithSize = new TumbleWithSize(size)
}
@deprecated(
"This class will be replaced by org.apache.flink.table.api.Tumble.", "1.8")
object Tumble extends TumbleBase

/**
* Helper object for creating a sliding window. Sliding windows have a fixed size and slide by
Expand All @@ -47,74 +39,30 @@ object Tumble {
* For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements
* of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive
* window evaluations.
*
* @deprecated This class will be replaced by [[org.apache.flink.table.api.Slide]].
*/
object Slide {

/**
* Creates a sliding window. Sliding windows have a fixed size and slide by
* a specified slide interval. If the slide interval is smaller than the window size, sliding
* windows are overlapping. Thus, an element can be assigned to multiple windows.
*
* For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
* elements of 15 minutes and evaluates every five minutes. Each element is contained in three
* consecutive
*
* @param size the size of the window as time or row-count interval
* @return a partially specified sliding window
*/
def over(size: Expression): SlideWithSize = new SlideWithSize(size)
}
@deprecated(
"This class will be replaced by org.apache.flink.table.api.Slide.", "1.8")
object Slide extends SlideBase

/**
* Helper object for creating a session window. The boundary of session windows are defined by
* intervals of inactivity, i.e., a session window is closes if no event appears for a defined
* gap period.
*
* @deprecated This class will be replaced by [[org.apache.flink.table.api.Session]].
*/
object Session {

/**
* Creates a session window. The boundary of session windows are defined by
* intervals of inactivity, i.e., a session window is closes if no event appears for a defined
* gap period.
*
* @param gap specifies how long (as interval of milliseconds) to wait for new data before
* closing the session window.
* @return a partially defined session window
*/
def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap)
}
@deprecated(
"This class will be replaced by org.apache.flink.table.api.Session.", "1.8")
object Session extends SessionBase

/**
* Helper class for creating an over window. Similar to SQL, over window aggregates compute an
* aggregate for each input row over a range of its neighboring rows.
*
* @deprecated This class will be replaced by [[org.apache.flink.table.api.Over]].
*/
object Over {

/**
* Specifies the time attribute on which rows are ordered.
*
* For streaming tables, reference a rowtime or proctime time attribute here
* to specify the time mode.
*
* For batch tables, refer to a timestamp or long attribute.
*
* @param orderBy field reference
* @return an over window with defined order
*/
def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = {
new OverWindowPartitionedOrdered(Seq(), orderBy)
}

/**
* Partitions the elements on some partition keys.
*
* Each partition is individually sorted and aggregate functions are applied to each
* partition separately.
*
* @param partitionBy list of field references
* @return an over window with defined partitioning
*/
def partitionBy(partitionBy: Expression*): OverWindowPartitioned = {
new OverWindowPartitioned(partitionBy)
}
}
@deprecated(
"This class will be replaced by org.apache.flink.table.api.Over.", "1.8")
object Over extends OverBase
Original file line number Diff line number Diff line change
Expand Up @@ -1091,11 +1091,39 @@ class Table(
*
* @param window window that specifies how elements are grouped.
* @return A windowed table.
*
* @deprecated Will be removed in a future release. Please use Table.window(window: GroupWindow)
* instead.
*/
@deprecated(
"This method will be removed. Please use Table.window(window: GroupWindow) instead.",
"1.8")
@Deprecated
def window(window: Window): WindowedTable = {
new WindowedTable(this, window)
}

/**
* Groups the records of a table by assigning them to windows defined by a time or row interval.
*
* For streaming tables of infinite size, grouping into windows is required to define finite
* groups on which group-based aggregates can be computed.
*
* For batch tables of finite size, windowing essentially provides shortcuts for time-based
* groupBy.
*
* __Note__: Computing windowed aggregates on a streaming table is only a parallel operation
* if additional grouping attributes are added to the `groupBy(...)` clause.
* If the `groupBy(...)` only references a window alias, the streamed table will be processed
* by a single task, i.e., with parallelism 1.
*
* @param window group window that specifies how elements are grouped.
* @return A group windowed table.
*/
def window(window: GroupWindow): GroupWindowedTable = {
new GroupWindowedTable(this, window)
}

/**
* Defines over-windows on the records of a table.
*
Expand Down Expand Up @@ -1204,7 +1232,12 @@ class GroupedTable(

/**
* A table that has been windowed for grouping [[Window]]s.
*
* @deprecated Will be replaced by [[GroupWindowedTable]].
*/
@Deprecated
@deprecated(
"This class will be replaced by GroupWindowedTable.", "1.8")
class WindowedTable(
private[flink] val table: Table,
private[flink] val window: Window) {
Expand Down Expand Up @@ -1256,13 +1289,28 @@ class WindowedTable(
}

/**
* A table that has been windowed and grouped for grouping [[Window]]s.
* A table that has been windowed for [[GroupWindow]]s.
*/
class WindowGroupedTable(
class GroupWindowedTable(
override private[flink] val table: Table,
override private[flink] val window: GroupWindow)
extends WindowedTable(table, window)

/**
* A table that has been windowed and grouped for [[GroupWindow]]s.
*
* @deprecated The constructor contains [[Window]] parameter will be removed. Use constructor
* with [[GroupWindow]] instead.
*/
class WindowGroupedTable @Deprecated() (
private[flink] val table: Table,
private[flink] val groupKeys: Seq[Expression],
private[flink] val window: Window) {

def this(table: Table, groupKeys: Seq[Expression], window: GroupWindow) {
this(table, groupKeys, window.asInstanceOf[Window])
}

/**
* Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement.
* The field expressions can contain complex expressions and aggregations.
Expand Down
Loading

0 comments on commit 231e4a5

Please sign in to comment.