Skip to content

Commit

Permalink
[FLINK-29081][table-planner] Capitalize join hints to avoid case sensitive
Browse files Browse the repository at this point in the history

This closes apache#20669
  • Loading branch information
xuyangzhong authored and godfreyhe committed Aug 25, 2022
1 parent 58c4be4 commit fcaa4f7
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,9 @@ public RelRoot convertQuery(SqlNode query, final boolean needsValidation, final

// ----- FLINK MODIFICATION BEGIN -----

// replace all join hints with upper case
result = FlinkHints.capitalizeJoinHints(result);

// clear join hints which are propagated into wrong query block
// The hint QueryBlockAlias will be added when building a RelNode tree before. It is used to
// distinguish the query block in the SQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSnapshot;
import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/** Utility class for Flink hints. */
Expand Down Expand Up @@ -178,4 +182,40 @@ public static List<RelHint> getQueryBlockAliasHints(List<RelHint> allHints) {
.filter(hint -> hint.hintName.equals(FlinkHints.HINT_ALIAS))
.collect(Collectors.toList());
}

/**
 * Rewrites the given plan with a {@link CapitalizeJoinHintShuttle}, which upper-cases the names
 * of join hints attached to {@link LogicalJoin} nodes.
 *
 * @param root root of the RelNode tree to rewrite
 * @return the node produced by letting the shuttle visit {@code root}
 */
public static RelNode capitalizeJoinHints(RelNode root) {
    final CapitalizeJoinHintShuttle capitalizer = new CapitalizeJoinHintShuttle();
    return root.accept(capitalizer);
}

private static class CapitalizeJoinHintShuttle extends RelShuttleImpl {

@Override
public RelNode visit(LogicalJoin join) {
List<RelHint> hints = join.getHints();
AtomicBoolean changed = new AtomicBoolean(false);
List<RelHint> hintsWithCapitalJoinHints =
hints.stream()
.map(
hint -> {
String capitalHintName =
hint.hintName.toUpperCase(Locale.ROOT);
if (JoinStrategy.isJoinStrategy(capitalHintName)) {
changed.set(true);
return RelHint.builder(capitalHintName)
.hintOptions(hint.listOptions)
.inheritPath(hint.inheritPath)
.build();
} else {
return hint;
}
})
.collect(Collectors.toList());

if (changed.get()) {
return super.visit(join.withHints(hintsWithCapitalJoinHints));
} else {
return super.visit(join);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,13 @@ public void testJoinHintWithSubStringViewName2() {
verifyRelPlanByCustom(String.format(sql, getTestSingleJoinHint()));
}

/** Plans a join query whose hint name is spelled in alternating upper and lower case. */
@Test
public void testJoinHintWithoutCaseSensitive() {
    final String mixedCaseHint = buildCaseSensitiveStr(getTestSingleJoinHint());
    final String sqlTemplate = "select /*+ %s(T1) */* from T1 join T2 on T1.a1 = T2.a2";

    verifyRelPlanByCustom(String.format(sqlTemplate, mixedCaseHint));
}

protected String buildAstPlanWithQueryBlockAlias(List<RelNode> relNodes) {
StringBuilder astBuilder = new StringBuilder();
relNodes.forEach(
Expand All @@ -725,4 +732,19 @@ protected String buildAstPlanWithQueryBlockAlias(List<RelNode> relNodes) {
true)));
return astBuilder.toString();
}

/**
 * Returns {@code str} in alternating case: characters at even indices are upper-cased and
 * characters at odd indices are lower-cased.
 *
 * @param str the text to convert
 * @return a new string of the same length in alternating case
 */
private String buildCaseSensitiveStr(String str) {
    StringBuilder alternating = new StringBuilder(str.length());

    for (int index = 0; index < str.length(); index++) {
        char current = str.charAt(index);
        boolean evenPosition = (index & 1) == 0;
        alternating.append(
                evenPosition
                        ? Character.toUpperCase(current)
                        : Character.toLowerCase(current));
    }

    return alternating.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,27 @@ Calc(select=[a1, b1])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1])
+- Exchange(distribution=[hash[a2]])
+- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
]]>
</Resource>
</TestCase>
<TestCase name="testJoinHintWithoutCaseSensitive">
<Resource name="sql">
<![CDATA[select /*+ BrOaDcAsT(T1) */* from T1 join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[T1]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], isBroadcast=[true], build=[left])
:- Exchange(distribution=[broadcast])
: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,27 @@ Calc(select=[a1, b1])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1])
+- Exchange(distribution=[hash[a2]])
+- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
]]>
</Resource>
</TestCase>
<TestCase name="testJoinHintWithoutCaseSensitive">
<Resource name="sql">
<![CDATA[select /*+ NeSt_lOoP(T1) */* from T1 join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[NEST_LOOP inheritPath:[0] options:[T1]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], build=[left])
:- Exchange(distribution=[broadcast])
: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,28 @@ Calc(select=[a1, b1])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1])
+- Exchange(distribution=[hash[a2]])
+- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
]]>
</Resource>
</TestCase>
<TestCase name="testJoinHintWithoutCaseSensitive">
<Resource name="sql">
<![CDATA[select /*+ ShUfFlE_HaSh(T1) */* from T1 join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], build=[left])
:- Exchange(distribution=[hash[a1]])
: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
+- Exchange(distribution=[hash[a2]])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,28 @@ Calc(select=[a1, b1])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1])
+- Exchange(distribution=[hash[a2]])
+- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
]]>
</Resource>
</TestCase>
<TestCase name="testJoinHintWithoutCaseSensitive">
<Resource name="sql">
<![CDATA[select /*+ ShUfFlE_MeRgE(T1) */* from T1 join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+- LogicalTableScan(table=[[default_catalog, default_database, T2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
SortMergeJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2])
:- Exchange(distribution=[hash[a1]])
: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
+- Exchange(distribution=[hash[a2]])
+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,19 @@ LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
]]>
</Resource>
</TestCase>
<TestCase name="testJoinHintWithoutCaseSensitive">
<Resource name="sql">
<![CDATA[select /*+ BrOaDcAsT(T1) */* from T1 join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,19 @@ LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], hints=[[[ALIAS inheritPath:[0] options:[V2]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
]]>
</Resource>
</TestCase>
<TestCase name="testJoinHintWithoutCaseSensitive">
<Resource name="sql">
<![CDATA[select /*+ BrOaDcAsT(T1) */* from T1 join T2 on T1.a1 = T2.a2]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
:- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
]]>
</Resource>
</TestCase>
Expand Down

0 comments on commit fcaa4f7

Please sign in to comment.