
Commit

HIVE-7299: Enable metadata only optimization on Tez (Gunther Hagleitner, reviewed by Vikram Dixit K)

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1608959 13f79535-47bb-0310-9956-ffa450edef68
hagleitn committed Jul 8, 2014
1 parent 65ff5b0 commit 906314e
Showing 13 changed files with 385 additions and 59 deletions.
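
For context: Hive's metadata-only optimization answers queries that touch nothing but partition columns from the metastore's partition list, feeding the operator pipeline a single synthetic null row per partition (OneNullRowInputFormat) instead of scanning any files. This commit makes that rewrite usable when the query executes on Tez. A hedged illustration of the query shape involved (the table name "sales" and partition column "ds" are made up for illustration, not part of this commit):

// Illustrative sketch only; "sales" (partitioned by "ds") is hypothetical.
// Queries of this shape read no table data once the metadata-only rewrite applies:
// one null row per partition is enough for DISTINCT/MIN/MAX over partition
// columns to come out right.
public class MetadataOnlyQueryExamples {
  public static final String DISTINCT_PARTITIONS = "SELECT DISTINCT ds FROM sales";
  public static final String LATEST_PARTITION    = "SELECT MAX(ds) FROM sales";
}
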
2 changes: 1 addition & 1 deletion itests/qtest/testconfiguration.properties
@@ -1,5 +1,5 @@
minimr.query.files=stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q
minimr.query.negative.files=cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q
minitez.query.files=tez_fsstat.q,mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q,tez_schema_evolution.q,tez_join_hash.q
minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q
minitez.query.files.shared=cross_product_check_1.q,cross_product_check_2.q,dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q,transform1.q,transform2.q,transform_ppr1.q,transform_ppr2.q,script_env_var1.q,script_env_var2.q,script_pipe.q,scriptfile1.q,metadataonly1.q
beeline.positive.exclude=add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q
8 changes: 7 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -22,6 +22,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -35,6 +36,7 @@
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -365,6 +367,10 @@ public T getWork() {
return work;
}

public Collection<MapWork> getMapWork() {
return Collections.<MapWork>emptyList();
}

public void setId(String id) {
this.id = id;
}
@@ -389,7 +395,7 @@ public boolean hasReduce() {
return false;
}

public Operator<? extends OperatorDesc> getReducer() {
public Operator<? extends OperatorDesc> getReducer(MapWork work) {
return null;
}

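
The two methods above generalize what used to be MR-only assumptions: a task may now expose any number of MapWork fragments, and the reducer (if any) is looked up per MapWork instead of being assumed to be the task's single reduce stage. A minimal sketch of how downstream code can walk a task through this API, the same way for MR and Tez tasks; the walker class itself is made up for illustration:

import java.io.Serializable;

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

public class TaskWalkerSketch {
  // Visits every MapWork a task owns and, where one exists, the reducer it feeds.
  static void visit(Task<? extends Serializable> task) {
    for (MapWork mapWork : task.getMapWork()) {
      Operator<? extends OperatorDesc> reducer = task.getReducer(mapWork);
      // inspect mapWork's operator tree here; reducer is null when this MapWork
      // does not feed exactly one ReduceWork
    }
  }
}
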
48 changes: 35 additions & 13 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -267,6 +267,10 @@ public static MapredWork getMapRedWork(Configuration conf) {
return w;
}

public static void cacheMapWork(Configuration conf, MapWork work, Path hiveScratchDir) {
cacheBaseWork(conf, MAP_PLAN_NAME, work, hiveScratchDir);
}

public static void setMapWork(Configuration conf, MapWork work) {
setBaseWork(conf, MAP_PLAN_NAME, work);
}
@@ -283,6 +287,17 @@ public static ReduceWork getReduceWork(Configuration conf) {
return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
}

public static void cacheBaseWork(Configuration conf, String name, BaseWork work,
Path hiveScratchDir) {
try {
setPlanPath(conf, hiveScratchDir);
setBaseWork(conf, name, work);
} catch (IOException e) {
LOG.error("Failed to cache plan", e);
throw new RuntimeException(e);
}
}

/**
* Pushes work into the global work map
*/
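
The new cacheBaseWork/cacheMapWork entry points let a plan be published under the scratch directory's plan path before the final vertex configuration is assembled; the DagUtils change further down uses exactly this ordering, presumably so the map plan can already be resolved while input paths and client-side splits are being computed. A condensed sketch of that call order (the method names come from this commit, everything else is illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;

public class PlanCachingSketch {
  static void buildMapVertexConf(JobConf conf, MapWork mapWork, Path mrScratchDir) {
    // make the plan visible early, before input paths and splits are computed
    Utilities.cacheMapWork(conf, mapWork, mrScratchDir);

    // ... input path and split generation happens here (see DagUtils.createVertex) ...

    // serialize the plan for the vertex payload once everything is set up
    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
  }
}
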
@@ -2324,13 +2339,15 @@ public static long sumOfExcept(Map<String, Long> aliasToSize,

public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx)
throws Exception {
ContentSummary cs = ctx.getCS(dirPath);
if (cs != null) {
LOG.info("Content Summary " + dirPath + "length: " + cs.getLength() + " num files: "
+ cs.getFileCount() + " num directories: " + cs.getDirectoryCount());
return (cs.getLength() == 0 && cs.getFileCount() == 0 && cs.getDirectoryCount() <= 1);
} else {
LOG.info("Content Summary not cached for " + dirPath);
if (ctx != null) {
ContentSummary cs = ctx.getCS(dirPath);
if (cs != null) {
LOG.info("Content Summary " + dirPath + "length: " + cs.getLength() + " num files: "
+ cs.getFileCount() + " num directories: " + cs.getDirectoryCount());
return (cs.getLength() == 0 && cs.getFileCount() == 0 && cs.getDirectoryCount() <= 1);
} else {
LOG.info("Content Summary not cached for " + dirPath);
}
}
return isEmptyPath(job, dirPath);
}
@@ -2958,7 +2975,13 @@ public static double getHighestSamplePercentage (MapWork work) {
* so we don't want to depend on scratch dir and context.
*/
public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exception {
List<Path> paths = getInputPaths(job, work, null, null);
String scratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);

// we usually don't want to create dummy files for tez, however the metadata only
// optimization relies on it.
List<Path> paths = getInputPaths(job, work, new Path(scratchDir), null,
!work.isUseOneNullRowInputFormat());

return paths;
}

@@ -2976,8 +2999,8 @@ public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exce
* @return List of paths to process for the given MapWork
* @throws Exception
*/
public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir, Context ctx)
throws Exception {
public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir,
Context ctx, boolean skipDummy) throws Exception {
int sequenceNumber = 0;

Set<Path> pathsProcessed = new HashSet<Path>();
@@ -3002,7 +3025,7 @@ public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScrat
pathsProcessed.add(path);

LOG.info("Adding input file " + path);
if (!HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
if (!skipDummy
&& isEmptyPath(job, path, ctx)) {
path = createDummyFileForEmptyPartition(path, job, work,
hiveScratchDir, alias, sequenceNumber++);
@@ -3020,8 +3043,7 @@ && isEmptyPath(job, path, ctx)) {
// T2) x;
// If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
// rows)
if (path == null
&& !HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
if (path == null && !skipDummy) {
path = createDummyFileForEmptyTable(job, work, hiveScratchDir,
alias, sequenceNumber++);
pathsToAdd.add(path);
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -366,7 +366,7 @@ public int execute(DriverContext driverContext) {
}
}
work.configureJobConf(job);
List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx);
List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx, false);
Utilities.setInputPaths(job, inputPaths);

Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
@@ -788,6 +788,11 @@ public static String generateCmdLine(HiveConf hconf, Context ctx)
return " -jobconffile " + hConfFilePath.toString();
}

@Override
public Collection<MapWork> getMapWork() {
return Collections.<MapWork>singleton(getWork().getMapWork());
}

@Override
public boolean isMapRedTask() {
return true;
8 changes: 6 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -462,8 +463,11 @@ public static String isEligibleForLocalMode(HiveConf conf,
}

@Override
public Operator<? extends OperatorDesc> getReducer() {
return getWork().getReduceWork() == null ? null : getWork().getReduceWork().getReducer();
public Operator<? extends OperatorDesc> getReducer(MapWork mapWork) {
if (getWork().getMapWork() == mapWork) {
return getWork().getReduceWork() == null ? null : getWork().getReduceWork().getReducer();
}
return null;
}

@Override
14 changes: 12 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -49,6 +49,7 @@
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
@@ -195,6 +196,10 @@ private JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
inpFormat = BucketizedHiveInputFormat.class.getName();
}

if (mapWork.isUseOneNullRowInputFormat()) {
inpFormat = CombineHiveInputFormat.class.getName();
}

conf.set("mapred.mapper.class", ExecMapper.class.getName());
conf.set("mapred.input.format.class", inpFormat);

@@ -413,7 +418,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
Path tezDir = getTezDir(mrScratchDir);

// set up the operator plan
Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
Utilities.cacheMapWork(conf, mapWork, mrScratchDir);

// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, mapWork);
@@ -441,6 +446,7 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
}
}
}

if (vertexHasCustomInput) {
useTezGroupedSplits = false;
// grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
@@ -459,7 +465,8 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
}
}

if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
// if we're generating the splits in the AM, we just need to set
// the correct plugin.
amSplitGeneratorClass = HiveSplitGenerator.class;
@@ -470,6 +477,9 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
numTasks = inputSplitInfo.getNumTasks();
}

// set up the operator plan
Utilities.setMapWork(conf, mapWork, mrScratchDir, false);

byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
map = new Vertex(mapWork.getName(),
new ProcessorDescriptor(MapTezProcessor.class.getName()).
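
Taken together, the DagUtils changes above handle the metadata-only case on Tez as follows: a MapWork flagged with isUseOneNullRowInputFormat() is given CombineHiveInputFormat, and AM-side split generation is switched off for it, so splits are computed on the client over the dummy files that Utilities.getInputPathsTez() now creates for such work (without them, an empty partition would contribute no split, hence no synthetic row, and would silently drop out of the result). A condensed sketch of those two decisions; this is illustrative, not the actual DagUtils code:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;

public class NullRowVertexSketch {
  static void configure(JobConf conf, MapWork mapWork) {
    // simplified: the bucketized-input case from initializeVertexConf is omitted
    String inputFormat = mapWork.isUseOneNullRowInputFormat()
        ? CombineHiveInputFormat.class.getName()
        : HiveInputFormat.class.getName();
    conf.set("mapred.input.format.class", inputFormat);

    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
        && !mapWork.isUseOneNullRowInputFormat()) {
      // generate splits in the Tez AM (HiveSplitGenerator)
    } else {
      // generate splits here on the client, over the dummy input files, and ship
      // them with the DAG
    }
  }
}
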
42 changes: 42 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.exec.tez;

import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -36,6 +37,9 @@
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -354,4 +358,42 @@ public StageType getType() {
public String getName() {
return "TEZ";
}

@Override
public Collection<MapWork> getMapWork() {
List<MapWork> result = new LinkedList<MapWork>();
TezWork work = getWork();

// framework expects MapWork instances that have no physical parents (i.e.: union parent is
// fine, broadcast parent isn't)
for (BaseWork w: work.getAllWorkUnsorted()) {
if (w instanceof MapWork) {
List<BaseWork> parents = work.getParents(w);
boolean candidate = true;
for (BaseWork parent: parents) {
if (!(parent instanceof UnionWork)) {
candidate = false;
}
}
if (candidate) {
result.add((MapWork)w);
}
}
}
return result;
}

@Override
public Operator<? extends OperatorDesc> getReducer(MapWork mapWork) {
List<BaseWork> children = getWork().getChildren(mapWork);
if (children.size() != 1) {
return null;
}

if (!(children.get(0) instanceof ReduceWork)) {
return null;
}

return ((ReduceWork)children.get(0)).getReducer();
}
}
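
To make the parent filter in getMapWork() above concrete, a hedged illustration (the vertex names are invented, not from this commit):

// DAG 1:  Map1 -> Union1, Map2 -> Union1, Union1 -> Map3
//         getMapWork() returns {Map1, Map2, Map3}; Map3 qualifies because its
//         only parent is a UnionWork.
//
// DAG 2:  Map1 -(broadcast)-> Map2, Map2 -> Reduce1
//         getMapWork() returns {Map1}; Map2 is skipped because it has a MapWork
//         parent. getReducer(Map2) would still return Reduce1's reducer, while
//         getReducer(Map1) is null since Map1's only child is another MapWork.
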