Skip to content

Commit

Permalink
[LIVY-415][CORE][FOLLOWUP] Fix static object creation issue for Sessi…
Browse files Browse the repository at this point in the history
…onState

## What changes were proposed in this pull request?

In LIVY-415, we changed `SessionState` to static object instead of case class. This is OK for `SessionState`, but for `FinishedSessionState`, it requires `time` field to be set during object creation. Using static object will never reflect the actual `time` when object is created. So here propose to fix it.

CC aa8y please take a review, this is introduced in your changes.

## How was this patch tested?

Existing tests.

Author: jerryshao <[email protected]>

Closes apache#67 from jerryshao/LIVY-415-followup.
  • Loading branch information
jerryshao committed Nov 29, 2017
1 parent cd777e2 commit 1efc801
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 23 deletions.
15 changes: 6 additions & 9 deletions core/src/main/scala/org/apache/livy/sessions/SessionState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ object SessionState {
case "running" => Running
case "busy" => Busy
case "shutting_down" => ShuttingDown
case "error" => Error
case "dead" => Dead
case "success" => Success
case "error" => Error()
case "dead" => Dead()
case "success" => Success()
case _ => throw new IllegalArgumentException(s"Illegal session state: $s")
}

Expand All @@ -57,15 +57,12 @@ object SessionState {

object ShuttingDown extends SessionState("shutting_down", false)

case class Error(override val time: Long) extends
case class Error(override val time: Long = System.nanoTime()) extends
FinishedSessionState("error", true, time)
object Error extends Error(System.nanoTime)

case class Dead(override val time: Long) extends
case class Dead(override val time: Long = System.nanoTime()) extends
FinishedSessionState("dead", false, time)
object Dead extends Dead(System.nanoTime)

case class Success(override val time: Long) extends
case class Success(override val time: Long = System.nanoTime()) extends
FinishedSessionState("success", false, time)
object Success extends Success(System.nanoTime)
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
}

class BatchSession(id: Int) extends Session(id, BATCH_TYPE) {
def verifySessionDead(): Unit = verifySessionState(SessionState.Dead)
def verifySessionDead(): Unit = verifySessionState(SessionState.Dead())
def verifySessionRunning(): Unit = verifySessionState(SessionState.Running)
def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success)
def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success())
}

class InteractiveSession(id: Int) extends Session(id, INTERACTIVE_TYPE) {
Expand Down Expand Up @@ -226,7 +226,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
.setBody(mapper.writeValueAsString(requestBody))
.execute()

verifySessionState(SessionState.Dead)
verifySessionState(SessionState.Dead())
}

def verifySessionIdle(): Unit = {
Expand Down
4 changes: 2 additions & 2 deletions repl/src/main/scala/org/apache/livy/repl/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class Session(
entries
}(interpreterExecutor)

future.onFailure { case _ => changeState(SessionState.Error) }(interpreterExecutor)
future.onFailure { case _ => changeState(SessionState.Error()) }(interpreterExecutor)
future
}

Expand Down Expand Up @@ -297,7 +297,7 @@ class Session(
(TRACEBACK -> traceback)

case Interpreter.ExecuteAborted(message) =>
changeState(SessionState.Error)
changeState(SessionState.Error())

(STATUS -> ERROR) ~
(EXECUTION_COUNT -> executionCount) ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ class BatchSession(
_state = SessionState.Running
info(s"Batch session $id created [appid: ${appId.orNull}, state: ${state.toString}, " +
s"info: ${appInfo.asJavaMap}]")
case SparkApp.State.FINISHED => _state = SessionState.Success
case SparkApp.State.FINISHED => _state = SessionState.Success()
case SparkApp.State.KILLED | SparkApp.State.FAILED =>
_state = SessionState.Dead
_state = SessionState.Dead()
case _ =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class InteractiveSession(
}

if (client.isEmpty) {
transition(Dead)
transition(Dead())
val msg = s"Cannot recover interactive session $id because its RSCDriver URI is unknown."
info(msg)
sessionLog = IndexedSeq(msg)
Expand Down Expand Up @@ -435,7 +435,7 @@ class InteractiveSession(
// this callback might be triggered. Check and don't call stop() to avoid nested called
// if the session is already shutting down.
if (serverSideState != SessionState.ShuttingDown) {
transition(SessionState.Error)
transition(SessionState.Error())
stop()
app.foreach { a =>
info(s"Failed to ping RSC driver for session $id. Killing application.")
Expand Down Expand Up @@ -474,7 +474,7 @@ class InteractiveSession(
_.kill()
}
} finally {
transition(SessionState.Dead)
transition(SessionState.Dead())
}
}

Expand Down Expand Up @@ -611,7 +611,7 @@ class InteractiveSession(
debug(s"$this app state changed from $oldState to $newState")
newState match {
case SparkApp.State.FINISHED | SparkApp.State.KILLED | SparkApp.State.FAILED =>
transition(SessionState.Dead)
transition(SessionState.Dead())
case _ =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
}

// Stopped session should be gc-ed after retained timeout
for (s <- Seq(SessionState.Error,
SessionState.Success,
SessionState.Dead)) {
for (s <- Seq(SessionState.Error(),
SessionState.Success(),
SessionState.Dead())) {
eventually(timeout(30 seconds), interval(100 millis)) {
changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) }
}
Expand Down

0 comments on commit 1efc801

Please sign in to comment.