Skip to content

Commit

Permalink
[SPARK-2849] Handle driver configs separately in client mode
Browse files Browse the repository at this point in the history
In client deploy mode, the driver is launched from within `SparkSubmit`'s JVM. This means by the time we parse Spark configs from `spark-defaults.conf`, it is already too late to control certain properties of the driver's JVM. We currently ignore these configs in client mode altogether.
```
spark.driver.memory
spark.driver.extraJavaOptions
spark.driver.extraClassPath
spark.driver.extraLibraryPath
```
This PR handles these properties before launching the driver JVM. It achieves this by spawning a separate JVM that runs a new class called `SparkSubmitDriverBootstrapper`, which spawns `SparkSubmit` as a sub-process with the appropriate classpath, library paths, java opts and memory.

Author: Andrew Or <[email protected]>

Closes apache#1845 from andrewor14/handle-configs-bash and squashes the following commits:

bed4bdf [Andrew Or] Change a few comments / messages (minor)
24dba60 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
08fd788 [Andrew Or] Warn against external usages of SparkSubmitDriverBootstrapper
ff34728 [Andrew Or] Minor comments
51aeb01 [Andrew Or] Filter out JVM memory in Scala rather than Bash (minor)
9a778f6 [Andrew Or] Fix PySpark: actually kill driver on termination
d0f20db [Andrew Or] Don't pass empty library paths, classpath, java opts etc.
a78cb26 [Andrew Or] Revert a few changes in utils.sh (minor)
9ba37e2 [Andrew Or] Don't barf when the properties file does not exist
8867a09 [Andrew Or] A few more naming things (minor)
19464ad [Andrew Or] SPARK_SUBMIT_JAVA_OPTS -> SPARK_SUBMIT_OPTS
d6488f9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
1ea6bbe [Andrew Or] SparkClassLauncher -> SparkSubmitDriverBootstrapper
a91ea19 [Andrew Or] Fix precedence of library paths, classpath, java opts and memory
158f813 [Andrew Or] Remove "client mode" boolean argument
c84f5c8 [Andrew Or] Remove debug print statement (minor)
b71f52b [Andrew Or] Revert a few more changes (minor)
7d94a8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
3a8235d [Andrew Or] Only parse the properties file if special configs exist
c37e08d [Andrew Or] Revert a few more changes
a396eda [Andrew Or] Nullify my own hard work to simplify bash
0effa1e [Andrew Or] Add code in Scala that handles special configs
c886568 [Andrew Or] Fix lines too long + a few comments / style (minor)
7a4190a [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
7396be2 [Andrew Or] Explicitly comment that multi-line properties are not supported
fa11ef8 [Andrew Or] Parse the properties file only if the special configs exist
371cac4 [Andrew Or] Add function prefix (minor)
be99eb3 [Andrew Or] Fix tests to not include multi-line configs
bd0d468 [Andrew Or] Simplify parsing config file by ignoring multi-line arguments
56ac247 [Andrew Or] Use eval and set to simplify splitting
8d4614c [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
aeb79c7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash
2732ac0 [Andrew Or] Integrate BASH tests into dev/run-tests + log error properly
8d26a5c [Andrew Or] Add tests for bash/utils.sh
4ae24c3 [Andrew Or] Fix bug: escape properly in quote_java_property
b3c4cd5 [Andrew Or] Fix bug: count the number of quotes instead of detecting presence
c2273fc [Andrew Or] Fix typo (minor)
e793e5f [Andrew Or] Handle multi-line arguments
5d8f8c4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra
c7b9926 [Andrew Or] Minor changes to spark-defaults.conf.template
a992ae2 [Andrew Or] Escape spark.*.extraJavaOptions correctly
aabfc7e [Andrew Or] escape -> split (minor)
45a1eb9 [Andrew Or] Fix bug: escape escaped backslashes and quotes properly...
1cdc6b1 [Andrew Or] Fix bug: escape escaped double quotes properly
c854859 [Andrew Or] Add small comment
c13a2cb [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra
8e552b7 [Andrew Or] Include an example of spark.*.extraJavaOptions
de765c9 [Andrew Or] Print spark-class command properly
a4df3c4 [Andrew Or] Move parsing and escaping logic to utils.sh
dec2343 [Andrew Or] Only export variables if they exist
fa2136e [Andrew Or] Escape Java options + parse java properties files properly
ef12f74 [Andrew Or] Minor formatting
4ec22a1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra
e5cfb46 [Andrew Or] Collapse duplicate code + fix potential whitespace issues
4edcaa8 [Andrew Or] Redirect stdout to stderr for python
130f295 [Andrew Or] Handle spark.driver.memory too
98dd8e3 [Andrew Or] Add warning if properties file does not exist
8843562 [Andrew Or] Fix compilation issues...
75ee6b4 [Andrew Or] Remove accidentally added file
63ed2e9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra
0025474 [Andrew Or] Revert SparkSubmit handling of --driver-* options for only cluster mode
a2ab1b0 [Andrew Or] Parse spark.driver.extra* in bash
250cb95 [Andrew Or] Do not ignore spark.driver.extra* for client mode
  • Loading branch information
andrewor14 authored and pwendell committed Aug 20, 2014
1 parent c1ba4cd commit b3ec51b
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 56 deletions.
49 changes: 38 additions & 11 deletions bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# limitations under the License.
#

# NOTE: Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!

cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
Expand All @@ -39,7 +41,7 @@ fi

if [ -n "$SPARK_MEM" ]; then
echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2
echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2
echo -e "(e.g., spark.executor.memory or spark.driver.memory)." 1>&2
fi

# Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
Expand Down Expand Up @@ -73,11 +75,17 @@ case "$1" in
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
;;

# Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS
'org.apache.spark.deploy.SparkSubmit')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS \
-Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
# Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
# SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
'org.apache.spark.deploy.SparkSubmit')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
fi
if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
fi
;;

*)
Expand All @@ -101,11 +109,12 @@ fi
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"

# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!

TOOLS_DIR="$FWDIR"/tools
Expand Down Expand Up @@ -146,10 +155,28 @@ if $cygwin; then
fi
export CLASSPATH

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
echo -n "Spark Command: " 1>&2
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
echo -e "========================================\n" 1>&2
# In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
# Here we must parse the properties file for relevant "spark.driver.*" configs before launching
# the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM
# to prepare the launch environment of this driver JVM.

if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
# This is used only if the properties file actually contains these special configs
# Export the environment variables needed by SparkSubmitDriverBootstrapper
export RUNNER
export CLASSPATH
export JAVA_OPTS
export OUR_JAVA_MEM
export SPARK_CLASS=1
shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@"
else
# Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala
if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
echo -n "Spark Command: " 1>&2
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
echo -e "========================================\n" 1>&2
fi
exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
28 changes: 23 additions & 5 deletions bin/spark-submit
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
# limitations under the License.
#

# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!

export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=("$@")

while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
DEPLOY_MODE=$2
SPARK_SUBMIT_DEPLOY_MODE=$2
elif [ "$1" = "--properties-file" ]; then
SPARK_SUBMIT_PROPERTIES_FILE=$2
elif [ "$1" = "--driver-memory" ]; then
DRIVER_MEMORY=$2
export SPARK_SUBMIT_DRIVER_MEMORY=$2
elif [ "$1" = "--driver-library-path" ]; then
export SPARK_SUBMIT_LIBRARY_PATH=$2
elif [ "$1" = "--driver-class-path" ]; then
Expand All @@ -35,10 +39,24 @@ while (($#)); do
shift
done

DEPLOY_MODE=${DEPLOY_MODE:-"client"}
DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}

# For client mode, the driver will be launched in the same JVM that launches
# SparkSubmit, so we may need to read the properties file for any extra class
# paths, library paths, java options and memory early on. Otherwise, it will
# be too late by the time the driver JVM has started.

if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
# Parse the properties file only if the special configs exist
contains_special_configs=$(
grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
grep -v "^[[:space:]]*#"
)
if [ -n "$contains_special_configs" ]; then
export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
fi
fi

exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
Expand Down
Empty file modified bin/utils.sh
100644 → 100755
Empty file.
10 changes: 6 additions & 4 deletions conf/spark-defaults.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# This is useful for setting default environmental settings.

# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetail -Dkey=value -Dnumbers="one two three"
25 changes: 0 additions & 25 deletions core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,3 @@ private[spark] object PythonUtils {
paths.filter(_ != "").mkString(File.pathSeparator)
}
}


