Skip to content

Commit

Permalink
[FLINK-23919][table-planner] Fix field name conflict bug in WindowUtil
Browse files (browse the repository at this point in the history)
This closes apache#17604
  • Loading branch information
beyond1920 authored and godfreyhe committed Nov 1, 2021
1 parent 3b163c7 commit 003df21
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.logical._
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED}
import org.apache.flink.table.planner.typeutils.RowTypeUtils
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
import org.apache.flink.table.types.logical.TimestampType
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType
Expand All @@ -43,6 +44,7 @@ import java.time.Duration
import java.util.Collections

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
Expand Down Expand Up @@ -119,18 +121,19 @@ object WindowUtil {
var containsTimeAttribute = false
var newTimeAttributeIndex = -1
val calcFieldShifting = ArrayBuffer[Int]()

val visitedProjectNames = new mutable.ArrayBuffer[String]
oldProgram.getNamedProjects.foreach { namedProject =>
val expr = oldProgram.expandLocalRef(namedProject.left)
val name = namedProject.right
val uniqueName = RowTypeUtils.getUniqueName(namedProject.right, visitedProjectNames)
// project columns except window columns
expr match {
case inputRef: RexInputRef if windowColumns.contains(inputRef.getIndex) =>
calcFieldShifting += -1

case _ =>
try {
programBuilder.addProject(expr, name)
programBuilder.addProject(expr, uniqueName)
visitedProjectNames += uniqueName
} catch {
case e: Throwable =>
e.printStackTrace()
Expand All @@ -149,9 +152,11 @@ object WindowUtil {

// append time attribute if the calc doesn't refer it
if (!containsTimeAttribute) {
val oldTimeAttributeFieldName = inputRowType.getFieldNames.get(inputTimeAttributeIndex)
val uniqueName = RowTypeUtils.getUniqueName(oldTimeAttributeFieldName, visitedProjectNames)
programBuilder.addProject(
inputTimeAttributeIndex,
inputRowType.getFieldNames.get(inputTimeAttributeIndex))
uniqueName)
newTimeAttributeIndex = programBuilder.getProjectList.size() - 1
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,73 @@ Calc(select=[a, b, uv])
+- Expand(projects=[{a, b, c, 0 AS $e, rowtime}, {a, null AS b, c, 4 AS $e, rowtime}, {null AS a, null AS b, c, 12 AS $e, rowtime}])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testFieldNameConflict[aggPhaseEnforcer=ONE_PHASE]">
<Resource name="sql">
<![CDATA[
SELECT window_time,
MIN(rowtime) as start_time,
MAX(rowtime) as end_time
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
GROUP BY window_start, window_end, window_time
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(window_time=[$2], start_time=[$3], end_time=[$4])
+- LogicalAggregate(group=[{0, 1, 2}], start_time=[MIN($3)], end_time=[MAX($3)])
+- LogicalProject(window_start=[$7], window_end=[$8], window_time=[$9], rowtime=[$5])
+- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[window_time, start_time, end_time])
+- WindowAggregate(window=[TUMBLE(time_col=[rowtime_0], size=[15 min])], select=[MIN(rowtime) AS start_time, MAX(rowtime) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+- Exchange(distribution=[single])
+- Calc(select=[CAST(rowtime) AS rowtime, rowtime AS rowtime_0])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testFieldNameConflict[aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
<![CDATA[
SELECT window_time,
MIN(rowtime) as start_time,
MAX(rowtime) as end_time
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
GROUP BY window_start, window_end, window_time
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(window_time=[$2], start_time=[$3], end_time=[$4])
+- LogicalAggregate(group=[{0, 1, 2}], start_time=[MIN($3)], end_time=[MAX($3)])
+- LogicalProject(window_start=[$7], window_end=[$8], window_time=[$9], rowtime=[$5])
+- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[window_time, start_time, end_time])
+- GlobalWindowAggregate(window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[MIN(min$0) AS start_time, MAX(max$1) AS end_time, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+- Exchange(distribution=[single])
+- LocalWindowAggregate(window=[TUMBLE(time_col=[rowtime_0], size=[15 min])], select=[MIN(rowtime) AS min$0, MAX(rowtime) AS max$1, slice_end('w$) AS $slice_end])
+- Calc(select=[CAST(rowtime) AS rowtime, rowtime AS rowtime_0])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,19 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
""".stripMargin
util.verifyRelPlan(sql)
}

/**
 * Plan test for a window aggregate in which `rowtime` is both the TUMBLE
 * descriptor column and the argument of the MIN/MAX aggregate calls, while the
 * query groups by window_start, window_end and window_time.
 *
 * The query is passed to `util.verifyRelPlan`.
 */
@Test
def testFieldNameConflict(): Unit = {
  // The margin lines stay flush-left so the literal is unchanged.
  val conflictingNamesQuery: String = """
|SELECT window_time,
| MIN(rowtime) as start_time,
| MAX(rowtime) as end_time
|FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
|GROUP BY window_start, window_end, window_time
""".stripMargin

  util.verifyRelPlan(conflictingNamesQuery)
}
}

object WindowAggregateTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,39 @@ class WindowAggregateITCase(
CumulateWindowRollupExpectedData.sorted.mkString("\n"),
sink.getAppendResults.sorted.mkString("\n"))
}

/**
 * End-to-end test for a window aggregate in which `rowtime` is both the TUMBLE
 * descriptor column (5 second windows over T1) and the argument of the MIN/MAX
 * aggregate calls.
 *
 * The query result is written to a [[TestingAppendSink]] as an append stream,
 * the job is executed, and the sorted sink rows are compared with the sorted
 * expected rows. Which expected rows apply depends on `useTimestampLtz`.
 */
@Test
def testFieldNameConflict(): Unit = {
  // The margin lines stay flush-left so the literal is unchanged.
  val conflictingNamesQuery: String = """
|SELECT
| window_time,
| MIN(rowtime) as start_time,
| MAX(rowtime) as end_time
|FROM TABLE(
| TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
|GROUP BY window_start, window_end, window_time
""".stripMargin

  val sink = new TestingAppendSink
  val resultTable = tEnv.sqlQuery(conflictingNamesQuery)
  resultTable.toAppendStream[Row].addSink(sink)
  env.execute()

  // Rows expected when useTimestampLtz is set.
  val ltzRows = Seq(
    "2020-10-09T16:00:04.999Z,2020-10-09T16:00:01Z,2020-10-09T16:00:04Z",
    "2020-10-09T16:00:09.999Z,2020-10-09T16:00:06Z,2020-10-09T16:00:08Z",
    "2020-10-09T16:00:19.999Z,2020-10-09T16:00:16Z,2020-10-09T16:00:16Z",
    "2020-10-09T16:00:34.999Z,2020-10-09T16:00:32Z,2020-10-09T16:00:34Z")

  // Rows expected otherwise.
  val localRows = Seq(
    "2020-10-10T00:00:04.999,2020-10-10T00:00:01,2020-10-10T00:00:04",
    "2020-10-10T00:00:09.999,2020-10-10T00:00:06,2020-10-10T00:00:08",
    "2020-10-10T00:00:19.999,2020-10-10T00:00:16,2020-10-10T00:00:16",
    "2020-10-10T00:00:34.999,2020-10-10T00:00:32,2020-10-10T00:00:34")

  val expectedRows = if (useTimestampLtz) ltzRows else localRows

  // Both sides are sorted before comparison.
  val expectedText = expectedRows.sorted.mkString("\n")
  val actualText = sink.getAppendResults.sorted.mkString("\n")
  assertEquals(expectedText, actualText)
}
}

object WindowAggregateITCase {
Expand Down

0 comments on commit 003df21

Please sign in to comment.