Skip to content

Commit

Permalink
[FLINK-28682][table-planner] Support join hint in batch rules
Browse files Browse the repository at this point in the history
This closes apache#20359
  • Loading branch information
xuyangzhong authored and godfreyhe committed Aug 8, 2022
1 parent 53f1a66 commit f794d72
Show file tree
Hide file tree
Showing 43 changed files with 9,123 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.calcite.sql2rel;

import org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -203,6 +205,15 @@ public static RelNode decorrelateQuery(RelNode rootRel, RelBuilder relBuilder) {
// Re-propagate the hints.
newRootRel = RelOptUtil.propagateRelHints(newRootRel, true);

// ----- FLINK MODIFICATION BEGIN -----

// Clear the join hints that were propagated into the wrong query block.
// The query block alias hint is attached earlier, while the RelNode tree is being built,
// and is used to tell the query blocks of the SQL apart.
newRootRel = newRootRel.accept(new ClearJoinHintWithInvalidPropagationShuttle());

// ----- FLINK MODIFICATION END -----

return newRootRel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.calcite.sql2rel;

import org.apache.flink.table.planner.alias.SubQueryAliasNodeClearShuttle;
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSubQueryAlias;
import org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle;
import org.apache.flink.table.planner.hint.FlinkHints;

import org.apache.calcite.avatica.util.Spaces;
import org.apache.calcite.linq4j.Ord;
Expand Down Expand Up @@ -598,8 +598,18 @@ public RelRoot convertQuery(SqlNode query, final boolean needsValidation, final
hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
}
}
// propagate the hints.

result = RelOptUtil.propagateRelHints(result, false);

// ----- FLINK MODIFICATION BEGIN -----

// Clear the join hints that were propagated into the wrong query block.
// The query block alias hint is attached earlier, while the RelNode tree is being built,
// and is used to tell the query blocks of the SQL apart.
result = result.accept(new ClearJoinHintWithInvalidPropagationShuttle());

// ----- FLINK MODIFICATION END -----

return RelRoot.of(result, validatedRowType, query.getKind())
.withCollation(collation)
.withHints(hints);
Expand Down Expand Up @@ -2025,6 +2035,25 @@ protected void convertFrom(Blackboard bb, SqlNode from) {
convertFrom(bb, from, Collections.emptyList());
}

// ----- FLINK MODIFICATION BEGIN -----

private boolean containsJoinHint = false;

/**
* To tell this converter that this SqlNode tree contains join hint and then a query block alias
* will be attached to the root node of the query block.
*
* <p>The `containsJoinHint` is false default to be compatible with previous behavior and then
* planner can reuse some node.
*
* <p>TODO At present, it is a relatively hacked way
*/
public void containsJoinHint() {
containsJoinHint = true;
}

// ----- FLINK MODIFICATION END -----

