HIVE-17964: HoS: some spark configs don't require re-creating a session (Rui reviewed by Xuefu)
lirui-apache committed Nov 20, 2017
1 parent f1698b6 commit 966d2b3
Showing 6 changed files with 102 additions and 37 deletions.
22 changes: 21 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -91,6 +91,7 @@ public class HiveConf extends Configuration {
private static final Map<String, ConfVars> metaConfs = new HashMap<String, ConfVars>();
private final List<String> restrictList = new ArrayList<String>();
private final Set<String> hiddenSet = new HashSet<String>();
private final List<String> rscList = new ArrayList<>();

private Pattern modWhiteListPattern = null;
private volatile boolean isSparkConfigUpdated = false;
@@ -3580,6 +3581,7 @@ public static enum ConfVars {
"hive.spark.client.secret.bits," +
"hive.spark.client.rpc.server.address," +
"hive.spark.client.rpc.server.port," +
"hive.spark.client.rpc.sasl.mechanisms," +
"bonecp.,"+
"hive.druid.broker.address.default,"+
"hive.druid.coordinator.address.default,"+
@@ -3600,6 +3602,12 @@
"hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
"Comma separated list of variables which are used internally and should not be configurable."),

HIVE_SPARK_RSC_CONF_LIST("hive.spark.rsc.conf.list",
SPARK_OPTIMIZE_SHUFFLE_SERDE.varname + "," +
SPARK_CLIENT_FUTURE_TIMEOUT.varname,
"Comma separated list of variables which are related to remote spark context.\n" +
"Changing these variables will result in re-creating the spark session."),

HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s",
new TimeValidator(TimeUnit.SECONDS),
"Timeout for Running Query in seconds. A nonpositive value means infinite. " +
@@ -3927,7 +3935,7 @@ private boolean isSparkRelatedConfig(String name) {
if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
result = true;
}
} else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
} else if (rscList.stream().anyMatch(rscVar -> rscVar.equals(name))) { // Remote Spark Context property.
result = true;
} else if (name.equals("mapreduce.job.queuename")) {
// a special property starting with mapreduce that we would also like to effect if it changes
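The hunk above is the heart of the change: previously any property whose name started with "hive.spark" was treated as a Remote Spark Context property and forced the session to be re-created; now only the explicit members of hive.spark.rsc.conf.list do. Below is a minimal sketch of the intended effect, assuming HiveConf's verifyAndSet() and the sparkConfigUpdated accessors behave as in this version of the class; the snippet is illustrative only and not part of the patch.

    // Illustrative sketch, not part of this commit.
    HiveConf conf = new HiveConf();

    // hive.spark.client.future.timeout is in the default RSC list, so
    // changing it marks the Spark config as updated -> session re-created.
    conf.verifyAndSet("hive.spark.client.future.timeout", "120s");
    assert conf.getSparkConfigUpdated();

    conf.setSparkConfigUpdated(false);

    // A hive.spark.* property that is NOT in the list no longer forces a
    // new session, which is exactly what this patch enables.
    conf.verifyAndSet("hive.spark.job.monitor.timeout", "90s");
    assert !conf.getSparkConfigUpdated();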
@@ -4409,6 +4417,7 @@ private void initialize(Class<?> cls) {
setupRestrictList();
hiddenSet.clear();
hiddenSet.addAll(HiveConfUtil.getHiddenSet(this));
setupRSCList();
}

/**
@@ -4799,6 +4808,17 @@ private void setupRestrictList() {
restrictList.add(ConfVars.HIVE_CONF_RESTRICTED_LIST.varname);
restrictList.add(ConfVars.HIVE_CONF_HIDDEN_LIST.varname);
restrictList.add(ConfVars.HIVE_CONF_INTERNAL_VARIABLE_LIST.varname);
restrictList.add(ConfVars.HIVE_SPARK_RSC_CONF_LIST.varname);
}

private void setupRSCList() {
rscList.clear();
String vars = this.getVar(ConfVars.HIVE_SPARK_RSC_CONF_LIST);
if (vars != null) {
for (String var : vars.split(",")) {
rscList.add(var.trim());
}
}
}

/**
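Note that setupRestrictList() adds hive.spark.rsc.conf.list to the restrict list, so the list itself cannot be altered at runtime; it must be configured before the session starts (e.g. in hive-site.xml). A hypothetical check of that behavior, assuming verifyAndSet() rejects restricted variables with IllegalArgumentException as it does for other restrict-list entries:

    HiveConf conf = new HiveConf();
    try {
      // Attempting a runtime change of the RSC list itself should fail.
      conf.verifyAndSet("hive.spark.rsc.conf.list", "hive.spark.client.future.timeout");
      throw new AssertionError("expected hive.spark.rsc.conf.list to be restricted");
    } catch (IllegalArgumentException expected) {
      // Rejected: the RSC list is itself on the restrict list (see setupRestrictList).
    }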
23 changes: 23 additions & 0 deletions data/scripts/sleep.py
@@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
import time

for line in sys.stdin.readlines():
time.sleep(3)
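The script sleeps three seconds for every line it reads from standard input and emits nothing. In the tests below it is presumably there to slow the transform stage down, so the query is still running when the task-count limits are checked rather than finishing before it can be cancelled.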
8 changes: 6 additions & 2 deletions ql/src/test/queries/clientnegative/spark_job_max_tasks.q
@@ -1,6 +1,10 @@
set hive.spark.job.max.tasks=2;

add file ../../data/scripts/sleep.py;

EXPLAIN
SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;

SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
8 changes: 6 additions & 2 deletions ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
@@ -1,6 +1,10 @@
set hive.spark.stage.max.tasks=1;

add file ../../data/scripts/sleep.py;

EXPLAIN
SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;

SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
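Both negative tests follow the same pattern: hive.spark.job.max.tasks=2 caps the total number of tasks a Spark job may contain, and hive.spark.stage.max.tasks=1 caps the tasks in any single stage; a query exceeding the cap is expected to be cancelled. Swapping the original aggregate query for the sleeping TRANSFORM presumably makes the failure deterministic, since the fast group-by could otherwise complete before the limit took effect.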
39 changes: 23 additions & 16 deletions ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
@@ -1,8 +1,10 @@
PREHOOK: query: EXPLAIN
SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
PREHOOK: type: QUERY
POSTHOOK: query: EXPLAIN
SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -22,39 +24,43 @@ STAGE PLANS:
alias: src1
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: key, value
expressions: key (type: string)
outputColumnNames: key
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: sum(value)
keys: key (type: string)
mode: hash
outputColumnNames: _col0, _col1
outputColumnNames: _col0
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: double)
Reducer 2
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
outputColumnNames: _col0
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col1 (type: double)
sort order: +
Transform Operator
command: python sleep.py
output info:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string)
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string)
Reducer 3
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
outputColumnNames: _col0, _col1
expressions: VALUE._col0 (type: string)
outputColumnNames: _col0
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Expand All @@ -70,7 +76,8 @@ STAGE PLANS:
Processor Tree:
ListSink

PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
PREHOOK: query: SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
PREHOOK: type: QUERY
PREHOOK: Input: default@src1
#### A masked pattern was here ####
39 changes: 23 additions & 16 deletions ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
@@ -1,8 +1,10 @@
PREHOOK: query: EXPLAIN
SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
PREHOOK: type: QUERY
POSTHOOK: query: EXPLAIN
SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -22,39 +24,43 @@ STAGE PLANS:
alias: src1
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: key, value
expressions: key (type: string)
outputColumnNames: key
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: sum(value)
keys: key (type: string)
mode: hash
outputColumnNames: _col0, _col1
outputColumnNames: _col0
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: double)
Reducer 2
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
outputColumnNames: _col0
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col1 (type: double)
sort order: +
Transform Operator
command: python sleep.py
output info:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string)
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string)
Reducer 3
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
outputColumnNames: _col0, _col1
expressions: VALUE._col0 (type: string)
outputColumnNames: _col0
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Expand All @@ -70,7 +76,8 @@ STAGE PLANS:
Processor Tree:
ListSink

PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
PREHOOK: query: SELECT TRANSFORM(key) USING 'python sleep.py' AS k
FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
PREHOOK: type: QUERY
PREHOOK: Input: default@src1
#### A masked pattern was here ####
