[LIVY-411][REPL] Fix session cannot start issue when python or r package is missing

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/LIVY-411

In Livy 0.5.0 we added support for multiple languages in one session, but it requires all the language packages (such as the R and Python packages) to be available before the session is created; otherwise session creation fails. However, in some cases the Python or R package may be missing from the Spark distribution, which makes Livy fail to create an interactive session.

To fix this issue, we should not enforce this restriction at session creation, but delay the check until the related interpreter is actually used. If a package is missing, only the related execution should fail, returning the cause to the user, without affecting the other interpreters that started correctly.
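
The core of the change is to create the non-Spark interpreters lazily and to make the interpreter lookup return an `Option`, so a startup failure is reported per statement instead of killing the session. A minimal sketch of that pattern (hypothetical `LazyInterpreters` class and `create` parameter for illustration, not the actual Livy `Session` code):

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

class LazyInterpreters {
  private val started = mutable.Map[String, AnyRef]()

  // Create the interpreter for `kind` on first use; return None instead of
  // failing the whole session if it cannot start (e.g. pyspark.zip is missing).
  def interpreter(kind: String, create: () => AnyRef): Option[AnyRef] = synchronized {
    started.get(kind).orElse {
      try {
        val interp = create()
        started(kind) = interp
        Some(interp)
      } catch {
        case NonFatal(e) =>
          // Only this language becomes unavailable; other interpreters keep working.
          Console.err.println(s"Fail to start interpreter $kind: ${e.getMessage}")
          None
      }
    }
  }
}
```

A statement that needs a given interpreter then maps over the returned `Option` and turns `None` into an error result for that statement only, which is what the `Session.executeCode` change below does.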

## How was this patch tested?

Existing tests and manual verification on a local cluster.

Author: jerryshao <[email protected]>

Closes apache#56 from jerryshao/LIVY-411.
jerryshao committed Oct 20, 2017
1 parent 19004b5 commit 1bbefe6
Showing 7 changed files with 117 additions and 84 deletions.
@@ -279,7 +279,7 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logg
   }
 
   private def waitFor[T](future: JFuture[T]): T = {
-    future.get(30, TimeUnit.SECONDS)
+    future.get(60, TimeUnit.SECONDS)
   }
 
   private def sessionList(): SessionList = {
@@ -91,7 +91,7 @@ class Spark2JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll wit
   }
 
   private def waitFor[T](future: JFuture[T]): T = {
-    future.get(30, TimeUnit.SECONDS)
+    future.get(60, TimeUnit.SECONDS)
   }
 
   private def sessionList(): SessionList = {
@@ -79,15 +79,15 @@ object PythonInterpreter extends Logging {
       val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
       val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
       require(pyArchivesFile.exists(),
-        "pyspark.zip not found; cannot run pyspark application in YARN mode.")
+        "pyspark.zip not found; cannot start pyspark interpreter.")
 
       val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip")
         .iterator()
         .next()
         .toFile
 
       require(py4jFile.exists(),
-        "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.")
+        "py4j-*-src.zip not found; cannot start pyspark interpreter.")
       Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath)
     }.getOrElse(Seq())
   }
9 changes: 5 additions & 4 deletions repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
@@ -93,24 +93,25 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)
 
   override protected def createWrapper(msg: BaseProtocol.BypassJobRequest): BypassJobWrapper = {
     Kind(msg.jobType) match {
-      case PySpark() =>
+      case PySpark() if session.interpreter(PySpark()).isDefined =>
         new BypassJobWrapper(this, msg.id,
           new BypassPySparkJob(msg.serializedJob,
-            session.interpreter(PySpark()).asInstanceOf[PythonInterpreter]))
+            session.interpreter(PySpark()).get.asInstanceOf[PythonInterpreter]))
       case _ => super.createWrapper(msg)
     }
   }
 
   override protected def addFile(path: String): Unit = {
     if (!ClientConf.TEST_MODE) {
-      session.interpreter(PySpark()).asInstanceOf[PythonInterpreter].addFile(path)
+      session.interpreter(PySpark()).foreach { _.asInstanceOf[PythonInterpreter].addFile(path) }
     }
     super.addFile(path)
   }
 
   override protected def addJarOrPyFile(path: String): Unit = {
     if (!ClientConf.TEST_MODE) {
-      session.interpreter(PySpark()).asInstanceOf[PythonInterpreter].addPyFile(this, conf, path)
+      session.interpreter(PySpark())
+        .foreach { _.asInstanceOf[PythonInterpreter].addPyFile(this, conf, path) }
     }
     super.addJarOrPyFile(path)
   }
132 changes: 74 additions & 58 deletions repl/src/main/scala/org/apache/livy/repl/Session.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
+import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.json4s.jackson.JsonMethods.{compact, render}
@@ -90,24 +91,29 @@ class Session(
     entries.sc().sc
   }
 
-  private[repl] def interpreter(kind: Kind): Interpreter = interpGroup.synchronized {
+  private[repl] def interpreter(kind: Kind): Option[Interpreter] = interpGroup.synchronized {
     if (interpGroup.contains(kind)) {
-      interpGroup(kind)
+      Some(interpGroup(kind))
     } else {
-      require(entries != null,
-        "SparkEntries should not be null when lazily initialize other interpreters.")
-
-      val interp = kind match {
-        case Spark() =>
-          // This should never be touched here.
-          throw new IllegalStateException("SparkInterpreter should not be lazily created.")
-        case PySpark() => PythonInterpreter(sparkConf, entries)
-        case SparkR() => SparkRInterpreter(sparkConf, entries)
-      }
-      interp.start()
-      interpGroup(kind) = interp
-
-      interp
+      try {
+        require(entries != null,
+          "SparkEntries should not be null when lazily initialize other interpreters.")
+
+        val interp = kind match {
+          case Spark() =>
+            // This should never be touched here.
+            throw new IllegalStateException("SparkInterpreter should not be lazily created.")
+          case PySpark() => PythonInterpreter(sparkConf, entries)
+          case SparkR() => SparkRInterpreter(sparkConf, entries)
+        }
+        interp.start()
+        interpGroup(kind) = interp
+        Some(interp)
+      } catch {
+        case NonFatal(e) =>
+          warn(s"Fail to start interpreter $kind", e)
+          None
+      }
     }
   }

@@ -171,8 +177,7 @@ class Session(
 
   def complete(code: String, codeType: String, cursor: Int): Array[String] = {
     val tpe = Kind(codeType)
-    val interp = interpreter(tpe)
-    interp.complete(code, cursor)
+    interpreter(tpe).map { _.complete(code, cursor) }.getOrElse(Array.empty)
   }
 
   def cancel(statementId: Int): Unit = {
@@ -251,7 +256,9 @@ class Session(
     stateChangedCallback(newState)
   }
 
-  private def executeCode(interp: Interpreter, executionCount: Int, code: String): String = {
+  private def executeCode(interp: Option[Interpreter],
+      executionCount: Int,
+      code: String): String = {
     changeState(SessionState.Busy())
 
     def transitToIdle() = {
@@ -261,52 +268,61 @@ class Session(
       }
     }
 
-    val resultInJson = try {
-      interp.execute(code) match {
-        case Interpreter.ExecuteSuccess(data) =>
-          transitToIdle()
-
-          (STATUS -> OK) ~
-            (EXECUTION_COUNT -> executionCount) ~
-            (DATA -> data)
-
-        case Interpreter.ExecuteIncomplete() =>
-          transitToIdle()
-
-          (STATUS -> ERROR) ~
-            (EXECUTION_COUNT -> executionCount) ~
-            (ENAME -> "Error") ~
-            (EVALUE -> "incomplete statement") ~
-            (TRACEBACK -> Seq.empty[String])
-
-        case Interpreter.ExecuteError(ename, evalue, traceback) =>
-          transitToIdle()
-
-          (STATUS -> ERROR) ~
-            (EXECUTION_COUNT -> executionCount) ~
-            (ENAME -> ename) ~
-            (EVALUE -> evalue) ~
-            (TRACEBACK -> traceback)
-
-        case Interpreter.ExecuteAborted(message) =>
-          changeState(SessionState.Error())
-
-          (STATUS -> ERROR) ~
-            (EXECUTION_COUNT -> executionCount) ~
-            (ENAME -> "Error") ~
-            (EVALUE -> f"Interpreter died:\n$message") ~
-            (TRACEBACK -> Seq.empty[String])
-      }
-    } catch {
-      case e: Throwable =>
-        error("Exception when executing code", e)
-
-        transitToIdle()
-
-        (STATUS -> ERROR) ~
-          (EXECUTION_COUNT -> executionCount) ~
-          (ENAME -> f"Internal Error: ${e.getClass.getName}") ~
-          (EVALUE -> e.getMessage) ~
-          (TRACEBACK -> Seq.empty[String])
+    val resultInJson = interp.map { i =>
+      try {
+        i.execute(code) match {
+          case Interpreter.ExecuteSuccess(data) =>
+            transitToIdle()
+
+            (STATUS -> OK) ~
+              (EXECUTION_COUNT -> executionCount) ~
+              (DATA -> data)
+
+          case Interpreter.ExecuteIncomplete() =>
+            transitToIdle()
+
+            (STATUS -> ERROR) ~
+              (EXECUTION_COUNT -> executionCount) ~
+              (ENAME -> "Error") ~
+              (EVALUE -> "incomplete statement") ~
+              (TRACEBACK -> Seq.empty[String])
+
+          case Interpreter.ExecuteError(ename, evalue, traceback) =>
+            transitToIdle()
+
+            (STATUS -> ERROR) ~
+              (EXECUTION_COUNT -> executionCount) ~
+              (ENAME -> ename) ~
+              (EVALUE -> evalue) ~
+              (TRACEBACK -> traceback)
+
+          case Interpreter.ExecuteAborted(message) =>
+            changeState(SessionState.Error())
+
+            (STATUS -> ERROR) ~
+              (EXECUTION_COUNT -> executionCount) ~
+              (ENAME -> "Error") ~
+              (EVALUE -> f"Interpreter died:\n$message") ~
+              (TRACEBACK -> Seq.empty[String])
+        }
+      } catch {
+        case e: Throwable =>
+          error("Exception when executing code", e)
+
+          transitToIdle()
+
+          (STATUS -> ERROR) ~
+            (EXECUTION_COUNT -> executionCount) ~
+            (ENAME -> f"Internal Error: ${e.getClass.getName}") ~
+            (EVALUE -> e.getMessage) ~
+            (TRACEBACK -> Seq.empty[String])
+      }
+    }.getOrElse {
+      transitToIdle()
+      (STATUS -> ERROR) ~
+        (EXECUTION_COUNT -> executionCount) ~
+        (ENAME -> "InterpreterError") ~
+        (EVALUE -> "Fail to start interpreter") ~
+        (TRACEBACK -> Seq.empty[String])
     }
 
@@ -50,8 +50,11 @@ class ScalaClientTest extends FunSuite
 
   after {
     if (client != null) {
-      client.stop(true)
-      client = null
+      try {
+        client.stop(true)
+      } finally {
+        client = null
+      }
     }
   }
 
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.util.Random
+import scala.util.{Random, Try}
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
 import com.google.common.annotations.VisibleForTesting
@@ -179,11 +179,15 @@ object InteractiveSession extends Logging {
 
   def findSparkRArchive(): Option[String] = {
     Option(livyConf.get(RSCConf.Entry.SPARKR_PACKAGE.key())).orElse {
-      sys.env.get("SPARK_HOME").map { case sparkHome =>
+      sys.env.get("SPARK_HOME").flatMap { case sparkHome =>
         val path = Seq(sparkHome, "R", "lib", "sparkr.zip").mkString(File.separator)
         val rArchivesFile = new File(path)
-        require(rArchivesFile.exists(), "sparkr.zip not found; cannot run sparkr application.")
-        rArchivesFile.getAbsolutePath()
+        if (rArchivesFile.exists()) {
+          Some(rArchivesFile.getAbsolutePath)
+        } else {
+          warn("sparkr.zip not found; cannot start R interpreter.")
+          None
+        }
       }
     }
   }
@@ -252,17 +256,22 @@ object InteractiveSession extends Logging {
       sys.env.get("SPARK_HOME") .map { case sparkHome =>
         val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
         val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-        require(pyArchivesFile.exists(),
-          "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-
-        val py4jFile = Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip")
-          .iterator()
-          .next()
-          .toFile
-
-        require(py4jFile.exists(),
-          "py4j-*-src.zip not found; cannot run pyspark application in YARN mode.")
-        Seq(pyArchivesFile.getAbsolutePath, py4jFile.getAbsolutePath)
+        val py4jFile = Try {
+          Files.newDirectoryStream(Paths.get(pyLibPath), "py4j-*-src.zip")
+            .iterator()
+            .next()
+            .toFile
+        }.toOption
+
+        if (!pyArchivesFile.exists()) {
+          warn("pyspark.zip not found; cannot start pyspark interpreter.")
+          Seq.empty
+        } else if (py4jFile.isEmpty || !py4jFile.get.exists()) {
+          warn("py4j-*-src.zip not found; cannot start pyspark interpreter.")
+          Seq.empty
+        } else {
+          Seq(pyArchivesFile.getAbsolutePath, py4jFile.get.getAbsolutePath)
+        }
       }.getOrElse(Seq())
     }
   }
@@ -297,11 +306,15 @@
     }
 
     val pySparkFiles = if (!LivyConf.TEST_MODE) {
-      builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
       findPySparkArchives()
     } else {
       Nil
     }
+
+    if (pySparkFiles.nonEmpty) {
+      builderProperties.put(SPARK_YARN_IS_PYTHON, "true")
+    }
+
     mergeConfList(pySparkFiles, LivyConf.SPARK_PY_FILES)
 
     val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
