From fcaa4f77e0b3253ea902fbfad0bc1b2046ff814d Mon Sep 17 00:00:00 2001 From: xuyang Date: Tue, 23 Aug 2022 18:03:26 +0800 Subject: [PATCH] [FLINK-29081][table-planner] Capitalize join hints to avoid case sensitive This closes #20669 --- .../calcite/sql2rel/SqlToRelConverter.java | 3 ++ .../flink/table/planner/hint/FlinkHints.java | 40 +++++++++++++++++++ .../plan/hints/batch/JoinHintTestBase.java | 22 ++++++++++ .../hints/batch/BroadcastJoinHintTest.xml | 21 ++++++++++ .../plan/hints/batch/NestLoopJoinHintTest.xml | 21 ++++++++++ .../hints/batch/ShuffleHashJoinHintTest.xml | 22 ++++++++++ .../hints/batch/ShuffleMergeJoinHintTest.xml | 22 ++++++++++ .../ClearQueryBlockAliasResolverTest.xml | 13 ++++++ .../plan/optimize/JoinHintResolverTest.xml | 13 ++++++ 9 files changed, 177 insertions(+) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java index 41b1f266472c6..908f788cfa035 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java @@ -603,6 +603,9 @@ public RelRoot convertQuery(SqlNode query, final boolean needsValidation, final // ----- FLINK MODIFICATION BEGIN ----- + // replace all join hints with upper case + result = FlinkHints.capitalizeJoinHints(result); + // clear join hints which are propagated into wrong query block // The hint QueryBlockAlias will be added when building a RelNode tree before. It is used to // distinguish the query block in the SQL. diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java index 815237aae0c39..7463ac228930d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java @@ -24,9 +24,11 @@ import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttleImpl; import org.apache.calcite.rel.hint.Hintable; import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSnapshot; import org.apache.commons.lang3.StringUtils; @@ -34,8 +36,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** Utility class for Flink hints. */ @@ -178,4 +182,40 @@ public static List getQueryBlockAliasHints(List allHints) { .filter(hint -> hint.hintName.equals(FlinkHints.HINT_ALIAS)) .collect(Collectors.toList()); } + + public static RelNode capitalizeJoinHints(RelNode root) { + return root.accept(new CapitalizeJoinHintShuttle()); + } + + private static class CapitalizeJoinHintShuttle extends RelShuttleImpl { + + @Override + public RelNode visit(LogicalJoin join) { + List hints = join.getHints(); + AtomicBoolean changed = new AtomicBoolean(false); + List hintsWithCapitalJoinHints = + hints.stream() + .map( + hint -> { + String capitalHintName = + hint.hintName.toUpperCase(Locale.ROOT); + if (JoinStrategy.isJoinStrategy(capitalHintName)) { + changed.set(true); + return RelHint.builder(capitalHintName) + .hintOptions(hint.listOptions) + .inheritPath(hint.inheritPath) + .build(); + } else { + return hint; + } + }) + .collect(Collectors.toList()); + + if (changed.get()) { + return super.visit(join.withHints(hintsWithCapitalJoinHints)); + } else { + return super.visit(join); + } + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java index 25bc946c70d92..4af17d5d4fa4a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java @@ -708,6 +708,13 @@ public void testJoinHintWithSubStringViewName2() { verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint())); } + @Test + public void testJoinHintWithoutCaseSensitive() { + String sql = "select /*+ %s(T1) */* from T1 join T2 on T1.a1 = T2.a2"; + + verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint()))); + } + protected String buildAstPlanWithQueryBlockAlias(List relNodes) { StringBuilder astBuilder = new StringBuilder(); relNodes.forEach( @@ -725,4 +732,19 @@ protected String buildAstPlanWithQueryBlockAlias(List relNodes) { true))); return astBuilder.toString(); } + + private String buildCaseSensitiveStr(String str) { + char[] chars = str.toCharArray(); + + for (int i = 0; i < chars.length; i++) { + boolean needCapitalize = i % 2 == 0; + if (needCapitalize) { + chars[i] = Character.toUpperCase(chars[i]); + } else { + chars[i] = Character.toLowerCase(chars[i]); + } + } + + return new String(chars); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml index 9b0a7acf5796b..1915709ae2943 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml @@ -895,6 +895,27 @@ Calc(select=[a1, b1]) : +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1]) +- Exchange(distribution=[hash[a2]]) +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2]) +]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml index 1fc76daeff798..36f873fa27f04 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml @@ -892,6 +892,27 @@ Calc(select=[a1, b1]) : +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1]) +- Exchange(distribution=[hash[a2]]) +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2]) +]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml index b695c77a5558e..dd016f80566a0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml @@ -924,6 +924,28 @@ Calc(select=[a1, b1]) : +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1]) +- Exchange(distribution=[hash[a2]]) +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2]) +]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml index d921282af68f9..e5a1a851edd60 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml @@ -924,6 +924,28 @@ Calc(select=[a1, b1]) : +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1]) +- Exchange(distribution=[hash[a2]]) +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2]) +]]> + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml index 6408c6f0b5495..b9b59d4215a4b 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml @@ -507,6 +507,19 @@ LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483 +- LogicalJoin(condition=[=($0, $2)], joinType=[inner]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)] :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)] +]]> + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml index e75c03efacca9..0da1883edcd59 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml @@ -507,6 +507,19 @@ LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483 +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], hints=[[[ALIAS inheritPath:[0] options:[V2]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)] :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)] +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)] +]]> + + + + + + + +