Skip to content

Commit

Permalink
LIVY-186. Better YARN integration for Interactive Session.
Browse files Browse the repository at this point in the history
- Transit to dead state when the YARN application dies.
- If RSCClient.stop() fails to stop RSCDriver, kill the YARN application using YARN API.
- Expose YARN app id.
- Show YARN diagnosis in session log.
- Fixed flaky SparkYarnAppSpec.should kill yarn app.
- Increased some integration test timeout when running on Travis.

Closes apache#179
  • Loading branch information
alex-the-man committed Sep 7, 2016
1 parent 4a681a2 commit 2d6e026
Show file tree
Hide file tree
Showing 16 changed files with 209 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
val session = mock(classOf[InteractiveSession])
val id = sessionManager.nextId()
when(session.id).thenReturn(id)
when(session.appId).thenReturn(None)
when(session.state).thenReturn(SessionState.Idle())
when(session.proxyUser).thenReturn(None)
when(session.kind).thenReturn(Spark())
Expand Down
18 changes: 9 additions & 9 deletions conf/livy.conf
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
# Use this keystore for the SSL certificate and key.
## livy.keystore =
# livy.keystore =

# Specify the keystore password.
## livy.keystore.password =
# livy.keystore.password =

# What host address to start the server on. By default, Livy will bind to all network interfaces.
## livy.server.host = 0.0.0.0
# livy.server.host = 0.0.0.0

# What port to start the server on.
## livy.server.port = 8998
# livy.server.port = 8998

# What spark master Livy sessions should use.
## livy.spark.master = local
# livy.spark.master = local

# What spark deploy mode Livy sessions should use.
## livy.spark.deployMode =
# livy.spark.deployMode =

# Time in milliseconds on how long Livy will wait before timing out an idle session.
## livy.server.session.timeout = 1h
# livy.server.session.timeout = 1h

# If livy should impersonate the requesting users when creating a new session.
## livy.impersonation.enabled = true
# livy.impersonation.enabled = true

# Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation
# directory every time a session is started. By caching these files in HDFS, for example, startup
Expand Down Expand Up @@ -54,7 +54,7 @@
# livy.repl.enableHiveContext =

# If Livy can't find the yarn app within this time, consider it lost.
livy.server.yarn.app-lookup-timeout = 30s
# livy.server.yarn.app-lookup-timeout = 30s

# How often Livy polls YARN to refresh YARN app state.
# livy.server.yarn.poll-interval = 1s
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.ning.http.client.AsyncHttpClient
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest._
import org.scalatest.concurrent.Eventually._

Expand All @@ -44,6 +45,8 @@ object BaseIntegrationTestSuite {
// Duplicated from repl.Session. Should really be in a more shared location.
val OK = "ok"
val ERROR = "error"

def isRunningOnTravis: Boolean = sys.env.contains("TRAVIS")
}

abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with BeforeAndAfterAll {
Expand All @@ -64,8 +67,20 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with Befo
.find(new File(_).getName().startsWith("livy-test-lib-"))
.getOrElse(throw new Exception(s"Cannot find test lib in ${sys.props("java.class.path")}"))

protected def getYarnLog(appId: String): String = {
require(appId != null, "appId shouldn't be null")

val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
assert(appReport != null, "appReport shouldn't be null")

appReport.getDiagnostics()
}

protected def waitTillSessionIdle(sessionId: Int): Unit = {
eventually(timeout(2 minutes), interval(100 millis)) {
// Travis uses very slow VM. It needs a longer timeout.
// Keeping the original timeout to avoid slowing down local development.
val idleTimeout = if (isRunningOnTravis) 5.minutes else 2.minutes
eventually(timeout(idleTimeout), interval(1 second)) {
val curState = livyClient.getSessionStatus(sessionId)
assert(curState === SessionState.Idle().toString)
}
Expand Down Expand Up @@ -141,16 +156,20 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with Befo
}
}

def getSessionStatus(sessionId: Int): String = {
def getSessionInfo(sessionId: Int): Map[String, Any] = {
val rep = httpClient.prepareGet(s"$livyEndpoint/sessions/$sessionId").execute().get()
withClue(rep.getResponseBody) {
rep.getStatusCode should equal(HttpServletResponse.SC_OK)

val sessionState =
mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
}
}

sessionState should contain key ("state")
def getSessionStatus(sessionId: Int): String = {
val sessionState = getSessionInfo(sessionId)

withClue(sessionState) {
sessionState should contain key ("state")
sessionState("state").asInstanceOf[String]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,14 @@ object MiniLivyMain extends MiniClusterBase {
var livyUrl: Option[String] = None

def start(config: MiniClusterConfig, configPath: String): Unit = {
val livyConf = Map(
var livyConf = Map(
LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster")

if (BaseIntegrationTestSuite.isRunningOnTravis) {
livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m")
}

saveProperties(livyConf, new File(configPath + "/livy.conf"))

val server = new LivyServer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,20 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
case e: Throwable =>
ignoringAll {
info(s"Session log: ${getBatchSessionInfo(batchId).log}")
info(s"YARN log: ${getYarnLog(batchId)}")
info(s"YARN log: ${getBatchYarnLog(batchId)}")
}
throw e
} finally {
ignoringAll { deleteBatch(batchId) }
}
}

private def getYarnLog(batchId: Int): String = {
private def getBatchYarnLog(batchId: Int): String = {
allCatch.opt {
val appId = getBatchSessionInfo(batchId).appId
assert(appId != null, "appId shouldn't be null")

val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
assert(appReport != null, "appReport shouldn't be null")

appReport.getDiagnostics()
getYarnLog(appId)
}.getOrElse("")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ import java.util.regex.Pattern

import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.Exception.allCatch

import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually._

import com.cloudera.livy.rsc.RSCConf
import com.cloudera.livy.sessions._
import com.cloudera.livy.test.framework.{BaseIntegrationTestSuite, StatementError}

Expand All @@ -41,14 +45,23 @@ class InteractiveIT extends BaseIntegrationTestSuite with BeforeAndAfter {
test("basic interactive session") {
sessionId = livyClient.startSession(Spark())

matchResult("1+1", "res0: Int = 2")
matchResult("sqlContext", startsWith("res1: org.apache.spark.sql.hive.HiveContext"))
matchResult("val sql = new org.apache.spark.sql.SQLContext(sc)",
startsWith("sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext"))

matchError("abcde", evalue = ".*?:[0-9]+: error: not found: value abcde.*")
matchError("throw new IllegalStateException()",
evalue = ".*java\\.lang\\.IllegalStateException.*")
dumpLogOnFailure(sessionId) {
matchResult("1+1", "res0: Int = 2")
matchResult("sqlContext", startsWith("res1: org.apache.spark.sql.hive.HiveContext"))
matchResult("val sql = new org.apache.spark.sql.SQLContext(sc)",
startsWith("sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext"))

matchError("abcde", evalue = ".*?:[0-9]+: error: not found: value abcde.*")
matchError("throw new IllegalStateException()",
evalue = ".*java\\.lang\\.IllegalStateException.*")

// Stop session and verify the YARN app state is finished.
// This is important because if YARN app state is killed, Spark history is not archived.
val appId = getAppId(sessionId)
livyClient.stopSession(sessionId)
val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
assert(appReport.getYarnApplicationState() == YarnApplicationState.FINISHED)
}
}

pytest("pyspark interactive session") {
Expand Down Expand Up @@ -85,24 +98,29 @@ class InteractiveIT extends BaseIntegrationTestSuite with BeforeAndAfter {

test("application kills session") {
sessionId = livyClient.startSession(Spark())
waitTillSessionIdle(sessionId)
livyClient.runStatement(sessionId, "System.exit(0)")

val expected = Set(SessionState.Idle().toString, SessionState.Error().toString)
eventually(timeout(30 seconds), interval(1 second)) {
val state = livyClient.getSessionStatus(sessionId)
assert(expected.contains(state))
dumpLogOnFailure(sessionId) {
waitTillSessionIdle(sessionId)
livyClient.runStatement(sessionId, "System.exit(0)")

val expected = Set(SessionState.Dead().toString)
eventually(timeout(30 seconds), interval(1 second)) {
val state = livyClient.getSessionStatus(sessionId)
assert(expected.contains(state))
}
}
}

// After the statement has run, it shouldn't be possible to run more commands. Once LIVY-139
// is fixed, this test should be changed to make sure the session state automatically turns
// to "error" or "dead", depending on how it's implemented.
try {
livyClient.runStatement(sessionId, "1+1")
val state = livyClient.getSessionStatus(sessionId)
fail(s"Should have failed to run statement; session state is $state")
} catch {
case e: Exception =>
test("should kill RSCDriver if it doesn't respond to end session") {
val testConfName = s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_END_SESSION.key()}"
sessionId = livyClient.startSession(Spark(), Map(testConfName -> "true"))

dumpLogOnFailure(sessionId) {
waitTillSessionIdle(sessionId)

val appId = getAppId(sessionId)
livyClient.stopSession(sessionId)
val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
assert(appReport.getYarnApplicationState() == YarnApplicationState.KILLED)
}
}

Expand All @@ -128,6 +146,31 @@ class InteractiveIT extends BaseIntegrationTestSuite with BeforeAndAfter {
matchResult("rdd.count()", ".*= 10")
}

private def dumpLogOnFailure[T](sessionId: Int)(f: => T): T = {
try {
f
} catch {
case e: Throwable =>
allCatch {
info(s"Session state: ${livyClient.getSessionInfo(sessionId)}")
info(s"YARN log: ${getSessionYarnLog(sessionId)}")
}
throw e
}
}

private def getAppId(sessionId: Int): String = {
val appId = livyClient.getSessionInfo(sessionId)("appId").asInstanceOf[String]
assert(appId != null, "appId returned null.")
appId
}

private def getSessionYarnLog(sessionId: Int): String = {
allCatch.opt {
getYarnLog(getAppId(sessionId))
}.getOrElse("")
}

private def matchResult(code: String, expected: String): Unit = {
runAndValidateStatement(code) match {
case Left(result) =>
Expand Down
18 changes: 9 additions & 9 deletions rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ public void onSuccess(Rpc rpc) throws Exception {
public void onSuccess(Void unused) {
if (isAlive) {
LOG.warn("Client RPC channel closed unexpectedly.");
stop(false);
try {
stop(false);
} catch (Exception e) { /* stop() itself prints warning. */ }
}
}
});
Expand All @@ -128,7 +130,9 @@ public void onFailure(Throwable error) throws Exception {

private void connectionError(Throwable error) {
LOG.error("Failed to connect to context.", error);
stop(false);
try {
stop(false);
} catch (Exception e) { /* stop() itself prints warning. */ }
}

private <T> io.netty.util.concurrent.Future<T> deferredCall(final Object msg,
Expand Down Expand Up @@ -194,16 +198,12 @@ public synchronized void stop(boolean shutdownContext) {
// since it closes the channel while handling it, we wait for the RPC's channel
// to close instead.
long stopTimeout = conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT);
try {
driverRpc.get().getChannel().closeFuture().get(stopTimeout,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.warn("Error waiting for context to shut down: {} ({}).",
e.getClass().getSimpleName(), e.getMessage());
}
driverRpc.get().getChannel().closeFuture().get(stopTimeout,
TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
LOG.warn("Exception while waiting for end session reply.", e);
Utils.propagate(e);
} finally {
if (driverRpc.isSuccess()) {
try {
Expand Down
4 changes: 3 additions & 1 deletion rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public static enum Entry implements ConfEntry {
RPC_SECRET_RANDOM_BITS("secret.bits", 256),

SASL_MECHANISMS("rpc.sasl.mechanisms", "DIGEST-MD5"),
SASL_QOP("rpc.sasl.qop", null);
SASL_QOP("rpc.sasl.qop", null),

TEST_STUCK_END_SESSION("test.do_not_use.stuck_end_session", false);

private final String key;
private final Object dflt;
Expand Down
8 changes: 6 additions & 2 deletions rsc/src/main/java/com/cloudera/livy/rsc/driver/RSCDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,12 @@ public void handle(ChannelHandlerContext ctx, CancelJob msg) {
}

public void handle(ChannelHandlerContext ctx, EndSession msg) {
LOG.debug("Shutting down due to EndSession request.");
shutdown();
if (livyConf.getBoolean(TEST_STUCK_END_SESSION)) {
LOG.warn("Ignoring EndSession request because TEST_STUCK_END_SESSION is set.");
} else {
LOG.debug("Shutting down due to EndSession request.");
shutdown();
}
}

public void handle(ChannelHandlerContext ctx, JobRequest<?> msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class BatchSession(

val file = resolveURIs(Seq(request.file))(0)
val sparkSubmitProcess = builder.start(Some(file), request.args)
SparkApp.create(uniqueAppTag, sparkSubmitProcess, livyConf, Some(this))
SparkApp.create(uniqueAppTag, Option(sparkSubmitProcess), livyConf, Some(this))
}

protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global
Expand Down
Loading

0 comments on commit 2d6e026

Please sign in to comment.