Skip to content

Commit

Permalink
[LIVY-620][LIVY-641] Fix travis failed on test: should end with statu…
Browse files Browse the repository at this point in the history
…s dead when batch session exits with no 0 return code

## What changes were proposed in this pull request?

Fix the Travis failure in the test "should end with status dead when batch session exits with no 0 return code".

1. Travis failed because a thread in SparkProcApp.scala and a thread in BatchSession.scala set SessionState to different values when the session is stopped in the test.

2. The changes to BatchSession.scala revert commit apache@01da43d.

3. The changes to SparkYarnApp.scala and BatchSessionSpec.scala are new.

## How was this patch tested?

1. Existing unit tests (UT) and integration tests (IT).
2. Created a batch session on YARN, killed the SparkSubmit process, then checked the SessionState.

Author: runzhiwang <[email protected]>

Closes apache#214 from runzhiwang/LIVY-641-SESSION-STATUS.
  • Loading branch information
runzhiwang authored and jerryshao committed Aug 27, 2019
1 parent 256702e commit 8b92397
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ object BatchSession extends Logging {
case 0 =>
case exitCode =>
warn(s"spark-submit exited with code $exitCode")
s.stateChanged(SparkApp.State.FAILED)
}
} finally {
childProcesses.decrementAndGet()
Expand Down Expand Up @@ -183,14 +182,6 @@ class BatchSession(
override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {
synchronized {
debug(s"$this state changed from $oldState to $newState")
if (!_state.isInstanceOf[FinishedSessionState]) {
stateChanged(newState)
}
}
}

private def stateChanged(newState: SparkApp.State): Unit = {
synchronized {
newState match {
case SparkApp.State.RUNNING =>
_state = SessionState.Running
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class SparkYarnApp private[utils] (
with Logging {
import SparkYarnApp._

private var killed = false
private val appIdPromise: Promise[ApplicationId] = Promise()
private[utils] var state: SparkApp.State = SparkApp.State.STARTING
private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
Expand All @@ -132,6 +133,7 @@ class SparkYarnApp private[utils] (
("\nYARN Diagnostics: " +: yarnDiagnostics)

override def kill(): Unit = synchronized {
killed = true
if (isRunning) {
try {
val timeout = SparkYarnApp.getYarnTagToAppIdTimeout(livyConf)
Expand Down Expand Up @@ -265,6 +267,14 @@ class SparkYarnApp private[utils] (
appReport.getYarnApplicationState,
appReport.getFinalApplicationStatus))

if (process.isDefined && !process.get.isAlive && process.get.exitValue() != 0) {
if (killed) {
changeState(SparkApp.State.KILLED)
} else {
changeState(SparkApp.State.FAILED)
}
}

val latestAppInfo = {
val attempt =
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class BatchSessionSpec
batch.appInfo shouldEqual expectedAppInfo
}

it("should end with status dead when batch session exits with no 0 return code") {
it("should end with status killed when batch session was stopped") {
val req = new CreateBatchRequest()
req.file = runForeverScript.toString
req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))
Expand All @@ -133,7 +133,7 @@ class BatchSessionSpec

Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS))
(batch.state match {
case SessionState.Dead(_) => true
case SessionState.Killed(_) => true
case _ => false
}) should be (true)
}
Expand Down

0 comments on commit 8b92397

Please sign in to comment.