[HOTFIX] Wait for EOF only for the PySpark shell
In `SparkSubmitDriverBootstrapper`, we wait for the parent process to send us an `EOF` before finishing the application. This is appropriate for the PySpark shell, because that is how we terminate the application there. However, if we run a python application instead, the JVM never actually exits unless it receives a manual EOF from the user. This is causing a few tests to time out.

We only need to do this for the PySpark shell, because that is the only case in which Spark submit runs as a python subprocess. Thus, the normal Spark shell does not need to go through this path even though it is also a REPL.
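To make the intended flow concrete, here is a minimal sketch of the logic this hotfix arrives at. It is not the actual bootstrapper code: `process` stands in for the child JVM and `stdinThread` for the stdin-redirect thread that the real code creates elsewhere. Only in PySpark-shell mode is EOF on stdin treated as a termination signal, and in every mode we still wait for the child to exit.

```scala
object BootstrapperSketch {
  // Hypothetical sketch of the fixed logic: `process` is the child JVM and
  // `stdinThread` copies our stdin into it (both assumed to be created elsewhere).
  def awaitChild(process: Process, stdinThread: Thread): Unit = {
    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
    if (isPySparkShell) {
      // bin/pyspark launched us as a python subprocess: EOF on our stdin means the
      // parent has exited, so take the child JVM down with us.
      stdinThread.join()
      process.destroy()
    }
    // In all cases, wait for the child JVM to terminate before this process exits.
    process.waitFor()
  }
}
```

Gating on an environment variable keeps the decision out of argument parsing: bin/pyspark simply exports PYSPARK_SHELL=1 (see the first hunk below) and the JVM picks it up.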

Thanks davies for reporting this.

Author: Andrew Or <[email protected]>

Closes apache#2170 from andrewor14/bootstrap-hotfix and squashes the following commits:

42963f5 [Andrew Or] Do not wait for EOF unless this is the pyspark shell
andrewor14 authored and pwendell committed Aug 28, 2014
1 parent f38fab9 commit dafe343
Showing 2 changed files with 17 additions and 11 deletions.
2 changes: 2 additions & 0 deletions bin/pyspark
@@ -102,6 +102,8 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec $FWDIR/bin/spark-submit "${SUBMISSION_OPTS[@]}" $primary "${APPLICATION_OPTS[@]}"
 else
+  # PySpark shell requires special handling downstream
+  export PYSPARK_SHELL=1
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
     exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS
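Because environment variables are inherited by child processes, the flag exported here is visible to the spark-submit JVM that bin/pyspark ultimately execs, so the Scala side only needs a plain environment lookup, as in this minimal snippet (the enclosing object is hypothetical; the variable name mirrors the diff below):

```scala
// Hypothetical standalone check: is the PYSPARK_SHELL flag set in our environment?
// `scala.sys.env` exposes the process environment as an immutable Map[String, String].
object ShellMode {
  val isPySparkShell: Boolean = sys.env.contains("PYSPARK_SHELL")
}
```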
15 changes: 15 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -132,25 +132,29 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
 
-    // Redirect stdin, stdout, and stderr to/from the child JVM
+    // Redirect stdout and stderr from the child JVM
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
     stdoutThread.start()
     stderrThread.start()
 
-    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that contends with the subprocess in reading from System.in.
-    if (Utils.isWindows) {
-      // For the PySpark shell, the termination of this process is handled in java_gateway.py
-      process.waitFor()
-    } else {
-      // Terminate on broken pipe, which signals that the parent process has exited. This is
-      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    // Redirect stdin to child JVM only if we're not running Windows. This is because the
+    // subprocess there already reads directly from our stdin, so we should avoid spawning a
+    // thread that contends with the subprocess in reading from System.in.
+    val isWindows = Utils.isWindows
+    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      stdinThread.join()
-      process.destroy()
+      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
+      // should terminate on broken pipe, which signals that the parent process has exited. In
+      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
+      if (isPySparkShell) {
+        stdinThread.join()
+        process.destroy()
+      }
     }
+    process.waitFor()
   }
 
 }
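For readers unfamiliar with the `RedirectThread` helper used above, its definition lives elsewhere in Spark's utilities and is not part of this diff. A simplified stand-in, under the assumption that it behaves like an ordinary stream-copying thread, looks roughly like this: it copies one stream into another until it hits EOF, which is exactly what lets `stdinThread.join()` double as a parent-exit signal. This is a sketch, not Spark's actual implementation.

```scala
import java.io.{IOException, InputStream, OutputStream}

// Simplified stand-in for a stream-redirect thread (not Spark's RedirectThread):
// copies `in` to `out` until EOF, so joining on it waits for the source to close.
class SimpleRedirectThread(in: InputStream, out: OutputStream, name: String)
  extends Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    try {
      val buf = new Array[Byte](1024)
      var n = in.read(buf)
      while (n != -1) {        // -1 signals EOF: the writing end has closed
        out.write(buf, 0, n)
        out.flush()
        n = in.read(buf)
      }
    } catch {
      case _: IOException => // broken pipe: the other end went away, just stop
    }
  }
}
```

Used the same way as in the diff, e.g. `new SimpleRedirectThread(System.in, process.getOutputStream, "redirect stdin")`: when the parent python process exits, `System.in` reaches EOF, `run()` returns, and the joined thread lets the bootstrapper destroy the child JVM.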