/**
* A utility class to redirect the child process's stdout or stderr.
*/
private[spark] class RedirectThread(
in: InputStream,
out: OutputStream,
name: String)
extends Thread(name) {

setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
out.write(buf, 0, len)
out.flush()
len = in.read(buf)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.spark.api.python

import java.lang.Runtime
import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}

import scala.collection.mutable
import scala.collection.JavaConversions._

import org.apache.spark._
import org.apache.spark.util.Utils
import org.apache.spark.util.{RedirectThread, Utils}

private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import java.net.URI
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._

import org.apache.spark.api.python.{PythonUtils, RedirectThread}
import org.apache.spark.util.Utils
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}

/**
* A main class used by spark-submit to launch Python applications. It executes python as a
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,21 @@ object SparkSubmit {
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

// Other options
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.files")
sysProp = "spark.files"),

// Only process driver specific options for cluster mode here,
// because they have already been processed in bash for client mode
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER,
sysProp = "spark.driver.extraLibraryPath")
)

// In client mode, launch the application main class directly
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.util.{RedirectThread, Utils}

/**
* Launch an application through Spark submit in client mode with the appropriate classpath,
* library paths, java options and memory. These properties of the JVM must be set before the
* driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
* of parsing the properties file for such relevant configs in Bash.
*
* Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
*/
private[spark] object SparkSubmitDriverBootstrapper {

// Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
// Any changes made there must be reflected in this file.

def main(args: Array[String]): Unit = {

// This should be called only from `bin/spark-class`
if (!sys.env.contains("SPARK_CLASS")) {
System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!")
System.exit(1)
}

val submitArgs = args
val runner = sys.env("RUNNER")
val classpath = sys.env("CLASSPATH")
val javaOpts = sys.env("JAVA_OPTS")
val defaultDriverMemory = sys.env("OUR_JAVA_MEM")

// Spark submit specific environment variables
val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")

assume(runner != null, "RUNNER must be set")
assume(classpath != null, "CLASSPATH must be set")
assume(javaOpts != null, "JAVA_OPTS must be set")
assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")

// Parse the properties file for the equivalent spark.driver.* configs
val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
val confDriverMemory = properties.get("spark.driver.memory")
val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
val confClasspath = properties.get("spark.driver.extraClassPath")
val confJavaOpts = properties.get("spark.driver.extraJavaOptions")

// Favor Spark submit arguments over the equivalent configs in the properties file.
// Note that we do not actually use the Spark submit values for library path, classpath,
// and Java opts here, because we have already captured them in Bash.

val newDriverMemory = submitDriverMemory
.orElse(confDriverMemory)
.getOrElse(defaultDriverMemory)

val newLibraryPath =
if (submitLibraryPath.isDefined) {
// SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
""
} else {
confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
}

val newClasspath =
if (submitClasspath.isDefined) {
// SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
classpath
} else {
classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
}

val newJavaOpts =
if (submitJavaOpts.isDefined) {
// SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
javaOpts
} else {
javaOpts + confJavaOpts.map(" " + _).getOrElse("")
}

val filteredJavaOpts = Utils.splitCommandString(newJavaOpts)
.filterNot(_.startsWith("-Xms"))
.filterNot(_.startsWith("-Xmx"))

// Build up command
val command: Seq[String] =
Seq(runner) ++
Seq("-cp", newClasspath) ++
Seq(newLibraryPath) ++
filteredJavaOpts ++
Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
Seq("org.apache.spark.deploy.SparkSubmit") ++
submitArgs

// Print the launch command. This follows closely the format used in `bin/spark-class`.
if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
System.err.print("Spark Command: ")
System.err.println(command.mkString(" "))
System.err.println("========================================\n")
}

// Start the driver JVM
val filteredCommand = command.filter(_.nonEmpty)
val builder = new ProcessBuilder(filteredCommand)
val process = builder.start()

// Redirect stdin, stdout, and stderr to/from the child JVM
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
stdinThread.start()
stdoutThread.start()
stderrThread.start()

// Terminate on broken pipe, which signals that the parent process has exited. This is
// important for the PySpark shell, where Spark submit itself is a python subprocess.
stdinThread.join()
process.destroy()
}

}
Loading

0 comments on commit b3ec51b

Please sign in to comment.