Skip to content

Commit

Permalink
HIVE-16610: Semijoin Hint : Should be able to handle more than one hi…
Browse files Browse the repository at this point in the history
…nt per alias (Deepak Jaiswal, reviewed by Jason Dere)
  • Loading branch information
Jason Dere committed May 11, 2017
1 parent fd6f8da commit ee91b8e
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 263 deletions.
2 changes: 0 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -2907,8 +2907,6 @@ public static enum ConfVars {
"Big table for runtime filteting should be of atleast this size"),
TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD("hive.tez.dynamic.semijoin.reduction.threshold", (float) 0.50,
"Only perform semijoin optimization if the estimated benefit at or above this fraction of the target table"),
TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY("hive.tez.dynamic.semijoin.reduction.hint.only", false,
"When true, only enforce semijoin when a hint is provided"),
TEZ_SMB_NUMBER_WAVES(
"hive.tez.smb.number.waves",
(float) 0.5,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Stack;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.*;
Expand Down Expand Up @@ -212,36 +213,38 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Obje
if (semiJoin && ts.getConf().getFilterExpr() != null) {
LOG.debug("Initiate semijoin reduction for " + column + " ("
+ ts.getConf().getFilterExpr().getExprString());
// Get the table name from which the min-max values and bloom filter will come.
Operator<?> op = ctx.generator;

while (!(op == null || op instanceof TableScanOperator)) {
op = op.getParentOperators().get(0);
}
String tableAlias = (op == null ? "" : ((TableScanOperator) op).getConf().getAlias());

StringBuilder internalColNameBuilder = new StringBuilder();
StringBuilder colNameBuilder = new StringBuilder();
if (getColumnName(ctx, internalColNameBuilder, colNameBuilder)) {

// Apply best effort to fetch the correct table alias. If not
// found, fallback to old logic.
StringBuilder tabAliasBuilder = new StringBuilder();
if (getColumnInfo(ctx, internalColNameBuilder, colNameBuilder, tabAliasBuilder)) {
String colName = colNameBuilder.toString();
String tableAlias;
if (tabAliasBuilder.length() > 0) {
tableAlias = tabAliasBuilder.toString();
} else {
Operator<?> op = ctx.generator;

while (!(op == null || op instanceof TableScanOperator)) {
op = op.getParentOperators().get(0);
}
tableAlias = (op == null ? "" : ((TableScanOperator) op).
getConf().getAlias());
}

// Use the tableAlias to generate keyBaseAlias
keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias
+ "_" + colName;
Map<String, SemiJoinHint> hints = parseContext.getSemiJoinHints();
Map<String, List<SemiJoinHint>> hints = parseContext.getSemiJoinHints();
if (hints != null) {
if (hints.size() > 0) {
SemiJoinHint sjHint = hints.get(tableAlias);
if (sjHint != null && sjHint.getColName() != null &&
!colName.equals(sjHint.getColName())) {
LOG.debug("Removed hint due to column mismatch + Col = " + colName + " hint column = " + sjHint.getColName());
sjHint = null;
}
semiJoinAttempted = generateSemiJoinOperatorPlan(
ctx, parseContext, ts, keyBaseAlias,
internalColNameBuilder.toString(), colName, sjHint);
if (!semiJoinAttempted && sjHint != null) {
throw new SemanticException("The user hint to enforce semijoin failed required conditions");
}
}
// Create semijoin optimizations ONLY for hinted columns
semiJoinAttempted = processSemiJoinHints(
parseContext, ctx, hints, tableAlias,
internalColNameBuilder.toString(), colName, ts,
keyBaseAlias);
} else {
// fallback to regular logic
semiJoinAttempted = generateSemiJoinOperatorPlan(
Expand Down Expand Up @@ -297,16 +300,30 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Obje
}

// Given a key, find the corresponding column name.
private boolean getColumnName(DynamicListContext ctx, StringBuilder internalColName,
StringBuilder colName) {
private boolean getColumnInfo(DynamicListContext ctx, StringBuilder internalColName,
StringBuilder colName, StringBuilder tabAlias) {
ExprNodeDesc exprNodeDesc = ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex());
ExprNodeColumnDesc colExpr = ExprNodeDescUtils.getColumnExpr(exprNodeDesc);

if (colExpr == null) {
return false;
}

internalColName.append(colExpr.getColumn());

// fetch table ablias
ExprNodeDescUtils.ColumnOrigin columnOrigin =
ExprNodeDescUtils.findColumnOrigin(exprNodeDesc, ctx.generator);

if (columnOrigin != null) {
// get both tableAlias and column name from columnOrigin
assert columnOrigin.op instanceof TableScanOperator;
TableScanOperator ts = (TableScanOperator) columnOrigin.op;
tabAlias.append(ts.getConf().getAlias());
colName.append(
ExprNodeDescUtils.getColumnExpr(columnOrigin.col).getColumn());
return true;
}

Operator<? extends OperatorDesc> parentOfRS = ctx.generator.getParentOperators().get(0);
if (!(parentOfRS instanceof SelectOperator)) {
colName.append(internalColName.toString());
Expand All @@ -324,6 +341,37 @@ private boolean getColumnName(DynamicListContext ctx, StringBuilder internalColN
return true;
}

// Handle hint based semijoin
private boolean processSemiJoinHints(
ParseContext pCtx, DynamicListContext ctx,
Map<String, List<SemiJoinHint>> hints, String tableAlias,
String internalColName, String colName, TableScanOperator ts,
String keyBaseAlias) throws SemanticException {
if (hints.size() == 0) {
return false;
}

List<SemiJoinHint> hintList = hints.get(tableAlias);
if (hintList == null) {
return false;
}

// Iterate through the list
for (SemiJoinHint sjHint : hintList) {
if (!colName.equals(sjHint.getColName())) {
continue;
}
// match!
LOG.info("Creating runtime filter due to user hint: column = " + colName);
if (generateSemiJoinOperatorPlan(ctx, pCtx, ts, keyBaseAlias,
internalColName, colName, sjHint)) {
return true;
}
throw new SemanticException("The user hint to enforce semijoin failed required conditions");
}
return false;
}

private void replaceExprNode(DynamicListContext ctx, FilterDesc desc, ExprNodeDesc node) {
if (ctx.grandParent == null) {
desc.setPredicate(node);
Expand Down Expand Up @@ -442,12 +490,6 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
TableScanOperator ts, String keyBaseAlias, String internalColName,
String colName, SemiJoinHint sjHint) throws SemanticException {

// If semijoin hint is enforced, make sure hint is provided
if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
&& sjHint == null) {
return false;
}

// we will put a fork in the plan at the source of the reduce sink
Operator<? extends OperatorDesc> parentOfRS = ctx.generator.getParentOperators().get(0);

Expand All @@ -457,23 +499,18 @@ private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContex
assert colName != null;
// Fetch the TableScan Operator.
Operator<?> op = parentOfRS;
while (!(op == null || op instanceof TableScanOperator)) {
while (!(op == null || op instanceof TableScanOperator ||
op instanceof ReduceSinkOperator)) {
op = op.getParentOperators().get(0);
}
assert op != null;

Table table = ((TableScanOperator) op).getConf().getTableMetadata();
if (table.isPartitionKey(colName)) {
// The column is partition column, skip the optimization.
return false;
}
Preconditions.checkNotNull(op);

// If hint is provided and only hinted semijoin optimizations should be
// created, then skip other columns on the table
if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
&& sjHint.getColName() != null &&
!internalColName.equals(sjHint.getColName())) {
return false;
if (op instanceof TableScanOperator) {
Table table = ((TableScanOperator) op).getConf().getTableMetadata();
if (table.isPartitionKey(colName)) {
// The column is partition column, skip the optimization.
return false;
}
}

// Check if there already exists a semijoin branch
Expand Down
6 changes: 3 additions & 3 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public class ParseContext {
private Map<ExprNodeDesc, GroupByOperator> colExprToGBMap =
new HashMap<>();

private Map<String, SemiJoinHint> semiJoinHints;
private Map<String, List<SemiJoinHint>> semiJoinHints;
public ParseContext() {
}

Expand Down Expand Up @@ -674,11 +674,11 @@ public Map<ExprNodeDesc, GroupByOperator> getColExprToGBMap() {
return colExprToGBMap;
}

public void setSemiJoinHints(Map<String, SemiJoinHint> hints) {
public void setSemiJoinHints(Map<String, List<SemiJoinHint>> hints) {
this.semiJoinHints = hints;
}

public Map<String, SemiJoinHint> getSemiJoinHints() {
public Map<String, List<SemiJoinHint>> getSemiJoinHints() {
return semiJoinHints;
}
}
88 changes: 41 additions & 47 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9018,23 +9018,20 @@ private void parseStreamTables(QBJoinTree joinTree, QB qb) {
}

/** Parses semjoin hints in the query and returns the table names mapped to filter size, or -1 if not specified.
* Hints can be in 3 formats
* Hints can be in 2 formats
* 1. TableName, ColumnName, bloom filter entries
* 2. TableName, bloom filter entries, and
* 3. TableName, ColumnName
* 2. TableName, ColumnName
* */
private Map<String, SemiJoinHint> parseSemiJoinHint(List<ASTNode> hints) throws SemanticException {
private Map<String, List<SemiJoinHint>> parseSemiJoinHint(List<ASTNode> hints) throws SemanticException {
if (hints == null || hints.size() == 0) return null;
Map<String, SemiJoinHint> result = null;
Map<String, List<SemiJoinHint>> result = null;
for (ASTNode hintNode : hints) {
for (Node node : hintNode.getChildren()) {
ASTNode hint = (ASTNode) node;
if (hint.getChild(0).getType() != HintParser.TOK_LEFTSEMIJOIN) continue;
if (result == null) {
result = new HashMap<>();
}
String alias = null;
String colName = null;
Tree args = hint.getChild(1);
if (args.getChildCount() == 1) {
String text = args.getChild(0).getText();
Expand All @@ -9043,46 +9040,9 @@ private Map<String, SemiJoinHint> parseSemiJoinHint(List<ASTNode> hints) throws
return result;
}
}
for (int i = 0; i < args.getChildCount(); i++) {
// We can have table names, column names or sizes here (or incorrect hint if the user is so inclined).
String text = args.getChild(i).getText();
Integer number = null;
try {
number = Integer.parseInt(text);
} catch (NumberFormatException ex) { // Ignore.
}
if (number != null) {
if (alias == null) {
throw new SemanticException("Invalid semijoin hint - arg " + i + " ("
+ text + ") is a number but the previous one is not an alias");
}
if (result.get(alias) != null) {
// A hint with same alias already present, throw
throw new SemanticException("A hint with alias " + alias +
" already present. Please use unique aliases");
}
SemiJoinHint sjHint = new SemiJoinHint(alias, colName, number);
result.put(alias, sjHint);
alias = null;
colName = null;
} else {
if (alias == null) {
alias = text;
} else if (colName == null) {
colName = text;
} else {
// No bloom filter entries provided.
if (result.get(alias) != null) {
// A hint with same alias already present, throw
throw new SemanticException("A hint with alias " + alias +
" already present. Please use unique aliases");
}
SemiJoinHint sjHint = new SemiJoinHint(alias, colName, null);
result.put(alias, sjHint);
alias = text;
colName = null;
}
}
int curIdx = 0;
while(curIdx < args.getChildCount()) {
curIdx = parseSingleSemiJoinHint(args, curIdx, result);
}
}
}
Expand All @@ -9092,6 +9052,40 @@ private Map<String, SemiJoinHint> parseSemiJoinHint(List<ASTNode> hints) throws
return result;
}

private int parseSingleSemiJoinHint(Tree args, int curIdx, Map<String, List<SemiJoinHint>> result)
throws SemanticException {
// Check if there are enough entries in the tree to constitute a hint.
int numEntriesLeft = args.getChildCount() - curIdx;
if (numEntriesLeft < 2) {
throw new SemanticException("User provided only 1 entry for the hint with alias "
+ args.getChild(curIdx).getText());
}

String alias = args.getChild(curIdx++).getText();
// validate
if (StringUtils.isNumeric(alias)) {
throw new SemanticException("User provided bloom filter entries when alias is expected");
}

String colName = args.getChild(curIdx++).getText();
// validate
if (StringUtils.isNumeric(colName)) {
throw new SemanticException("User provided bloom filter entries when column name is expected");
}

Integer number = null;
if (numEntriesLeft > 2) {
// Check if there exists bloom filter size entry
try {
number = Integer.parseInt(args.getChild(curIdx).getText());
curIdx++;
} catch (NumberFormatException e) { // Ignore
}
}
result.computeIfAbsent(alias, value -> new ArrayList<>()).add(new SemiJoinHint(colName, number));
return curIdx;
}

/**
* Merges node to target
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,14 @@
package org.apache.hadoop.hive.ql.parse;

public class SemiJoinHint {
private String tabAlias;
private String colName;
private Integer numEntries;

public SemiJoinHint(String tabAlias, String colName, Integer numEntries) {
this.tabAlias = tabAlias;
public SemiJoinHint(String colName, Integer numEntries) {
this.colName = colName;
this.numEntries = numEntries;
}

public String getTabAlias() {
return tabAlias;
}

public String getColName() {
return colName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -854,14 +854,13 @@ public ColumnOrigin(ExprNodeColumnDesc col, Operator<?> op) {
}

private static ExprNodeDesc findParentExpr(ExprNodeColumnDesc col, Operator<?> op) {
if (op instanceof ReduceSinkOperator) {
return col;
}

ExprNodeDesc parentExpr = col;
Map<String, ExprNodeDesc> mapping = op.getColumnExprMap();
if (mapping != null) {
parentExpr = mapping.get(col.getColumn());
if (parentExpr == null && op instanceof ReduceSinkOperator) {
return col;
}
}
return parentExpr;
}
Expand Down
Loading

0 comments on commit ee91b8e

Please sign in to comment.