Skip to content

Commit

Permalink
[FLINK-33314][table] Fix the named parameter example in window tvf
Browse files Browse the repository at this point in the history
This closes apache#23549
  • Loading branch information
lincoln-lil authored Oct 23, 2023
1 parent 6cdfca2 commit 3596a94
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 4 deletions.
4 changes: 2 additions & 2 deletions docs/content.zh/docs/dev/table/sql/queries/window-tvf.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
-- note: the DATA param must be the first and `OFFSET` should be wrapped with double quotes
Flink SQL> SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
`OFFSET` => INTERVAL '1' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
Expand Down
4 changes: 2 additions & 2 deletions docs/content/docs/dev/table/sql/queries/window-tvf.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ We show an example to describe how to use offset in Tumble window in the followi
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
-- note: the DATA param must be the first and `OFFSET` should be wrapped with double quotes
Flink SQL> SELECT * FROM TABLE(
TUMBLE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE => INTERVAL '10' MINUTES,
OFFSET => INTERVAL '1' MINUTES));
`OFFSET` => INTERVAL '1' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,37 @@ Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, wi
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testHopTVFWithNamedParams">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(TUMBLE(
DATA => TABLE MyTable,
TIMECOL => DESCRIPTOR(rowtime),
SIZE => INTERVAL '15' MINUTE,
`OFFSET` => INTERVAL '5' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL MINUTE, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min], offset=[5 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -335,6 +366,37 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], wind
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min], offset=[5 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testTumbleTVFWithNamedParams">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(TUMBLE(
DATA => TABLE MyTable,
TIMECOL => DESCRIPTOR(rowtime),
SIZE => INTERVAL '15' MINUTE,
`OFFSET` => INTERVAL '5' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($4), 900000:INTERVAL MINUTE, 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min], offset=[5 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ class WindowTableFunctionTest extends TableTestBase {
util.verifyRelPlan(sql)
}

@Test
def testTumbleTVFWithNamedParams(): Unit = {
val sql =
"""
|SELECT *
|FROM TABLE(TUMBLE(
| DATA => TABLE MyTable,
| TIMECOL => DESCRIPTOR(rowtime),
| SIZE => INTERVAL '15' MINUTE,
| `OFFSET` => INTERVAL '5' MINUTE))
|""".stripMargin
util.verifyRelPlan(sql)
}

@Test
def testHopTVFWithOffset(): Unit = {
val sql =
Expand Down Expand Up @@ -199,6 +213,20 @@ class WindowTableFunctionTest extends TableTestBase {
util.verifyRelPlan(sql)
}

@Test
def testHopTVFWithNamedParams(): Unit = {
val sql =
"""
|SELECT *
|FROM TABLE(TUMBLE(
| DATA => TABLE MyTable,
| TIMECOL => DESCRIPTOR(rowtime),
| SIZE => INTERVAL '15' MINUTE,
| `OFFSET` => INTERVAL '5' MINUTE))
|""".stripMargin
util.verifyRelPlan(sql)
}

@Test
def testCumulateTVFWithOffset(): Unit = {
val sql =
Expand Down Expand Up @@ -230,4 +258,5 @@ class WindowTableFunctionTest extends TableTestBase {
|""".stripMargin
util.verifyRelPlan(sql)
}

}

0 comments on commit 3596a94

Please sign in to comment.