Skip to content

Commit

Permalink
[LIVY-586] Fix batch state from starting to dead when startup fail
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Fix batch state from starting to dead when startup fail

## How was this patch tested?

Create batch session with error parameter, the batch state will turn from starting to dead.

Author: runzhiwang <[email protected]>

Closes apache#215 from runzhiwang/LIVY-586.
  • Loading branch information
runzhiwang authored and jerryshao committed Aug 30, 2019
1 parent a90f4fa commit 4521ef9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
10 changes: 9 additions & 1 deletion server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ class SparkYarnApp private[utils] (
}
}

private def isProcessErrExit(): Boolean = {
process.isDefined && !process.get.isAlive && process.get.exitValue() != 0
}

private def changeState(newState: SparkApp.State.Value): Unit = {
if (state != newState) {
listener.foreach(_.stateChanged(state, newState))
Expand All @@ -171,6 +175,10 @@ class SparkYarnApp private[utils] (
appTag: String,
pollInterval: Duration,
deadline: Deadline): ApplicationId = {
if (isProcessErrExit()) {
throw new IllegalStateException("spark-submit start failed")
}

val appTagLowerCase = appTag.toLowerCase()

// FIXME Should not loop thru all YARN applications but YarnClient doesn't offer an API.
Expand Down Expand Up @@ -268,7 +276,7 @@ class SparkYarnApp private[utils] (
appReport.getYarnApplicationState,
appReport.getFinalApplicationStatus))

if (process.isDefined && !process.get.isAlive && process.get.exitValue() != 0) {
if (isProcessErrExit()) {
if (killed) {
changeState(SparkApp.State.KILLED)
} else {
Expand Down
24 changes: 24 additions & 0 deletions server/src/test/scala/org/apache/livy/utils/SparkYarnAppSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,30 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
}
}

it("should end with state failed when spark submit start failed") {
Clock.withSleepMethod(mockSleep) {
val livyConf = new LivyConf()
val mockSparkSubmit = mock[LineBufferedProcess]
when(mockSparkSubmit.isAlive).thenReturn(false)
when(mockSparkSubmit.exitValue).thenReturn(-1)

val app = new SparkYarnApp(
appTag,
None,
Some(mockSparkSubmit),
None,
livyConf)

cleanupThread(app.yarnAppMonitorThread) {
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
assert(!app.yarnAppMonitorThread.isAlive,
"YarnAppMonitorThread should terminate after YARN app is finished.")
assert(app.state == SparkApp.State.FAILED,
"SparkYarnApp should end with state failed when spark submit start failed")
}
}
}

it("should map YARN state to SparkApp.State correctly") {
val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf)
cleanupThread(app.yarnAppMonitorThread) {
Expand Down

0 comments on commit 4521ef9

Please sign in to comment.