Skip to content

Commit

Permalink
[LIVY-620] Spark batch session always ends with success when configur…
Browse files Browse the repository at this point in the history
…ation is master yarn and deploy-mode client

## What changes were proposed in this pull request?

A batch session should end in the dead state when the spark-submit process exits with a non-zero return code.
https://issues.apache.org/jira/browse/LIVY-620

## How was this patch tested?

1. Unit Test (included in this PR)
Submit a batch session that runs forever, wait 2 seconds, kill that batch session, and expect its state to be dead.

2. The change is also currently in use in a production environment.

Author: Gustavo Martin Morcuende <[email protected]>

Closes apache#192 from gumartinm/master.
  • Loading branch information
gumartinm authored and jerryshao committed Aug 15, 2019
1 parent e7f23e0 commit 01da43d
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.{Session, SessionState}
import org.apache.livy.sessions.{FinishedSessionState, Session, SessionState}
import org.apache.livy.sessions.Session._
import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}

Expand Down Expand Up @@ -101,6 +101,7 @@ object BatchSession extends Logging {
case 0 =>
case exitCode =>
warn(s"spark-submit exited with code $exitCode")
s.stateChanged(SparkApp.State.FAILED)
}
} finally {
childProcesses.decrementAndGet()
Expand Down Expand Up @@ -182,6 +183,14 @@ class BatchSession(
/**
 * Handles a Spark application state-change notification.
 *
 * The transition is logged at debug level. It is then forwarded to the
 * single-argument `stateChanged` overload, unless the session's current
 * `_state` is already a `FinishedSessionState`, in which case the
 * notification is dropped and `_state` is left untouched.
 *
 * @param oldState the state being left; only used in the log message here
 * @param newState the state being entered
 */
override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
  synchronized {
    debug(s"$this state changed from $oldState to $newState")
    _state match {
      // The session has already finished: ignore late notifications.
      case _: FinishedSessionState => ()
      case _ => stateChanged(newState)
    }
  }
}

private def stateChanged(newState: SparkApp.State): Unit = {
synchronized {
newState match {
case SparkApp.State.RUNNING =>
_state = SessionState.Running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.SessionState
import org.apache.livy.utils.{AppInfo, SparkApp}
import org.apache.livy.utils.{AppInfo, Clock, SparkApp}

class BatchSessionSpec
extends FunSpec
Expand All @@ -56,6 +56,23 @@ class BatchSessionSpec
script
}

// Temporary Python script that loops forever, sleeping one second per iteration.
// The backing file is registered for deletion when the JVM exits.
val runForeverScript: Path = {
  val scriptPath = Files.createTempFile("livy-test-run-forever-script", ".py")
  val scriptFile = scriptPath.toFile
  scriptFile.deleteOnExit()
  // The literal is kept flush-left so the bytes written to disk stay exactly the same.
  val pythonSource =
"""
|import time
|while True:
| time.sleep(1)
""".stripMargin
  val out = new FileWriter(scriptFile)
  // Always close the writer, even if the write fails.
  try out.write(pythonSource) finally out.close()
  scriptPath
}

describe("A Batch process") {
var sessionStore: SessionStore = null

Expand Down Expand Up @@ -102,6 +119,25 @@ class BatchSessionSpec
batch.appInfo shouldEqual expectedAppInfo
}

// Starts a batch that runs the never-ending script, stops the session shortly
// afterwards, and checks that the session ends up in SessionState.Dead.
it("should end with status dead when batch session exits with no 0 return code") {
  val request = new CreateBatchRequest()
  request.file = runForeverScript.toString
  request.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))

  val livyConf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir"))
  val access = new AccessManager(livyConf)
  val batch = BatchSession.create(0, None, request, livyConf, access, null, None, sessionStore)
  batch.start()
  // NOTE(review): the change description says "wait 2 seconds"; confirm the unit
  // Clock.sleep expects, since the argument here is the bare literal 2.
  Clock.sleep(2)
  batch.stopSession()

  // Give the session up to ten seconds to leave every active state.
  Utils.waitUntil(() => !batch.state.isActive, Duration(10, TimeUnit.SECONDS))
  val endedDead = batch.state match {
    case SessionState.Dead(_) => true
    case _ => false
  }
  endedDead should be (true)
}

def testRecoverSession(name: Option[String]): Unit = {
val conf = new LivyConf()
val req = new CreateBatchRequest()
Expand Down

0 comments on commit 01da43d

Please sign in to comment.