Skip to content

Commit

Permalink
LIVY-165. Added session heartbeat to help notebooks to prevent leakag…
Browse files Browse the repository at this point in the history
…e. (apache#247)

Notebook applications like Jupyter might crash while livy is running. No one will clean up its corresponding Livy session and will be leaked.
Heartbeat is added to address this. To keep a session alive, the notebook application must continously make GET requests to the interactive session. If no GET request is made within the heartbeat interval, livy-server will delete the session regardless to its state (busy, idle).

Heartbeat is per session and is controlled by session property "heartbeatTimeoutInSecond". Its default is 0 and it means heartbeat is disabled. To enable heartbeat, please create the session with non-zero "heartbeatTimeoutInSecond" in the create request.
  • Loading branch information
alex-the-man committed Dec 22, 2016
1 parent 678839c commit 69ac11e
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
new BatchSession(id)
}

def startSession(kind: Kind, sparkConf: Map[String, String]): InteractiveSession = {
def startSession(
kind: Kind,
sparkConf: Map[String, String],
heartbeatTimeoutInSecond: Int): InteractiveSession = {
val r = new CreateInteractiveRequest()
r.kind = kind
r.conf = sparkConf
r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond

val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r))
new InteractiveSession(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ object MiniLivyMain extends MiniClusterBase {
LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
LivyConf.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
LivyConf.RECOVERY_MODE.key -> "recovery",
LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ class InteractiveIT extends BaseIntegrationTestSuite {
}
}

test("heartbeat should kill expired session") {
// Set it to 2s because verifySessionIdle() is calling GET every second.
val heartbeatTimeout = Duration.create("2s")
withNewSession(Spark(), Map.empty, true, heartbeatTimeout.toSeconds.toInt) { s =>
// If the test reaches here, that means verifySessionIdle() is successfully keeping the
// session alive. Now verify heartbeat is killing expired session.
Thread.sleep(heartbeatTimeout.toMillis * 2)
s.verifySessionDoesNotExist()
}
}

test("recover interactive session") {
withNewSession(Spark()) { s =>
val stmt1 = s.run("1")
Expand Down Expand Up @@ -175,10 +186,13 @@ class InteractiveIT extends BaseIntegrationTestSuite {
}
}

private def withNewSession[R]
(kind: Kind, sparkConf: Map[String, String] = Map.empty, waitForIdle: Boolean = true)
private def withNewSession[R] (
kind: Kind,
sparkConf: Map[String, String] = Map.empty,
waitForIdle: Boolean = true,
heartbeatTimeoutInSecond: Int = 0)
(f: (LivyRestClient#InteractiveSession) => R): R = {
withSession(livyClient.startSession(kind, sparkConf)) { s =>
withSession(livyClient.startSession(kind, sparkConf, heartbeatTimeoutInSecond)) { s =>
if (waitForIdle) {
s.verifySessionIdle()
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ object LivyConf {
val AUTH_KERBEROS_KEYTAB = Entry("livy.server.auth.kerberos.keytab", null)
val AUTH_KERBEROS_NAME_RULES = Entry("livy.server.auth.kerberos.name_rules", "DEFAULT")

val HEARTBEAT_WATCHDOG_INTERVAL = Entry("livy.server.heartbeat-watchdog.interval", "1m")

val LAUNCH_KERBEROS_PRINCIPAL =
LivyConf.Entry("livy.server.launch.kerberos.principal", null)
val LAUNCH_KERBEROS_KEYTAB =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package com.cloudera.livy.server.interactive
import com.cloudera.livy.sessions.{Kind, Spark}

class CreateInteractiveRequest {

var kind: Kind = Spark()
var proxyUser: Option[String] = None
var jars: List[String] = List()
Expand All @@ -36,5 +35,5 @@ class CreateInteractiveRequest {
var queue: Option[String] = None
var name: Option[String] = None
var conf: Map[String, String] = Map()

var heartbeatTimeoutInSecond: Int = 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import java.io.{File, InputStream}
import java.net.URI
import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Random

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
Expand All @@ -50,6 +52,7 @@ case class InteractiveRecoveryMetadata(
appId: Option[String],
appTag: String,
kind: Kind,
heartbeatTimeoutS: Int,
owner: String,
proxyUser: Option[String],
rscDriverUri: Option[URI],
Expand Down Expand Up @@ -112,6 +115,7 @@ object InteractiveSession extends Logging {
client,
SessionState.Starting(),
request.kind,
request.heartbeatTimeoutInSecond,
livyConf,
owner,
proxyUser,
Expand All @@ -137,6 +141,7 @@ object InteractiveSession extends Logging {
client,
SessionState.Recovering(),
metadata.kind,
metadata.heartbeatTimeoutS,
livyConf,
metadata.owner,
metadata.proxyUser,
Expand Down Expand Up @@ -341,18 +346,24 @@ class InteractiveSession(
client: Option[RSCClient],
initialState: SessionState,
val kind: Kind,
heartbeatTimeoutS: Int,
livyConf: LivyConf,
owner: String,
override val proxyUser: Option[String],
sessionStore: SessionStore,
mockApp: Option[SparkApp]) // For unit test.
extends Session(id, owner, livyConf)
with SessionHeartbeat
with SparkAppListener {

import InteractiveSession._

private var serverSideState: SessionState = initialState

override protected val heartbeatTimeout: FiniteDuration = {
val heartbeatTimeoutInSecond = heartbeatTimeoutS
Duration(heartbeatTimeoutInSecond, TimeUnit.SECONDS)
}
private val operations = mutable.Map[Long, String]()
private val operationCounter = new AtomicLong(0)
private var rscDriverUri: Option[URI] = None
Expand All @@ -361,6 +372,7 @@ class InteractiveSession(

_appId = appIdHint
sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
heartbeat()

private val app = mockApp.orElse {
if (livyConf.isRunningOnYarn()) {
Expand Down Expand Up @@ -423,7 +435,8 @@ class InteractiveSession(
override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(sessionLog)

override def recoveryMetadata: RecoveryMetadata =
InteractiveRecoveryMetadata(id, appId, appTag, kind, owner, proxyUser, rscDriverUri)
InteractiveRecoveryMetadata(
id, appId, appTag, kind, heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)

override def state: SessionState = {
if (serverSideState.isInstanceOf[SessionState.Running]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class InteractiveSessionServlet(
sessionStore: SessionStore,
livyConf: LivyConf)
extends SessionServlet(sessionManager, livyConf)
with SessionHeartbeatNotifier[InteractiveSession, InteractiveRecoveryMetadata]
with FileUploadSupport
{

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.livy.server.interactive

import java.util.Date

import scala.concurrent.duration.{Deadline, Duration, FiniteDuration}

import com.cloudera.livy.sessions.Session.RecoveryMetadata
import com.cloudera.livy.LivyConf
import com.cloudera.livy.server.SessionServlet
import com.cloudera.livy.sessions.{Session, SessionManager}

/**
* A session trait to provide heartbeat expiration check.
* Note: Session will not expire if heartbeat() was never called.
*/
trait SessionHeartbeat {
protected val heartbeatTimeout: FiniteDuration

private var _lastHeartbeat: Date = _ // For reporting purpose
private var heartbeatDeadline: Option[Deadline] = None

def heartbeat(): Unit = synchronized {
if (heartbeatTimeout > Duration.Zero) {
heartbeatDeadline = Some(heartbeatTimeout.fromNow)
}

_lastHeartbeat = new Date()
}

def lastHeartbeat: Date = synchronized { _lastHeartbeat }

def heartbeatExpired: Boolean = synchronized { heartbeatDeadline.exists(_.isOverdue()) }
}

/**
* Servlet can mixin this trait to update session's heartbeat
* whenever a /sessions/:id REST call is made. e.g. GET /sessions/:id
* Note: GET /sessions doesn't update heartbeats.
*/
trait SessionHeartbeatNotifier[S <: Session with SessionHeartbeat, R <: RecoveryMetadata]
extends SessionServlet[S, R] {

abstract override protected def withUnprotectedSession(fn: (S => Any)): Any = {
super.withUnprotectedSession { s =>
s.heartbeat()
fn(s)
}
}

abstract override protected def withSession(fn: (S => Any)): Any = {
super.withSession { s =>
s.heartbeat()
fn(s)
}
}
}

/**
* A SessionManager trait.
* It will create a thread that periodically deletes sessions with expired heartbeat.
*/
trait SessionHeartbeatWatchdog[S <: Session with SessionHeartbeat, R <: RecoveryMetadata] {
self: SessionManager[S, R] =>

private val watchdogThread = new Thread(s"HeartbeatWatchdog-${self.getClass.getName}") {
override def run(): Unit = {
val interval = livyConf.getTimeAsMs(LivyConf.HEARTBEAT_WATCHDOG_INTERVAL)
info("Heartbeat watchdog thread started.")
while (true) {
deleteExpiredSessions()
Thread.sleep(interval)
}
}
}

protected def start(): Unit = {
assert(!watchdogThread.isAlive())

watchdogThread.setDaemon(true)
watchdogThread.start()
}

private[interactive] def deleteExpiredSessions(): Unit = {
// Delete takes time. If we use .filter().foreach() here, the time difference between we check
// expiration and the time we delete the session might be huge. To avoid that, check expiration
// inside the foreach block.
sessions.values.foreach { s =>
if (s.heartbeatExpired) {
info(s"Session ${s.id} expired. Last heartbeat is at ${s.lastHeartbeat}.")
try { delete(s) } catch {
case t: Throwable =>
warn(s"Exception was thrown when deleting expired session ${s.id}", t)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.util.control.NonFatal

import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession}
import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession, SessionHeartbeatWatchdog}
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.Session.RecoveryMetadata

Expand All @@ -56,9 +56,13 @@ class InteractiveSessionManager(
sessionStore,
"interactive",
mockSessions)
with SessionHeartbeatWatchdog[InteractiveSession, InteractiveRecoveryMetadata]
{
start()
}

class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
livyConf: LivyConf,
protected val livyConf: LivyConf,
sessionRecovery: R => S,
sessionStore: SessionStore,
sessionType: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m =
InteractiveRecoveryMetadata(78, None, "appTag", Spark(), null, None, Some(URI.create("")))
InteractiveRecoveryMetadata(
78, None, "appTag", Spark(), 0, null, None, Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))

s.state shouldBe a[SessionState.Recovering]
Expand All @@ -210,7 +211,8 @@ class InteractiveSessionSpec extends FunSpec
it("should recover session to dead state if rscDriverUri is unknown") {
val conf = new LivyConf()
val sessionStore = mock[SessionStore]
val m = InteractiveRecoveryMetadata(78, Some("appId"), "appTag", Spark(), null, None, None)
val m = InteractiveRecoveryMetadata(
78, Some("appId"), "appTag", Spark(), 0, null, None, None)
val s = InteractiveSession.recover(m, conf, sessionStore, None)

s.state shouldBe a[SessionState.Dead]
Expand Down
Loading

0 comments on commit 69ac11e

Please sign in to comment.