From 8b92397d55c55615c7110cc7e2602d0106f6eac7 Mon Sep 17 00:00:00 2001 From: runzhiwang Date: Tue, 27 Aug 2019 11:56:00 +0800 Subject: [PATCH] [LIVY-620][LIVY-641] Fix travis failed on test: should end with status dead when batch session exits with no 0 return code ## What changes were proposed in this pull request? Fix travis failed on test: should end with status dead when batch session exits with no 0 return code 1. Travis failed because of thread in SparkProcApp.scala and thead in BatchSession.scala change SessionState to different value when stopSession in test. 2. The changes of BatchSession.scala is to revert the commit of https://github.com/apache/incubator-livy/commit/01da43dba07aee1e4d13a2a19f233a38546ddec0. 3. The changes of SparkYarnApp.scala and BatchSessionSpec.scala are the new changes ## How was this patch tested? 1. Existed UT and IT. 2. Create Batch Session In Yarn and kill SparkSubmit, then check the SessionState. Author: runzhiwang Closes #214 from runzhiwang/LIVY-641-SESSION-STATUS. --- .../org/apache/livy/server/batch/BatchSession.scala | 9 --------- .../scala/org/apache/livy/utils/SparkYarnApp.scala | 10 ++++++++++ .../apache/livy/server/batch/BatchSessionSpec.scala | 4 ++-- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 2a55c0493..c94fc04a2 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -101,7 +101,6 @@ object BatchSession extends Logging { case 0 => case exitCode => warn(s"spark-submit exited with code $exitCode") - s.stateChanged(SparkApp.State.FAILED) } } finally { childProcesses.decrementAndGet() @@ -183,14 +182,6 @@ class BatchSession( override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = { synchronized { debug(s"$this state changed from $oldState to $newState") - if (!_state.isInstanceOf[FinishedSessionState]) { - stateChanged(newState) - } - } - } - - private def stateChanged(newState: SparkApp.State): Unit = { - synchronized { newState match { case SparkApp.State.RUNNING => _state = SessionState.Running diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala index d25579627..06d00a049 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala @@ -122,6 +122,7 @@ class SparkYarnApp private[utils] ( with Logging { import SparkYarnApp._ + private var killed = false private val appIdPromise: Promise[ApplicationId] = Promise() private[utils] var state: SparkApp.State = SparkApp.State.STARTING private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] @@ -132,6 +133,7 @@ class SparkYarnApp private[utils] ( ("\nYARN Diagnostics: " +: yarnDiagnostics) override def kill(): Unit = synchronized { + killed = true if (isRunning) { try { val timeout = SparkYarnApp.getYarnTagToAppIdTimeout(livyConf) @@ -265,6 +267,14 @@ class SparkYarnApp private[utils] ( appReport.getYarnApplicationState, appReport.getFinalApplicationStatus)) + if (process.isDefined && !process.get.isAlive && process.get.exitValue() != 0) { + if (killed) { + changeState(SparkApp.State.KILLED) + } else { + changeState(SparkApp.State.FAILED) + } + } + val latestAppInfo = { val attempt = yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId) diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 20b8a817f..bc9ddc4d3 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -119,7 +119,7 @@ class BatchSessionSpec batch.appInfo shouldEqual expectedAppInfo } - it("should end with status dead when batch session exits with no 0 return code") { + it("should end with status killed when batch session was stopped") { val req = new CreateBatchRequest() req.file = runForeverScript.toString req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) @@ -133,7 +133,7 @@ class BatchSessionSpec Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS)) (batch.state match { - case SessionState.Dead(_) => true + case SessionState.Killed(_) => true case _ => false }) should be (true) }