diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala index 98ca415b9c550..aba3acb833bc9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala @@ -19,14 +19,15 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.runtime.typeutils.InternalTypeInfo -import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode} +import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin import org.apache.flink.table.planner.plan.utils.JoinUtil import org.apache.calcite.plan._ -import org.apache.calcite.rel.core.{Join, JoinRelType} +import org.apache.calcite.rel.core.{Exchange, Join, JoinRelType} import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexNode @@ -67,11 +68,11 @@ class StreamPhysicalJoin( */ def inputUniqueKeyContainsJoinKey(inputOrdinal: Int): Boolean = { val input = getInput(inputOrdinal) - val inputUniqueKeys = getCluster.getMetadataQuery.getUniqueKeys(input) + val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else joinSpec.getRightKeys + val inputUniqueKeys = getUniqueKeys(input, joinKeys) if (inputUniqueKeys != null) { - val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else joinSpec.getRightKeys inputUniqueKeys.exists { - uniqueKey => joinKeys.forall(uniqueKey.toArray.contains(_)) + uniqueKey => joinKeys.forall(uniqueKey.contains(_)) } } else { false @@ -98,21 +99,22 @@ class StreamPhysicalJoin( JoinUtil.analyzeJoinInput( InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(left.getRowType)), joinSpec.getLeftKeys, - getUniqueKeys(left))) + getUniqueKeys(left, joinSpec.getLeftKeys))) .item( "rightInputSpec", JoinUtil.analyzeJoinInput( InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(right.getRowType)), joinSpec.getRightKeys, - getUniqueKeys(right))) + getUniqueKeys(right, joinSpec.getRightKeys))) } - private def getUniqueKeys(input: RelNode): List[Array[Int]] = { - val uniqueKeys = cluster.getMetadataQuery.getUniqueKeys(input) - if (uniqueKeys == null || uniqueKeys.isEmpty) { + private def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = { + val upsertKeys = FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery) + .getUpsertKeysInKeyGroupRange(input, keys) + if (upsertKeys == null || upsertKeys.isEmpty) { List.empty } else { - uniqueKeys.map(_.asList.map(_.intValue).toArray).toList + upsertKeys.map(_.asList.map(_.intValue).toArray).toList } } @@ -125,8 +127,8 @@ class StreamPhysicalJoin( override def translateToExecNode(): ExecNode[_] = { new StreamExecJoin( joinSpec, - getUniqueKeys(left), - getUniqueKeys(right), + getUniqueKeys(left, joinSpec.getLeftKeys), + getUniqueKeys(right, joinSpec.getRightKeys), InputProperty.DEFAULT, InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml index 08bbce2520a88..a7e53852e66c6 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml @@ -420,6 +420,49 @@ Calc(select=[a1]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2, a3)]]], fields=[a1, a2, a3]) +- Exchange(distribution=[hash[pk1]]) +- TableSourceScan(table=[[default_catalog, default_database, tableWithCompositePk, project=[pk1]]], fields=[pk1]) +]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala index 12f04242bbbe7..fd70df2164d7c 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala @@ -299,4 +299,38 @@ class JoinTest extends TableTestBase { |""".stripMargin) util.verifyExecPlan("SELECT A.a1 FROM A LEFT JOIN tableWithCompositePk T ON A.a1 = T.pk1") } + + @Test + def testJoinDisorderChangeLog(): Unit = { + util.tableEnv.executeSql( + """ + |CREATE TABLE src (person String, votes BIGINT) WITH( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.tableEnv.executeSql( + """ + |CREATE TABLE award (votes BIGINT, prize DOUBLE, PRIMARY KEY(votes) NOT ENFORCED) WITH( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.tableEnv.executeSql( + """ + |CREATE TABLE people (person STRING, age INT, PRIMARY KEY(person) NOT ENFORCED) WITH( + | 'connector' = 'values' + |) + |""".stripMargin) + + util.verifyExecPlan( + """ + |SELECT T1.person, T1.sum_votes, T1.prize, T2.age FROM + | (SELECT T.person, T.sum_votes, award.prize FROM + | (SELECT person, SUM(votes) AS sum_votes FROM src GROUP BY person) T, + | award + | WHERE T.sum_votes = award.votes) T1, people T2 + | WHERE T1.person = T2.person + |""".stripMargin) + } }