HIVE-13750: Avoid additional shuffle stage created by Sorted Dynamic Partition Optimizer when possible (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
jcamachor committed May 19, 2016
1 parent f25b865 commit df72234
Showing 9 changed files with 720 additions and 845 deletions.
1,021 changes: 500 additions & 521 deletions contrib/src/test/results/clientpositive/udf_row_sequence.q.out

Large diffs are not rendered by default.

@@ -246,7 +246,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
// Create SelectDesc
SelectDesc selConf = new SelectDesc(descs, colNames);


// Create Select Operator
SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(
selConf, selRS, rsOp);
@@ -420,7 +419,6 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions,
// 1) Partition columns
// 2) Bucket number column
// 3) Sort columns
// 4) Null sort columns
Set<Integer> keyColsPosInVal = Sets.newLinkedHashSet();
ArrayList<ExprNodeDesc> keyCols = Lists.newArrayList();
List<Integer> newSortOrder = Lists.newArrayList();
@@ -25,6 +25,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Stack;

import org.apache.hadoop.hive.conf.HiveConf;
@@ -50,11 +51,12 @@
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* If two reducer sink operators share the same partition/sort columns and order,
Expand All @@ -65,6 +67,8 @@
*/
public class ReduceSinkDeDuplication extends Transform {

protected static final Logger LOG = LoggerFactory.getLogger(ReduceSinkDeDuplication.class);

private static final String RS = ReduceSinkOperator.getOperatorName();
private static final String GBY = GroupByOperator.getOperatorName();
private static final String JOIN = JoinOperator.getOperatorName();
@@ -253,7 +257,7 @@ protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReduc
*/
protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
throws SemanticException {
int[] result = checkStatus(cRS, pRS, minReducer);
int[] result = extractMergeDirections(cRS, pRS, minReducer);
if (result == null) {
return false;
}
@@ -334,7 +338,7 @@ protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minR
* 2. for -1, configuration of parent RS is more specific than child RS
* 3. for 1, configuration of child RS is more specific than parent RS
*/
private int[] checkStatus(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
private int[] extractMergeDirections(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
throws SemanticException {
ReduceSinkDesc cConf = cRS.getConf();
ReduceSinkDesc pConf = pRS.getConf();
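The -1/0/1 convention documented in the method comment above is easiest to see on a single aspect such as the key columns: the two lists must agree on their common prefix, and whichever list is longer is the more specific one. A standalone sketch of that convention, not the actual extractMergeDirections logic:

```java
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the merge-direction convention described above:
//   0    both ReduceSinks use the same column list
//  -1    the parent's list is more specific (the child list is its prefix)
//   1    the child's list is more specific (the parent list is its prefix)
//  null  the lists diverge, so this aspect blocks the merge
public class MergeDirectionSketch {

  static Integer direction(List<String> childCols, List<String> parentCols) {
    int common = Math.min(childCols.size(), parentCols.size());
    for (int i = 0; i < common; i++) {
      if (!childCols.get(i).equals(parentCols.get(i))) {
        return null; // neither list is a prefix of the other
      }
    }
    if (childCols.size() == parentCols.size()) {
      return 0;
    }
    return childCols.size() < parentCols.size() ? -1 : 1;
  }

  public static void main(String[] args) {
    // Child shuffles on a prefix of the parent's keys: parent is more specific.
    System.out.println(direction(Arrays.asList("_col4"), Arrays.asList("_col4", "_col0"))); // -1
  }
}
```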
@@ -494,6 +498,112 @@ protected Integer checkNumReducer(int creduce, int preduce) {
}
return 0;
}

protected boolean aggressiveDedup(ReduceSinkOperator cRS, ReduceSinkOperator pRS,
ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException {
assert cRS.getNumParent() == 1;

ReduceSinkDesc cConf = cRS.getConf();
ReduceSinkDesc pConf = pRS.getConf();
List<ExprNodeDesc> cKeys = cConf.getKeyCols();
List<ExprNodeDesc> pKeys = pConf.getKeyCols();

// Check that in the path between cRS and pRS, there are only Select operators
// i.e. the sequence must be pRS-SEL*-cRS
Operator<? extends OperatorDesc> parent = cRS.getParentOperators().get(0);
while (parent != pRS) {
assert parent.getNumParent() == 1;
if (!(parent instanceof SelectOperator)) {
return false;
}
parent = parent.getParentOperators().get(0);
}

// If child keys are null or empty, we bail out
if (cKeys == null || cKeys.isEmpty()) {
return false;
}
// If parent keys are null or empty, we bail out
if (pKeys == null || pKeys.isEmpty()) {
return false;
}

// Backtrack key columns of cRS to pRS
// If we cannot backtrack any of the columns, bail out
List<ExprNodeDesc> cKeysInParentRS = ExprNodeDescUtils.backtrack(cKeys, cRS, pRS);
for (int i = 0; i < cKeysInParentRS.size(); i++) {
ExprNodeDesc pexpr = cKeysInParentRS.get(i);
if (pexpr == null) {
// We cannot backtrack the expression, we bail out
return false;
}
}
cRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(cKeysInParentRS, cRS, pRS));

// Backtrack partition columns of cRS to pRS
// If we cannot backtrack any of the columns, bail out
List<ExprNodeDesc> cPartitionInParentRS = ExprNodeDescUtils.backtrack(
cConf.getPartitionCols(), cRS, pRS);
for (int i = 0; i < cPartitionInParentRS.size(); i++) {
ExprNodeDesc pexpr = cPartitionInParentRS.get(i);
if (pexpr == null) {
// We cannot backtrack the expression, we bail out
return false;
}
}
cRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(cPartitionInParentRS, cRS, pRS));

// Backtrack value columns of cRS to pRS
// If we cannot backtrack any of the columns, bail out
List<ExprNodeDesc> cValueInParentRS = ExprNodeDescUtils.backtrack(
cConf.getValueCols(), cRS, pRS);
for (int i = 0; i < cValueInParentRS.size(); i++) {
ExprNodeDesc pexpr = cValueInParentRS.get(i);
if (pexpr == null) {
// We cannot backtrack the expression, we bail out
return false;
}
}
cRS.getConf().setValueCols(ExprNodeDescUtils.backtrack(cValueInParentRS, cRS, pRS));

// Backtrack bucket columns of cRS to pRS (if any)
// If we cannot backtrack any of the columns, bail out
if (cConf.getBucketCols() != null) {
List<ExprNodeDesc> cBucketInParentRS = ExprNodeDescUtils.backtrack(
cConf.getBucketCols(), cRS, pRS);
for (int i = 0; i < cBucketInParentRS.size(); i++) {
ExprNodeDesc pexpr = cBucketInParentRS.get(i);
if (pexpr == null) {
// We cannot backtrack the expression, we bail out
return false;
}
}
cRS.getConf().setBucketCols(ExprNodeDescUtils.backtrack(cBucketInParentRS, cRS, pRS));
}

// Update column expression map
for (Entry<String, ExprNodeDesc> e : cRS.getColumnExprMap().entrySet()) {
e.setValue(ExprNodeDescUtils.backtrack(e.getValue(), cRS, pRS));
}

// Replace pRS with cRS and remove operator sequence from pRS to cRS
// Recall that the sequence must be pRS-SEL*-cRS
parent = cRS.getParentOperators().get(0);
while (parent != pRS) {
dedupCtx.addRemovedOperator(parent);
parent = parent.getParentOperators().get(0);
}
dedupCtx.addRemovedOperator(pRS);
cRS.getParentOperators().clear();
for (Operator<? extends OperatorDesc> op : pRS.getParentOperators()) {
op.replaceChild(pRS, cRS);
cRS.getParentOperators().add(op);
}
pRS.getParentOperators().clear();
pRS.getChildOperators().clear();

return true;
}
}

static class GroupbyReducerProc extends AbsctractReducerReducerProc {
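The aggressiveDedup method introduced in the hunk above only fires when every key, partition, value, and bucket column of the child ReduceSink can be rewritten in terms of the parent; whenever ExprNodeDescUtils.backtrack yields a null entry, the method bails out. The sketch below models that resolution step with plain maps, one per intermediate Select on the pRS-SEL*-cRS path; the class and method names are illustrative and not Hive API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of the backtracking step in aggressiveDedup: each Select
// between the parent and child ReduceSink is modeled as a map from its output
// column name to the parent-side expression it was computed from. A child
// column can be rewritten in terms of the parent only if every Select on the
// path resolves it; otherwise the rewrite (and the deduplication) is abandoned.
public class BacktrackSketch {

  static String backtrack(String childCol, List<Map<String, String>> selectsChildToParent) {
    String current = childCol;
    for (Map<String, String> select : selectsChildToParent) {
      current = select.get(current);
      if (current == null) {
        return null; // column is not simply forwarded from the parent; bail out
      }
    }
    return current;
  }

  public static void main(String[] args) {
    Map<String, String> sel = new HashMap<>();
    sel.put("_col0", "si");
    sel.put("_col4", "t");
    // _col4 of the child ReduceSink maps back to column t of the parent.
    System.out.println(backtrack("_col4", Arrays.asList(sel))); // prints "t"
  }
}
```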
@@ -601,11 +711,18 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup
ReduceSinkOperator pRS =
CorrelationUtilities.findPossibleParent(
cRS, ReduceSinkOperator.class, dedupCtx.trustScript());
if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
CorrelationUtilities.replaceReduceSinkWithSelectOperator(
cRS, dedupCtx.getPctx(), dedupCtx);
pRS.getConf().setDeduplicated(true);
return true;
if (pRS != null) {
// Try extended deduplication
if (aggressiveDedup(cRS, pRS, dedupCtx)) {
return true;
}
// Normal deduplication
if (merge(cRS, pRS, dedupCtx.minReducer())) {
CorrelationUtilities.replaceReduceSinkWithSelectOperator(
cRS, dedupCtx.getPctx(), dedupCtx);
pRS.getConf().setDeduplicated(true);
return true;
}
}
return false;
}
@@ -159,9 +159,8 @@ explain insert overwrite table over1k_part_orc partition(ds="foo", t) select si,
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
Stage-3 depends on stages: Stage-0
Stage-0 depends on stages: Stage-1
Stage-2 depends on stages: Stage-0

STAGE PLANS:
Stage: Stage-1
@@ -178,33 +177,12 @@
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: smallint)
sort order: +
key expressions: _col4 (type: tinyint), _col0 (type: smallint)
sort order: ++
Map-reduce partition columns: _col4 (type: tinyint)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: int), VALUE._col1 (type: bigint), VALUE._col2 (type: float), VALUE._col3 (type: tinyint)
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col4 (type: tinyint), _col0 (type: smallint)
sort order: ++
Map-reduce partition columns: _col4 (type: tinyint)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint)
@@ -232,7 +210,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_orc

Stage: Stage-3
Stage: Stage-2
Stats-Aggr Operator

PREHOOK: query: explain insert overwrite table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10
@@ -517,9 +495,8 @@ explain insert into table over1k_part_orc partition(ds="foo", t) select si,i,b,f
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
Stage-3 depends on stages: Stage-0
Stage-0 depends on stages: Stage-1
Stage-2 depends on stages: Stage-0

STAGE PLANS:
Stage: Stage-1
@@ -536,33 +513,12 @@
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: smallint)
sort order: +
key expressions: _col4 (type: tinyint), _col0 (type: smallint)
sort order: ++
Map-reduce partition columns: _col4 (type: tinyint)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: int), VALUE._col1 (type: bigint), VALUE._col2 (type: float), VALUE._col3 (type: tinyint)
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col4 (type: tinyint), _col0 (type: smallint)
sort order: ++
Map-reduce partition columns: _col4 (type: tinyint)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint)
@@ -590,7 +546,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
name: default.over1k_part_orc

Stage: Stage-3
Stage: Stage-2
Stats-Aggr Operator

PREHOOK: query: explain insert into table over1k_part_limit_orc partition(ds="foo", t) select si,i,b,f,t from over1k_orc where t is null or t=27 limit 10
@@ -1336,9 +1292,8 @@ POSTHOOK: query: explain insert overwrite table over1k_part2_orc partition(ds="f
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
Stage-3 depends on stages: Stage-0
Stage-0 depends on stages: Stage-1
Stage-2 depends on stages: Stage-0

STAGE PLANS:
Stage: Stage-1
@@ -1355,33 +1310,12 @@
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col1 (type: int)
sort order: +
key expressions: _col4 (type: tinyint), _col1 (type: int)
sort order: ++
Map-reduce partition columns: _col4 (type: tinyint)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: smallint), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint)
value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: smallint), KEY.reducesinkkey0 (type: int), VALUE._col1 (type: bigint), VALUE._col2 (type: float), VALUE._col3 (type: tinyint)
outputColumnNames: _col0, _col1, _col2, _col3, _col4
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col4 (type: tinyint), _col1 (type: int)
sort order: ++
Map-reduce partition columns: _col4 (type: tinyint)
Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float)
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), KEY._col4 (type: tinyint)
@@ -1409,7 +1343,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.over1k_part2_orc

Stage: Stage-3
Stage: Stage-2
Stats-Aggr Operator

PREHOOK: query: explain insert overwrite table over1k_part2_orc partition(ds="foo",t) select si,i,b,f,t from (select * from over1k_orc order by i limit 10) tmp where t is null or t=27