Skip to content

Commit

Permalink
[FLINK-17751][table-planner-blink] Fix proctime defined in ddl can no…
Browse files Browse the repository at this point in the history
…t work with over window in Table api

This closes apache#12342
  • Loading branch information
godfreyhe authored May 27, 2020
1 parent 5a9fe5d commit 0e67f18
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private static DataType convertToTimeAttributeType(TimeIndicatorTypeInfo timeInd

private static boolean canConvertToTimeAttributeTypeInfo(DataType dataType) {
return hasRoot(dataType.getLogicalType(), LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) &&
dataTypeTypeInfoMap.containsKey(dataType) && // checks precision and conversion
dataTypeTypeInfoMap.containsKey(dataType.nullable()) && // checks precision and conversion and ignore nullable
((TimestampType) dataType.getLogicalType()).getKind() != TimestampKind.REGULAR;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,24 @@ Calc(select=[PROCTIME_MATERIALIZE(ptime) AS ptime, name, val, id])
+- WatermarkAssigner(rowtime=[rtime], watermark=[rtime])
+- Calc(select=[id, rtime, val, name, PROCTIME() AS ptime])
+- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, rtime, val, name])
]]>
</Resource>
</TestCase>
<TestCase name="testProcTimeTableSourceOverWindow">
<Resource name="planBefore">
<![CDATA[
LogicalFilter(condition=[>($2, 100)])
+- LogicalProject(id=[$0], name=[$2], valSum=[AS(SUM($1) OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW), _UTF-16LE'valSum')])
+- LogicalTableScan(table=[[default_catalog, default_database, procTimeT]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
Calc(select=[id, name, w0$o0 AS valSum], where=[>(w0$o0, 100)])
+- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0])
+- Exchange(distribution=[hash[id]])
+- Calc(select=[id, val, name, PROCTIME() AS $3])
+- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.junit.{Ignore, Test}

class TableSourceTest extends TableTestBase {

val util = streamTestUtil()
private val util = streamTestUtil()

@Test
def testTableSourceWithTimestampRowTimeField(): Unit = {
Expand Down Expand Up @@ -95,7 +95,6 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan(t)
}

@Ignore("remove ignore once FLINK-17751 is fixed")
@Test
def testProcTimeTableSourceOverWindow(): Unit = {
val ddl =
Expand Down

0 comments on commit 0e67f18

Please sign in to comment.