[SPARK-5479] [YARN] Handle --py-files correctly in YARN.
The bug description is a little misleading: the actual issue is that
.py files are not handled correctly when distributed by YARN. They're
added to "spark.submit.pyFiles", but when context.py processes that
setting it only accepts a whitelist of extensions (see
PACKAGE_EXTENSIONS), and .py is not one of them.

On top of that, archives were not handled at all! They made it to the
driver's python path, but never made it to executors, since the mechanism
used to propagate their location (spark.submit.pyFiles) only works on
the driver side.

So, instead, ignore "spark.submit.pyFiles" and just build PYTHONPATH
correctly for both driver and executors. Individual .py files are
placed in a subdirectory of the container's local dir in the cluster,
which is then added to the python path. Archives are added directly.
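
As a rough sketch of that path handling (a minimal illustration: the
subdirectory name "__pyfiles__" and the "{{PWD}}" placeholder, which YARN
expands to the container's working directory at launch, are assumptions
for this sketch rather than values quoted from the patch):

import java.io.File

object PythonPathSketch {
  // Hypothetical name for the subdirectory holding localized .py files.
  val LocalizedPythonDir = "__pyfiles__"

  /** Build a PYTHONPATH value for a YARN container from a --py-files list. */
  def buildPythonPath(pyFiles: Seq[String]): String = {
    val entries = scala.collection.mutable.LinkedHashSet[String]()
    // Plain .py files all land in one subdirectory of the container's
    // working dir, so a single entry covers them.
    if (pyFiles.exists(_.endsWith(".py"))) {
      entries += s"{{PWD}}/$LocalizedPythonDir"
    }
    // Archives (.zip, .egg) are importable as-is from the working dir.
    pyFiles.filterNot(_.endsWith(".py")).foreach { archive =>
      entries += s"{{PWD}}/${new File(archive).getName}"
    }
    entries.mkString(File.pathSeparator)
  }

  def main(args: Array[String]): Unit = {
    // Prints (on Unix): {{PWD}}/__pyfiles__:{{PWD}}/deps.zip
    println(buildPythonPath(Seq("/tmp/util.py", "/tmp/deps.zip")))
  }
}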

The change, as a side effect, ends up solving the symptom described
in the bug. The issue was not that the files were not being distributed,
but that they were never made visible to the python application
running under Spark.

Also included is a proper unit test for running python on YARN, which
broke in several different ways with the previous code.

A short walkthrough of the changes:
- SparkSubmit does not try to be smart about how YARN handles python
  files anymore. It just passes down the configs to the YARN client
  code.
- The YARN client distributes python files and archives differently,
  placing the files in a subdirectory.
- The YARN client now sets PYTHONPATH for the processes it launches;
  to properly handle different locations, it uses YARN's support for
  embedding env variables. To keep YARN from expanding those variables
  at the wrong time, SparkConf is now propagated to the AM using a conf
  file instead of command line options (see the sketch after this list).
- Because the Client initialization code is a maze of implicit
  dependencies, some code needed to be moved around to make sure
  all needed state was available when the code ran.
- The pyspark tests in YarnClusterSuite now actually distribute and try
  to use both a python file and an archive containing a different python
  module. Also added a yarn-client test for completeness.
- I cleaned up some of the code around distributing files to YARN, to
  avoid adding more copied & pasted code to handle the new files being
  distributed.
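
To make the conf-file mechanism from the walkthrough concrete, here is a
minimal sketch of the write side, assuming a plain java.util.Properties
file and an invented file name; the read side, Utils.getPropertiesFromFile,
appears in the ApplicationMaster diff below:

import java.io.{File, FileOutputStream}
import java.util.Properties

object ConfFilePropagationSketch {
  /** Write the SparkConf entries to a properties file in the staging dir. */
  def writeConfFile(sparkConf: Map[String, String], stagingDir: File): File = {
    val props = new Properties()
    sparkConf.foreach { case (k, v) => props.setProperty(k, v) }
    // File name is illustrative; the point is that config values travel in
    // a file, so YARN never sees them on the AM command line and cannot
    // expand any {{...}} markers they contain at the wrong time.
    val confFile = new File(stagingDir, "__spark_conf__.properties")
    val out = new FileOutputStream(confFile)
    try props.store(out, "Spark configuration") finally out.close()
    confFile
  }

  def main(args: Array[String]): Unit = {
    val dir = new File(sys.props("java.io.tmpdir"))
    val f = writeConfFile(Map("spark.app.name" -> "pyspark-on-yarn"), dir)
    // The AM would then be launched with something like:
    //   --properties-file <container dir>/__spark_conf__.properties
    println(s"wrote ${f.getAbsolutePath}")
  }
}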

Author: Marcelo Vanzin <[email protected]>

Closes apache#6360 from vanzin/SPARK-5479 and squashes the following commits:

bcaf7e6 [Marcelo Vanzin] Feedback.
c47501f [Marcelo Vanzin] Fix yarn-client mode.
46b1d0c [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
c743778 [Marcelo Vanzin] Only pyspark cares about python archives.
c8e5a82 [Marcelo Vanzin] Actually run pyspark in client mode.
705571d [Marcelo Vanzin] Move some code to the YARN module.
1dd4d0c [Marcelo Vanzin] Review feedback.
71ee736 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
220358b [Marcelo Vanzin] Scalastyle.
cdbb990 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
7fe3cd4 [Marcelo Vanzin] No need to distribute primary file to executors.
09045f1 [Marcelo Vanzin] Style.
943cbf4 [Marcelo Vanzin] [SPARK-5479] [yarn] Handle --py-files correctly in YARN.
Marcelo Vanzin authored and Andrew Or committed Jun 10, 2015
1 parent 8f7308f commit 3811290
Showing 8 changed files with 270 additions and 208 deletions.
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (17 additions, 60 deletions)
@@ -324,55 +324,20 @@ object SparkSubmit {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        if (clusterManager != YARN) {
+          // The YARN backend distributes the primary file differently, so don't merge it.
+          args.files = mergeFileLists(args.files, args.primaryResource)
+        }
       }
+      if (clusterManager != YARN) {
+        // The YARN backend handles python files differently, so don't merge the lists.
+        args.files = mergeFileLists(args.files, args.pyFiles)
+      }
-      args.files = mergeFileLists(args.files, args.pyFiles)
       if (args.pyFiles != null) {
        sysProps("spark.submit.pyFiles") = args.pyFiles
       }
     }

-    // In yarn mode for a python app, add pyspark archives to files
-    // that can be distributed with the job
-    if (args.isPython && clusterManager == YARN) {
-      var pyArchives: String = null
-      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
-      if (pyArchivesEnvOpt.isDefined) {
-        pyArchives = pyArchivesEnvOpt.get
-      } else {
-        if (!sys.env.contains("SPARK_HOME")) {
-          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
-        }
-        val pythonPath = new ArrayBuffer[String]
-        for (sparkHome <- sys.env.get("SPARK_HOME")) {
-          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
-          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-          if (!pyArchivesFile.exists()) {
-            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
-          }
-          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
-          if (!py4jFile.exists()) {
-            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
-              "in yarn mode.")
-          }
-          pythonPath += pyArchivesFile.getAbsolutePath()
-          pythonPath += py4jFile.getAbsolutePath()
-        }
-        pyArchives = pythonPath.mkString(",")
-      }
-
-      pyArchives = pyArchives.split(",").map { localPath =>
-        val localURI = Utils.resolveURI(localPath)
-        if (localURI.getScheme != "local") {
-          args.files = mergeFileLists(args.files, localURI.toString)
-          new Path(localPath).getName
-        } else {
-          localURI.getPath
-        }
-      }.mkString(File.pathSeparator)
-      sysProps("spark.submit.pyArchives") = pyArchives
-    }

     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
@@ -386,19 +351,10 @@
       }
     }
 
-    if (isYarnCluster) {
-      // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
-      // that can be distributed with the job
-      if (args.isPython) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-        args.files = mergeFileLists(args.files, args.pyFiles)
-      }
-
+    if (isYarnCluster && args.isR) {
       // In yarn-cluster mode for a R app, add primary resource to files
       // that can be distributed with the job
-      if (args.isR) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-      }
+      args.files = mergeFileLists(args.files, args.primaryResource)
     }
 
     // Special flag to avoid deprecation warnings at the client
@@ -515,17 +471,18 @@
       }
     }
 
+    // Let YARN know it's a pyspark app, so it distributes needed libraries.
+    if (clusterManager == YARN && args.isPython) {
+      sysProps.put("spark.yarn.isPython", "true")
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.isPython) {
-        val mainPyFile = new Path(args.primaryResource).getName
-        childArgs += ("--primary-py-file", mainPyFile)
+        childArgs += ("--primary-py-file", args.primaryResource)
         if (args.pyFiles != null) {
-          // These files will be distributed to each machine's working directory, so strip the
-          // path prefix
-          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
-          childArgs += ("--py-files", pyFilesNames)
+          childArgs += ("--py-files", args.pyFiles)
         }
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
       } else if (args.isR) {
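
To summarize the yarn-cluster branch above as runnable code: a minimal
sketch with illustrative paths, showing that SparkSubmit now hands full
paths to the YARN client instead of pre-stripping them:

import scala.collection.mutable.ArrayBuffer

object ChildArgsSketch {
  def main(args: Array[String]): Unit = {
    val primaryResource = "/home/user/app.py"               // illustrative path
    val pyFiles = "/home/user/util.py,/home/user/deps.zip"  // illustrative paths

    // Mirrors the branch above: full paths go to the YARN client, which
    // now decides how to localize them instead of SparkSubmit.
    val childArgs = new ArrayBuffer[String]()
    childArgs += ("--primary-py-file", primaryResource)
    if (pyFiles != null) {
      childArgs += ("--py-files", pyFiles)
    }
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
    println(childArgs.mkString(" "))
  }
}
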
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.rpc._
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -46,6 +46,14 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
+  // Load the properties file with the Spark configuration and set entries as system properties,
+  // so that user code run inside the AM also has access to them.
+  if (args.propertiesFile != null) {
+    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
+      sys.props(k) = v
+    }
+  }
+
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.

@@ -490,9 +498,11 @@ private[spark] class ApplicationMaster(
       new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
     }
 
+    var userArgs = args.userArgs
     if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
-      System.setProperty("spark.submit.pyFiles",
-        PythonRunner.formatPaths(args.pyFiles).mkString(","))
+      // When running pyspark, the app is run using PythonRunner. The second argument is the list
+      // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
+      userArgs = Seq(args.primaryPyFile, "") ++ userArgs
     }
     if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
       // TODO(davies): add R dependencies here
@@ -503,9 +513,7 @@
     val userThread = new Thread {
       override def run() {
         try {
-          val mainArgs = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
+          mainMethod.invoke(null, userArgs.toArray)
           finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
           logDebug("Done running users class")
         } catch {
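
Spelled out, the pyspark argument handling above amounts to the following
sketch (illustrative values; PythonRunner's expected argument layout comes
from the usage comment in the SparkSubmit hunk above):

object AmPythonArgsSketch {
  def main(args: Array[String]): Unit = {
    // PythonRunner expects: <main python file> <extra python files> [app args].
    // The second slot is now the empty string because the Client already
    // placed everything on PYTHONPATH through the container environment.
    val primaryPyFile = "app.py"               // illustrative
    val appArgs = Seq("--input", "data.txt")   // illustrative user arguments
    val userArgs = Seq(primaryPyFile, "") ++ appArgs
    // The AM then does, in effect: mainMethod.invoke(null, userArgs.toArray)
    println(userArgs.mkString(" "))
  }
}
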
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,11 +26,11 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userClass: String = null
   var primaryPyFile: String = null
   var primaryRFile: String = null
-  var pyFiles: String = null
-  var userArgs: Seq[String] = Seq[String]()
+  var userArgs: Seq[String] = Nil
   var executorMemory = 1024
   var executorCores = 1
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
+  var propertiesFile: String = null
 
   parseArgs(args.toList)

@@ -59,10 +59,6 @@
         primaryRFile = value
         args = tail
 
-      case ("--py-files") :: value :: tail =>
-        pyFiles = value
-        args = tail
-
       case ("--args" | "--arg") :: value :: tail =>
         userArgsBuffer += value
         args = tail
@@ -79,6 +75,10 @@
         executorCores = value
         args = tail
 
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        args = tail
+
       case _ =>
         printUsageAndExit(1, args)
     }
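
And a quick illustration of the new flag being parsed (a hypothetical
command line that assumes the spark-yarn classes on the classpath; the
conf-file path is an invented value, not something this hunk specifies):

import org.apache.spark.deploy.yarn.ApplicationMasterArguments

object AmArgsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical AM command line; --properties-file is the new flag.
    val amArgs = new ApplicationMasterArguments(Array(
      "--class", "org.apache.spark.deploy.PythonRunner",
      "--primary-py-file", "app.py",
      "--arg", "--input", "--arg", "data.txt",
      "--properties-file", "__spark_conf__/__spark_conf__.properties"))

    assert(amArgs.propertiesFile == "__spark_conf__/__spark_conf__.properties")
    assert(amArgs.userArgs == Seq("--input", "data.txt"))
  }
}
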
(The diffs for the remaining five changed files are not shown.)
