From 231e4a503ac249522ad390b6f7eb43635468b842 Mon Sep 17 00:00:00 2001 From: hequn8128 Date: Thu, 14 Mar 2019 18:06:19 +0800 Subject: [PATCH] [FLINK-11918][table] Deprecated some Window APIs and Rename Window to GroupWindow This closes #7985 --- docs/dev/table/tableApi.md | 14 +- .../apache/flink/table/api/java/windows.scala | 88 ++------ .../flink/table/api/scala/windows.scala | 92 ++------ .../org/apache/flink/table/api/table.scala | 52 ++++- .../org/apache/flink/table/api/windows.scala | 211 +++++++++++++++++- .../api/batch/table/GroupWindowTest.scala | 1 + .../GroupWindowValidationTest.scala | 2 +- .../validation/OverWindowValidationTest.scala | 2 +- .../api/stream/table/AggregateTest.scala | 1 + .../table/api/stream/table/CalcTest.scala | 3 +- .../api/stream/table/GroupWindowTest.scala | 1 + .../api/stream/table/OverWindowTest.scala | 2 +- .../api/stream/table/TableSourceTest.scala | 2 +- .../AggregateStringExpressionTest.scala | 6 +- .../GroupWindowStringExpressionTest.scala | 14 +- .../OverWindowStringExpressionTest.scala | 48 ++-- .../table/validation/CalcValidationTest.scala | 4 +- .../GroupWindowValidationTest.scala | 2 +- .../validation/OverWindowValidationTest.scala | 2 +- .../table/plan/RetractionRulesTest.scala | 2 +- .../plan/TimeIndicatorConversionTest.scala | 1 + .../table/plan/UpdatingPlanCheckerTest.scala | 2 +- .../batch/table/GroupWindowITCase.scala | 1 + .../batch/table/TableSourceITCase.scala | 2 +- .../runtime/stream/TimeAttributesITCase.scala | 2 +- .../stream/table/GroupWindowITCase.scala | 2 +- .../runtime/stream/table/JoinITCase.scala | 2 +- .../stream/table/OverWindowITCase.scala | 1 + .../stream/table/TableSinkITCase.scala | 2 +- .../stream/table/TableSourceITCase.scala | 2 +- 30 files changed, 358 insertions(+), 208 deletions(-) diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index a509d85e001a8..f4c74cc12ece1 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -1271,14 +1271,14 @@ orders.insertInto("OutOrders") Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals. -Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. +Windows are defined using the `window(w: GroupWindow)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. The following example shows how to define a window aggregation on a table.
{% highlight java %} Table table = input - .window([Window w].as("w")) // define window with alias w + .window([GroupWindow w].as("w")) // define window with alias w .groupBy("w") // group the table by window w .select("b.sum"); // aggregate {% endhighlight %} @@ -1287,7 +1287,7 @@ Table table = input
{% highlight scala %} val table = input - .window([w: Window] as 'w) // define window with alias w + .window([w: GroupWindow] as 'w) // define window with alias w .groupBy('w) // group the table by window w .select('b.sum) // aggregate {% endhighlight %} @@ -1301,7 +1301,7 @@ The following example shows how to define a window aggregation with additional g
{% highlight java %} Table table = input - .window([Window w].as("w")) // define window with alias w + .window([GroupWindow w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w .select("a, b.sum"); // aggregate {% endhighlight %} @@ -1310,7 +1310,7 @@ Table table = input
{% highlight scala %} val table = input - .window([w: Window] as 'w) // define window with alias w + .window([w: GroupWindow] as 'w) // define window with alias w .groupBy('w, 'a) // group the table by attribute a and window w .select('a, 'b.sum) // aggregate {% endhighlight %} @@ -1323,7 +1323,7 @@ Window properties such as the start, end, or rowtime timestamp of a time window
{% highlight java %} Table table = input - .window([Window w].as("w")) // define window with alias w + .window([GroupWindow w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps {% endhighlight %} @@ -1332,7 +1332,7 @@ Table table = input
{% highlight scala %} val table = input - .window([w: Window] as 'w) // define window with alias w + .window([w: GroupWindow] as 'w) // define window with alias w .groupBy('w, 'a) // group the table by attribute a and window w .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps {% endhighlight %} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala index 5f7f422b39370..24f39a17f3ce7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala @@ -19,25 +19,16 @@ package org.apache.flink.table.api.java import org.apache.flink.table.api._ -import org.apache.flink.table.expressions.ExpressionParser /** * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups * elements in 5 minutes intervals. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Tumble]]. */ -object Tumble { - - /** - * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping - * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups - * elements in 5 minutes intervals. - * - * @param size the size of the window as time or row-count interval. - * @return a partially defined tumbling window - */ - def over(size: String): TumbleWithSize = new TumbleWithSize(size) -} +@Deprecated +object Tumble extends TumbleBase /** * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by @@ -47,74 +38,27 @@ object Tumble { * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive * window evaluations. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Slide]]. */ -object Slide { - - /** - * Creates a sliding window. Sliding windows have a fixed size and slide by - * a specified slide interval. If the slide interval is smaller than the window size, sliding - * windows are overlapping. Thus, an element can be assigned to multiple windows. - * - * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups - * elements of 15 minutes and evaluates every five minutes. Each element is contained in three - * consecutive window evaluations. - * - * @param size the size of the window as time or row-count interval - * @return a partially specified sliding window - */ - def over(size: String): SlideWithSize = new SlideWithSize(size) -} +@Deprecated +object Slide extends SlideBase /** * Helper class for creating a session window. The boundary of session windows are defined by * intervals of inactivity, i.e., a session window is closes if no event appears for a defined * gap period. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Session]]. */ -object Session { - - /** - * Creates a session window. The boundary of session windows are defined by - * intervals of inactivity, i.e., a session window is closes if no event appears for a defined - * gap period. - * - * @param gap specifies how long (as interval of milliseconds) to wait for new data before - * closing the session window. - * @return a partially defined session window - */ - def withGap(gap: String): SessionWithGap = new SessionWithGap(gap) -} +@Deprecated +object Session extends SessionBase /** * Helper class for creating an over window. Similar to SQL, over window aggregates compute an * aggregate for each input row over a range of its neighboring rows. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Over]]. */ -object Over { - - /** - * Specifies the time attribute on which rows are ordered. - * - * For streaming tables, reference a rowtime or proctime time attribute here - * to specify the time mode. - * - * For batch tables, refer to a timestamp or long attribute. - * - * @param orderBy field reference - * @return an over window with defined order - */ - def orderBy(orderBy: String): OverWindowPartitionedOrdered = { - new OverWindowPartitionedOrdered(Seq(), ExpressionParser.parseExpression(orderBy)) - } - - /** - * Partitions the elements on some partition keys. - * - * Each partition is individually sorted and aggregate functions are applied to each - * partition separately. - * - * @param partitionBy list of field references - * @return an over window with defined partitioning - */ - def partitionBy(partitionBy: String): OverWindowPartitioned = { - new OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy)) - } -} +@Deprecated +object Over extends OverBase diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala index 5af1d09ae5bac..4c2a5dddb78d0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala @@ -19,25 +19,17 @@ package org.apache.flink.table.api.scala import org.apache.flink.table.api._ -import org.apache.flink.table.expressions.Expression /** * Helper object for creating a tumbling window. Tumbling windows are consecutive, non-overlapping * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups * elements in 5 minutes intervals. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Tumble]]. */ -object Tumble { - - /** - * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping - * windows. For example, a tumbling window of 5 minutes size groups - * elements in 5 minutes intervals. - * - * @param size the size of the window as time or row-count interval. - * @return a partially defined tumbling window - */ - def over(size: Expression): TumbleWithSize = new TumbleWithSize(size) -} +@deprecated( + "This class will be replaced by org.apache.flink.table.api.Tumble.", "1.8") +object Tumble extends TumbleBase /** * Helper object for creating a sliding window. Sliding windows have a fixed size and slide by @@ -47,74 +39,30 @@ object Tumble { * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive * window evaluations. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Slide]]. */ -object Slide { - - /** - * Creates a sliding window. Sliding windows have a fixed size and slide by - * a specified slide interval. If the slide interval is smaller than the window size, sliding - * windows are overlapping. Thus, an element can be assigned to multiple windows. - * - * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups - * elements of 15 minutes and evaluates every five minutes. Each element is contained in three - * consecutive - * - * @param size the size of the window as time or row-count interval - * @return a partially specified sliding window - */ - def over(size: Expression): SlideWithSize = new SlideWithSize(size) -} +@deprecated( + "This class will be replaced by org.apache.flink.table.api.Slide.", "1.8") +object Slide extends SlideBase /** * Helper object for creating a session window. The boundary of session windows are defined by * intervals of inactivity, i.e., a session window is closes if no event appears for a defined * gap period. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Session]]. */ -object Session { - - /** - * Creates a session window. The boundary of session windows are defined by - * intervals of inactivity, i.e., a session window is closes if no event appears for a defined - * gap period. - * - * @param gap specifies how long (as interval of milliseconds) to wait for new data before - * closing the session window. - * @return a partially defined session window - */ - def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap) -} +@deprecated( + "This class will be replaced by org.apache.flink.table.api.Session.", "1.8") +object Session extends SessionBase /** * Helper class for creating an over window. Similar to SQL, over window aggregates compute an * aggregate for each input row over a range of its neighboring rows. + * + * @deprecated This class will be replaced by [[org.apache.flink.table.api.Over]]. */ -object Over { - - /** - * Specifies the time attribute on which rows are ordered. - * - * For streaming tables, reference a rowtime or proctime time attribute here - * to specify the time mode. - * - * For batch tables, refer to a timestamp or long attribute. - * - * @param orderBy field reference - * @return an over window with defined order - */ - def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = { - new OverWindowPartitionedOrdered(Seq(), orderBy) - } - - /** - * Partitions the elements on some partition keys. - * - * Each partition is individually sorted and aggregate functions are applied to each - * partition separately. - * - * @param partitionBy list of field references - * @return an over window with defined partitioning - */ - def partitionBy(partitionBy: Expression*): OverWindowPartitioned = { - new OverWindowPartitioned(partitionBy) - } -} +@deprecated( + "This class will be replaced by org.apache.flink.table.api.Over.", "1.8") +object Over extends OverBase diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala index d1cf8ace83ae7..2c3738ae4016a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala @@ -1091,11 +1091,39 @@ class Table( * * @param window window that specifies how elements are grouped. * @return A windowed table. + * + * @deprecated Will be removed in a future release. Please use Table.window(window: GroupWindow) + * instead. */ + @deprecated( + "This method will be removed. Please use Table.window(window: GroupWindow) instead.", + "1.8") + @Deprecated def window(window: Window): WindowedTable = { new WindowedTable(this, window) } + /** + * Groups the records of a table by assigning them to windows defined by a time or row interval. + * + * For streaming tables of infinite size, grouping into windows is required to define finite + * groups on which group-based aggregates can be computed. + * + * For batch tables of finite size, windowing essentially provides shortcuts for time-based + * groupBy. + * + * __Note__: Computing windowed aggregates on a streaming table is only a parallel operation + * if additional grouping attributes are added to the `groupBy(...)` clause. + * If the `groupBy(...)` only references a window alias, the streamed table will be processed + * by a single task, i.e., with parallelism 1. + * + * @param window group window that specifies how elements are grouped. + * @return A group windowed table. + */ + def window(window: GroupWindow): GroupWindowedTable = { + new GroupWindowedTable(this, window) + } + /** * Defines over-windows on the records of a table. * @@ -1204,7 +1232,12 @@ class GroupedTable( /** * A table that has been windowed for grouping [[Window]]s. + * + * @deprecated Will be replaced by [[GroupWindowedTable]]. */ +@Deprecated +@deprecated( + "This class will be replaced by GroupWindowedTable.", "1.8") class WindowedTable( private[flink] val table: Table, private[flink] val window: Window) { @@ -1256,13 +1289,28 @@ class WindowedTable( } /** - * A table that has been windowed and grouped for grouping [[Window]]s. + * A table that has been windowed for [[GroupWindow]]s. */ -class WindowGroupedTable( +class GroupWindowedTable( + override private[flink] val table: Table, + override private[flink] val window: GroupWindow) + extends WindowedTable(table, window) + +/** + * A table that has been windowed and grouped for [[GroupWindow]]s. + * + * @deprecated The constructor contains [[Window]] parameter will be removed. Use constructor + * with [[GroupWindow]] instead. + */ +class WindowGroupedTable @Deprecated() ( private[flink] val table: Table, private[flink] val groupKeys: Seq[Expression], private[flink] val window: Window) { + def this(table: Table, groupKeys: Seq[Expression], window: GroupWindow) { + this(table, groupKeys, window.asInstanceOf[Window]) + } + /** * Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement. * The field expressions can contain complex expressions and aggregations. diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala index a650683d4c1c1..948a620405098 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala @@ -193,7 +193,12 @@ class OverWindowPartitionedOrderedPreceding( * is required to apply aggregations on streaming tables. * * For finite batch tables, group windows provide shortcuts for time-based groupBy. + * + * @deprecated Will be replaced by [[GroupWindow]] */ +@Deprecated +@deprecated( + "This class will be replaced by GroupWindow.", "1.8") abstract class Window(alias: Expression, timeField: Expression) { def getAlias: Expression = { @@ -205,6 +210,21 @@ abstract class Window(alias: Expression, timeField: Expression) { } } +/** + * A group window specification. + * + * Group windows group rows based on time or row-count intervals and is therefore essentially a + * special type of groupBy. Just like groupBy, group windows allow to compute aggregates + * on groups of elements. + * + * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping + * is required to apply aggregations on streaming tables. + * + * For finite batch tables, group windows provide shortcuts for time-based groupBy. + */ +abstract class GroupWindow(alias: Expression, timeField: Expression) + extends Window(alias, timeField) + // ------------------------------------------------------------------------------------------------ // Tumbling windows // ------------------------------------------------------------------------------------------------ @@ -293,7 +313,7 @@ class TumbleWithSizeOnTimeWithAlias( alias: Expression, timeField: Expression, size: Expression) - extends Window( + extends GroupWindow( alias, timeField) { @@ -425,7 +445,7 @@ class SlideWithSizeAndSlideOnTimeWithAlias( timeField: Expression, size: Expression, slide: Expression) - extends Window( + extends GroupWindow( alias, timeField) { @@ -526,7 +546,7 @@ class SessionWithGapOnTimeWithAlias( alias: Expression, timeField: Expression, gap: Expression) - extends Window( + extends GroupWindow( alias, timeField) { @@ -534,3 +554,188 @@ class SessionWithGapOnTimeWithAlias( gap } } + +/** + * Base class for Tumble Window Helper classes. This class contains help methods to create Tumble + * Windows. + */ +class TumbleBase { + + /** + * Creates a tumbling window. Tumbling windows are consecutive, non-overlapping + * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups + * elements in 5 minutes intervals. + * + * @param size the size of the window as time or row-count interval. + * @return a partially defined tumbling window + */ + def over(size: String): TumbleWithSize = new TumbleWithSize(size) + + /** + * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping + * windows. For example, a tumbling window of 5 minutes size groups + * elements in 5 minutes intervals. + * + * @param size the size of the window as time or row-count interval. + * @return a partially defined tumbling window + */ + def over(size: Expression): TumbleWithSize = new TumbleWithSize(size) +} + +/** + * Base class for Slide Window Helper classes. This class contains help methods to create Slide + * Windows. + */ +class SlideBase { + + /** + * Creates a sliding window. Sliding windows have a fixed size and slide by + * a specified slide interval. If the slide interval is smaller than the window size, sliding + * windows are overlapping. Thus, an element can be assigned to multiple windows. + * + * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups + * elements of 15 minutes and evaluates every five minutes. Each element is contained in three + * consecutive window evaluations. + * + * @param size the size of the window as time or row-count interval + * @return a partially specified sliding window + */ + def over(size: String): SlideWithSize = new SlideWithSize(size) + + /** + * Creates a sliding window. Sliding windows have a fixed size and slide by + * a specified slide interval. If the slide interval is smaller than the window size, sliding + * windows are overlapping. Thus, an element can be assigned to multiple windows. + * + * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups + * elements of 15 minutes and evaluates every five minutes. Each element is contained in three + * consecutive + * + * @param size the size of the window as time or row-count interval + * @return a partially specified sliding window + */ + def over(size: Expression): SlideWithSize = new SlideWithSize(size) +} + +/** + * Base class for Session Window Helper classes. This class contains help methods to create Session + * Windows. + */ +class SessionBase { + + /** + * Creates a session window. The boundary of session windows are defined by + * intervals of inactivity, i.e., a session window is closes if no event appears for a defined + * gap period. + * + * @param gap specifies how long (as interval of milliseconds) to wait for new data before + * closing the session window. + * @return a partially defined session window + */ + def withGap(gap: String): SessionWithGap = new SessionWithGap(gap) + + /** + * Creates a session window. The boundary of session windows are defined by + * intervals of inactivity, i.e., a session window is closes if no event appears for a defined + * gap period. + * + * @param gap specifies how long (as interval of milliseconds) to wait for new data before + * closing the session window. + * @return a partially defined session window + */ + def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap) +} + +/** + * Base class for Over Window Helper classes. This class contains help methods to create Over + * Windows. + */ +class OverBase { + + /** + * Specifies the time attribute on which rows are ordered. + * + * For streaming tables, reference a rowtime or proctime time attribute here + * to specify the time mode. + * + * For batch tables, refer to a timestamp or long attribute. + * + * @param orderBy field reference + * @return an over window with defined order + */ + def orderBy(orderBy: String): OverWindowPartitionedOrdered = { + new OverWindowPartitionedOrdered(Seq(), ExpressionParser.parseExpression(orderBy)) + } + + /** + * Specifies the time attribute on which rows are ordered. + * + * For streaming tables, reference a rowtime or proctime time attribute here + * to specify the time mode. + * + * For batch tables, refer to a timestamp or long attribute. + * + * @param orderBy field reference + * @return an over window with defined order + */ + def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = { + new OverWindowPartitionedOrdered(Seq(), orderBy) + } + + /** + * Partitions the elements on some partition keys. + * + * Each partition is individually sorted and aggregate functions are applied to each + * partition separately. + * + * @param partitionBy list of field references + * @return an over window with defined partitioning + */ + def partitionBy(partitionBy: String): OverWindowPartitioned = { + new OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy)) + } + + /** + * Partitions the elements on some partition keys. + * + * Each partition is individually sorted and aggregate functions are applied to each + * partition separately. + * + * @param partitionBy list of field references + * @return an over window with defined partitioning + */ + def partitionBy(partitionBy: Expression*): OverWindowPartitioned = { + new OverWindowPartitioned(partitionBy) + } +} + +/** + * Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping + * windows of a specified fixed length. For example, a tumbling window of 5 minutes size groups + * elements in 5 minutes intervals. + */ +object Tumble extends TumbleBase + +/** + * Helper class for creating a sliding window. Sliding windows have a fixed size and slide by + * a specified slide interval. If the slide interval is smaller than the window size, sliding + * windows are overlapping. Thus, an element can be assigned to multiple windows. + * + * For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements + * of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive + * window evaluations. + */ +object Slide extends SlideBase + +/** + * Helper class for creating a session window. The boundary of session windows are defined by + * intervals of inactivity, i.e., a session window is closes if no event appears for a defined + * gap period. + */ +object Session extends SessionBase + +/** + * Helper class for creating an over window. Similar to SQL, over window aggregates compute an + * aggregate for each input row over a range of its neighboring rows. + */ +object Over extends OverBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala index 46eab77525736..5be9f25da0b02 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/GroupWindowTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.api.batch.table import java.sql.Timestamp import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala index 7cf1b82424a89..15d70586010ce 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/GroupWindowValidationTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.batch.table.validation import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.{Session, Slide, Tumble, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala index df8f7df619373..b19a523675771 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/validation/OverWindowValidationTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.batch.table.validation import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.{Tumble, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.OverAgg0 import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala index 68dad5ddccd15..3b4f363137db7 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/AggregateTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala index 7be799c0eca5b..88041d55eb07b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/CalcTest.scala @@ -20,8 +20,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ -import org.apache.flink.table.expressions.WindowReference -import org.apache.flink.table.plan.logical.TumblingGroupWindow +import org.apache.flink.table.api.Tumble import org.apache.flink.table.utils.TableTestBase import org.apache.flink.table.utils.TableTestUtil._ import org.junit.Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala index 4cfca8f749e37..650f40d60cb39 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge} import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala index 848e19e761a0f..c41112411d266 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala @@ -21,7 +21,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithRetract import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.Func1 -import org.apache.flink.table.api.Table +import org.apache.flink.table.api.{Over, Table} import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala index 5e3a410ef69b4..c6ba3545b613b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.table.api.{TableSchema, Types} +import org.apache.flink.table.api.{Over, TableSchema, Tumble, Types} import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils.{TableTestBase, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSourceWithTime} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala index 0833c240c281e..63c7866b61df5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Tumble import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.java.{Tumble => JTumble} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMergeAndReset} import org.apache.flink.table.utils.TableTestBase @@ -143,7 +143,7 @@ class AggregateStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JTumble.over("50.milli").on("proctime").as("w1")) + .window(Tumble.over("50.milli").on("proctime").as("w1")) .groupBy("w1, string") .select("w1.proctime as proctime, w1.start as start, w1.end as end, string, int.count") @@ -163,7 +163,7 @@ class AggregateStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JTumble.over("50.milli").on("rowtime").as("w1")) + .window(Tumble.over("50.milli").on("rowtime").as("w1")) .groupBy("w1, string") .select("w1.rowtime as rowtime, string, int.count") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala index 2cb5a8a547c0f..2e05139cca949 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala @@ -19,9 +19,9 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg -import org.apache.flink.table.api.java.{Session => JSession, Slide => JSlide, Tumble => JTumble} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.utils.TableTestBase import org.junit.Test @@ -53,7 +53,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSlide.over("4.hours").every("2.hours").on("rowtime").as("w")) + .window(Slide.over("4.hours").every("2.hours").on("rowtime").as("w")) .groupBy("w, string") .select( "string, " + @@ -92,7 +92,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JTumble.over("4.hours").on("rowtime").as("w")) + .window(Tumble.over("4.hours").on("rowtime").as("w")) .groupBy("w, string") .select( "string, " + @@ -130,7 +130,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSession.withGap("4.hours").on("rowtime").as("w")) + .window(Session.withGap("4.hours").on("rowtime").as("w")) .groupBy("w, string") .select( "string, " + @@ -167,7 +167,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSlide.over("4.hours").every("2.hours").on("proctime").as("w")) + .window(Slide.over("4.hours").every("2.hours").on("proctime").as("w")) .groupBy("w") .select( "myCountFun(string), " + @@ -204,7 +204,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JTumble.over("4.hours").on("proctime").as("w")) + .window(Tumble.over("4.hours").on("proctime").as("w")) .groupBy("w") .select( "myCountFun(string), " + @@ -241,7 +241,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSession.withGap("4.hours").on("proctime").as("w")) + .window(Session.withGap("4.hours").on("proctime").as("w")) .groupBy("w") .select( "myCountFun(string), " + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala index 99114b8831882..f3b312bdf7e5d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala @@ -19,10 +19,10 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Over import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithRetract import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg -import org.apache.flink.table.api.java.{Over => JOver} -import org.apache.flink.table.api.scala.{Over => SOver, _} +import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.Func1 import org.apache.flink.table.utils.TableTestBase import org.junit.Test @@ -38,10 +38,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) + .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) .select('a, 'b.sum over 'w as 'cnt, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")) + .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")) .select("a, SUM(b) OVER w as cnt, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -56,10 +56,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'rowtime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w) + .window(Over orderBy 'rowtime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.orderBy("rowtime").preceding("unbounded_row").following("current_row").as("w")) + .window(Over.orderBy("rowtime").preceding("unbounded_row").following("current_row").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -74,10 +74,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 'w) + .window(Over partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.partitionBy("a, d").orderBy("rowtime").preceding("10.rows").as("w")) + .window(Over.partitionBy("a, d").orderBy("rowtime").preceding("10.rows").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -92,10 +92,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 'w) + .window(Over orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.orderBy("rowtime").preceding("10.rows").following("current_row").as("w")) + .window(Over.orderBy("rowtime").preceding("10.rows").following("current_row").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -110,10 +110,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) + .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w")) + .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -128,15 +128,15 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) + .window(Over orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t .window( - JOver.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w")) + Over.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") val resJava2 = t .window( - JOver.orderBy("rowtime").as("w")) + Over.orderBy("rowtime").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -152,15 +152,15 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'proctime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) + .window(Over orderBy 'proctime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t .window( - JOver.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w")) + Over.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") val resJava2 = t .window( - JOver.orderBy("proctime").as("w")) + Over.orderBy("proctime").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -176,10 +176,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes as 'w) + .window(Over partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.partitionBy("a, c").orderBy("rowtime").preceding("10.minutes").as("w")) + .window(Over.partitionBy("a, c").orderBy("rowtime").preceding("10.minutes").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -194,10 +194,10 @@ class OverWindowStringExpressionTest extends TableTestBase { util.tableEnv.registerFunction("weightAvgFun", weightAvgFun) val resScala = t - .window(SOver orderBy 'rowtime preceding 4.hours following CURRENT_RANGE as 'w) + .window(Over orderBy 'rowtime preceding 4.hours following CURRENT_RANGE as 'w) .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt) val resJava = t - .window(JOver.orderBy("rowtime").preceding("4.hours").following("current_range").as("w")) + .window(Over.orderBy("rowtime").preceding("4.hours").following("current_range").as("w")) .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt") verifyTableEquals(resScala, resJava) @@ -214,7 +214,7 @@ class OverWindowStringExpressionTest extends TableTestBase { util.addFunction("weightedAvg", weightedAvg) val resScala = t - .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) + .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) .select( array('a.sum over 'w, 'a.count over 'w), plusOne('b.sum over 'w as 'wsum) as 'd, @@ -223,7 +223,7 @@ class OverWindowStringExpressionTest extends TableTestBase { "AVG:".toExpr + (weightedAvg('a, 'b) over 'w)) val resJava = t - .window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")) + .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")) .select( s""" |ARRAY(SUM(a) OVER w, COUNT(a) OVER w), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala index 6e0e15e93a8bd..03438c8b556bc 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/CalcValidationTest.scala @@ -20,8 +20,8 @@ package org.apache.flink.table.api.stream.table.validation import java.math.BigDecimal import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.api.scala.{Tumble, _} +import org.apache.flink.table.api.{Tumble, ValidationException} +import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestBase import org.junit.Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala index 8d5943de96f74..5a507a4c7bec0 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/GroupWindowValidationTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.stream.table.validation import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.{Session, Slide, Tumble, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestBase diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala index 37e1526921033..350ad06dd8c16 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/OverWindowValidationTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.api.stream.table.validation import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.api.{Over, Table, Tumble, ValidationException} import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{OverAgg0, WeightedAvgWithRetract} import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala index eb9d4b2413aa4..5b4f36480e7b2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan import org.apache.calcite.rel.RelNode import org.apache.flink.api.scala._ -import org.apache.flink.table.api.Table +import org.apache.flink.table.api.{Table, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.nodes.datastream._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.CountDistinct diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index 5bfb65d8b6ee7..644eebb23d95c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.plan import java.sql.Timestamp import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Tumble import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.TimeIntervalUnit import org.apache.flink.table.functions.{ScalarFunction, TableFunction} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala index bd2a86836064f..a57cabde69c91 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.plan -import org.apache.flink.table.api.Table +import org.apache.flink.table.api.{Table, Tumble} import org.apache.flink.table.plan.util.UpdatingPlanChecker import org.apache.flink.table.utils.StreamTableTestUtil import org.junit.Assert._ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala index 0b6470f6956b4..80886bc404988 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.batch.table import java.math.BigDecimal import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala index a0f6c535aac28..b152dcb0424e9 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo} import org.apache.flink.api.java.{DataSet, ExecutionEnvironment => JExecEnv} import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableException, TableSchema, Types} +import org.apache.flink.table.api.{TableException, TableSchema, Tumble, Types} import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.table.runtime.utils.{CommonTestData, TableProgramsCollectionTestBase} import org.apache.flink.table.sources.BatchTableSource diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index 2fb1fcd8a5fe4..a81b3ed92b655 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableSchema, Types} +import org.apache.flink.table.api.{TableSchema, Tumble, Types} import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{AtomicTimestampWithEqualWatermark, TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala index c7a71e40ee934..d9b7b50727b2e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala @@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.api.{Session, Slide, StreamQueryConfig, Tumble} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.runtime.stream.table.GroupWindowITCase._ import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithMerge, WeightedAvg, WeightedAvgWithMerge} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala index 926319ff74c7c..6341620aafba4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.api.{StreamQueryConfig, Tumble, Types} import org.apache.flink.table.expressions.Literal import org.apache.flink.table.expressions.utils.Func20 import org.apache.flink.table.functions.aggfunctions.CountAggFunction diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala index c0a95ab739c06..bb1c813848465 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.Over import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, CountDistinctWithRetractAndReset, WeightedAvg} import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0 import org.apache.flink.table.api.scala._ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 84d8d21e82a9d..8080082384fa2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableException, Types} +import org.apache.flink.table.api.{TableException, Tumble, Types} import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData} import org.apache.flink.table.sinks._ import org.apache.flink.table.utils.MemoryTableSourceSinkUtil diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala index 52df3d50e093f..eb571d504a527 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableException, TableSchema, Types} +import org.apache.flink.table.api.{TableException, TableSchema, Tumble, Types} import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase} import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.table.utils._