/**
* Converts a FROM clause into a relational expression.
*
Expand Down Expand Up @@ -2061,6 +2090,36 @@ protected void convertFrom(Blackboard bb, SqlNode from, List<String> fieldNames)
}
}
convertFrom(bb, firstOperand, fieldNameList);

// ----- FLINK MODIFICATION BEGIN -----

// Add a query-block alias hint to distinguish the different query levels.
// Calcite expands the whole SQL RelNode tree, including the nested query blocks, but
// some features, such as join hint propagation, still need to know where a query block
// begins.
// TODO add query-block alias hint in SqlNode instead of here
if (containsJoinHint) {
RelNode root = bb.root;

if (root instanceof Hintable) {
RelHint queryBlockAliasHint =
RelHint.builder(FlinkHints.HINT_ALIAS)
.hintOption(call.operand(1).toString())
.build();
RelNode newRoot =
((Hintable) root)
.attachHints(
Collections.singletonList(queryBlockAliasHint));
boolean isLeaf = leaves.containsKey(root);
if (isLeaf) {
// remove old root node
leaves.remove(root);
}

bb.setRoot(newRoot, isLeaf);
}
}

// ----- FLINK MODIFICATION END -----
return;

case MATCH_RECOGNIZE:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.alias;

import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.hint.JoinStrategy;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalJoin;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Due to Calcite will expand the whole SQL RelNode tree that contains query block, join hints will
* be propagated from root to leaves in the whole RelNode tree. This shuttle is used to clear the
* join hints that are propagated into the query block incorrectly.
*
* <p>See more at {@see org.apache.calcite.sql2rel.SqlToRelConverter#convertFrom()}.
*
* <p>TODO some node will be attached join hints when parse SqlNode to RelNode such as Project and
* etc. The join hints on these node can also be cleared.
*/
public class ClearJoinHintWithInvalidPropagationShuttle extends RelShuttleImpl {

@Override
public RelNode visit(LogicalJoin join) {
List<RelHint> hints = join.getHints();

Set<String> allHintNames =
hints.stream().map(hint -> hint.hintName).collect(Collectors.toSet());

// there are no join hints on this Join node
if (allHintNames.stream().noneMatch(JoinStrategy::isJoinStrategy)) {
return super.visit(join);
}

Optional<RelHint> firstAliasHint =
hints.stream()
.filter(hint -> FlinkHints.HINT_ALIAS.equals(hint.hintName))
.findFirst();

// there are no alias hints on this Join node
if (!firstAliasHint.isPresent()) {
return super.visit(join);
}

List<RelHint> joinHintsFromOuterQueryBlock =
hints.stream()
.filter(
hint ->
JoinStrategy.isJoinStrategy(hint.hintName)
// if the size of inheritPath is bigger than 0, it
// means that this join hint is propagated from its
// parent
&& hint.inheritPath.size()
> firstAliasHint.get().inheritPath.size())
.collect(Collectors.toList());

if (joinHintsFromOuterQueryBlock.isEmpty()) {
return super.visit(join);
}

RelNode newJoin = join;
ClearOuterJoinHintShuttle clearOuterJoinHintShuttle;

for (RelHint outerJoinHint : joinHintsFromOuterQueryBlock) {
clearOuterJoinHintShuttle = new ClearOuterJoinHintShuttle(outerJoinHint);
newJoin = newJoin.accept(clearOuterJoinHintShuttle);
}

return super.visit(newJoin);
}

/**
* A shuttle to clean the join hints which are in outer query block and should not affect the
* query-block inside.
*/
private static class ClearOuterJoinHintShuttle extends RelShuttleImpl {
// the current inheritPath about the join hint that need be removed
private final Deque<Integer> currentInheritPath;

// the join hint that need be removed
private final RelHint joinHintNeedRemove;

public ClearOuterJoinHintShuttle(RelHint joinHintNeedRemove) {
this.joinHintNeedRemove = joinHintNeedRemove;
this.currentInheritPath = new ArrayDeque<>();
this.currentInheritPath.addAll(joinHintNeedRemove.inheritPath);
}

@Override
protected RelNode visitChild(RelNode parent, int i, RelNode child) {
currentInheritPath.addLast(i);
RelNode newNode = super.visitChild(parent, i, child);
currentInheritPath.removeLast();
return newNode;
}

@Override
public RelNode visit(LogicalJoin join) {
List<RelHint> hints = new ArrayList<>(join.getHints());
Optional<RelHint> invalidJoinHint = getInvalidJoinHint(hints);

// if this join node contains the join hint that needs to be removed
if (invalidJoinHint.isPresent()) {
hints.remove(invalidJoinHint.get());
return super.visit(join.withHints(hints));
}

return super.visit(join);
}

/**
* Get the invalid join hint in this node.
*
* <p>The invalid join meets the following requirement:
*
* <p>1. This hint name is same with the join hint that needs to be removed
*
* <p>2.The length of this hint should be same with the length of propagating this removed
* join hint.
*
* <p>3. The inherited path of this hint should match the inherited path of this removed
* join hint.
*
* @param hints all hints
* @return return the invalid join hint if exists, else return empty
*/
private Optional<RelHint> getInvalidJoinHint(List<RelHint> hints) {
for (RelHint hint : hints) {
if (hint.hintName.equals(joinHintNeedRemove.hintName)
&& isMatchInvalidInheritPath(
new ArrayList<>(currentInheritPath), hint.inheritPath)) {
return Optional.of(hint);
}
}
return Optional.empty();
}

private boolean isMatchInvalidInheritPath(
List<Integer> invalidInheritPath, List<Integer> checkedInheritPath) {
if (invalidInheritPath.size() != checkedInheritPath.size()) {
return false;
}

for (int i = 0; i < invalidInheritPath.size(); i++) {
if (!Objects.equals(invalidInheritPath.get(i), checkedInheritPath.get(i))) {
return false;
}
}
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ public RexNode visitInputRef(RexInputRef inputRef) {
}
}
});
return FlinkLogicalJoin.create(newLeft, newRight, newCondition, join.getJoinType());
return FlinkLogicalJoin.create(
newLeft, newRight, newCondition, join.getHints(), join.getJoinType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;

import org.apache.calcite.rel.hint.HintOptionChecker;
import org.apache.calcite.rel.hint.HintPredicates;
import org.apache.calcite.rel.hint.HintStrategy;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.util.Litmus;

import java.util.Collections;

/** A collection of Flink style {@link HintStrategy}s. */
public abstract class FlinkHintStrategies {

Expand All @@ -51,6 +54,57 @@ public static HintStrategyTable createHintStrategyTable() {
HintStrategy.builder(HintPredicates.AGGREGATE)
.excludedRules(WrapJsonAggFunctionArgumentsRule.INSTANCE)
.build())
// internal join hint used for alias
.hintStrategy(
FlinkHints.HINT_ALIAS,
// currently, only join hints care about query block alias
HintStrategy.builder(HintPredicates.JOIN)
.optionChecker(fixedSizeListOptionChecker(1))
.build())
// TODO semi/anti join with CORRELATE is not supported
.hintStrategy(
JoinStrategy.BROADCAST.getJoinHintName(),
HintStrategy.builder(HintPredicates.JOIN)
.optionChecker(NON_EMPTY_LIST_OPTION_CHECKER)
.build())
.hintStrategy(
JoinStrategy.SHUFFLE_HASH.getJoinHintName(),
HintStrategy.builder(HintPredicates.JOIN)
.optionChecker(NON_EMPTY_LIST_OPTION_CHECKER)
.build())
.hintStrategy(
JoinStrategy.SHUFFLE_MERGE.getJoinHintName(),
HintStrategy.builder(HintPredicates.JOIN)
.optionChecker(NON_EMPTY_LIST_OPTION_CHECKER)
.build())
.hintStrategy(
JoinStrategy.NEST_LOOP.getJoinHintName(),
HintStrategy.builder(HintPredicates.JOIN)
.optionChecker(NON_EMPTY_LIST_OPTION_CHECKER)
.build())
.build();
}

private static HintOptionChecker fixedSizeListOptionChecker(int size) {
return (hint, errorHandler) ->
errorHandler.check(
hint.listOptions.size() == size,
"Invalid hint: {}, expecting {} table or view {} "
+ "specified in hint {}.",
FlinkHints.stringifyHints(Collections.singletonList(hint)),
size,
size > 1 ? "names" : "name",
hint.hintName);
}

// ~ hint option checker ------------------------------------------------------------

private static final HintOptionChecker NON_EMPTY_LIST_OPTION_CHECKER =
(hint, errorHandler) ->
errorHandler.check(
hint.listOptions.size() > 0,
"Invalid hint: {}, expecting at least "
+ "one table or view specified in hint {}.",
FlinkHints.stringifyHints(Collections.singletonList(hint)),
hint.hintName);
}
Loading

0 comments on commit f794d72

Please sign in to comment.