LIVY-208. Exposing YARN driver log url and Spark UI url for YARN sessions.

These URLs only work in yarn-cluster mode, since the driver log URL is taken from the YARN ApplicationMaster container.
They are exposed as:
- appInfo.driverLogUrl
- appInfo.sparkUiUrl

Closes  apache#189
alex-the-man authored Sep 20, 2016
1 parent 9d84905 commit 35dee37
Showing 16 changed files with 357 additions and 40 deletions.
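
A minimal sketch (not part of this commit) of how the new fields surface to a REST client: it fetches GET /sessions/{id} and prints the JSON body, which now carries an appInfo object. The server address (http://localhost:8998), the session id 0, and the example URL values in the comment are placeholder assumptions.

import scala.io.Source

object ReadAppInfo {
  def main(args: Array[String]): Unit = {
    val sessionId = 0 // hypothetical session id
    // GET /sessions/{id}; with this change the JSON body contains e.g.
    //   "appInfo": { "driverLogUrl": "http://<nm>:8042/node/containerlogs/<container>/<user>",
    //                "sparkUiUrl": "http://<rm>:8088/proxy/<appId>/" }
    val body = Source.fromURL(s"http://localhost:8998/sessions/$sessionId").mkString
    println(body)
  }
}

Until YARN reports the application attempt, both values are empty, since AppInfo starts out with two empty Options (asJavaMap turns them into nulls).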
@@ -59,21 +59,23 @@ public static class SessionInfo implements ClientMessage {
public final String proxyUser;
public final String state;
public final String kind;
public final Map<String, String> appInfo;
public final List<String> log;

public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
String kind, List<String> log) {
String kind, Map<String, String> appInfo, List<String> log) {
this.id = id;
this.appId = appId;
this.owner = owner;
this.proxyUser = proxyUser;
this.state = state;
this.kind = kind;
this.appInfo = appInfo;
this.log = log;
}

private SessionInfo() {
this(-1, null, null, null, null, null, null);
this(-1, null, null, null, null, null, null, null);
}

}
@@ -43,6 +43,7 @@ import com.cloudera.livy.server.WebServer
import com.cloudera.livy.server.interactive.{InteractiveSession, InteractiveSessionServlet}
import com.cloudera.livy.sessions.{SessionState, Spark}
import com.cloudera.livy.test.jobs.Echo
import com.cloudera.livy.utils.AppInfo

/**
* The test for the HTTP client is written in Scala so we can reuse the code in the livy-server
@@ -270,6 +271,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
val id = sessionManager.nextId()
when(session.id).thenReturn(id)
when(session.appId).thenReturn(None)
when(session.appInfo).thenReturn(AppInfo())
when(session.state).thenReturn(SessionState.Idle())
when(session.proxyUser).thenReturn(None)
when(session.kind).thenReturn(Spark())
@@ -40,6 +40,7 @@ import com.cloudera.livy.test.apps._
import com.cloudera.livy.test.framework.BaseIntegrationTestSuite

class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
import com.cloudera.livy.utils.AppInfo._
implicit val patienceConfig = PatienceConfig(timeout = 2 minutes, interval = 1 second)

private var testLibPath: String = _
@@ -56,6 +57,10 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
dumpLogOnFailure(result.id) {
assert(result.state === SessionState.Success().toString())
assert(cluster.fs.isDirectory(new Path(output)))
result.appInfo should contain key DRIVER_LOG_URL_NAME
result.appInfo should contain key SPARK_UI_URL_NAME
result.appInfo.get(DRIVER_LOG_URL_NAME) should include ("containerlogs")
result.appInfo.get(SPARK_UI_URL_NAME) should startWith ("http")
}
}

@@ -55,12 +55,21 @@ class InteractiveIT extends BaseIntegrationTestSuite with BeforeAndAfter {
matchError("throw new IllegalStateException()",
evalue = ".*java\\.lang\\.IllegalStateException.*")

// Make sure appInfo is reported correctly.
val result = livyClient.getSessionInfo(sessionId)
result should contain key ("appInfo")
val appInfo = result("appInfo").asInstanceOf[Map[String, String]]
appInfo should contain key ("driverLogUrl")
appInfo should contain key ("sparkUiUrl")
appInfo("driverLogUrl") should include ("containerlogs")
appInfo("sparkUiUrl") should startWith ("http")

// Stop session and verify the YARN app state is finished.
// This is important because if YARN app state is killed, Spark history is not archived.
val appId = getAppId(sessionId)
livyClient.stopSession(sessionId)
val appReport = cluster.yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
assert(appReport.getYarnApplicationState() == YarnApplicationState.FINISHED)
appReport.getYarnApplicationState() shouldEqual YarnApplicationState.FINISHED
}
}

@@ -25,17 +25,18 @@ import scala.util.Random

import com.cloudera.livy.LivyConf
import com.cloudera.livy.sessions.{Session, SessionState}
import com.cloudera.livy.utils.{SparkApp, SparkAppListener, SparkProcessBuilder}
import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}

class BatchSession(
id: Int,
owner: String,
override val proxyUser: Option[String],
livyConf: LivyConf,
request: CreateBatchRequest)
extends Session(id, owner, livyConf) with SparkAppListener {
request: CreateBatchRequest,
mockApp: Option[SparkApp] = None) // For unit test.
extends Session(id, owner, livyConf) with SparkAppListener {

private val app = {
private val app = mockApp.getOrElse {
val uniqueAppTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}"

val conf = SparkApp.prepareSparkConf(uniqueAppTag, livyConf,
@@ -92,4 +93,6 @@ class BatchSession(
}
}
}

override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
}
@@ -22,8 +22,14 @@ import javax.servlet.http.HttpServletRequest

import com.cloudera.livy.LivyConf
import com.cloudera.livy.server.SessionServlet
import com.cloudera.livy.utils.AppInfo

case class BatchSessionView(id: Long, state: String, appId: Option[String], log: Seq[String])
case class BatchSessionView(
id: Long,
state: String,
appId: Option[String],
appInfo: AppInfo,
log: Seq[String])

class BatchSessionServlet(livyConf: LivyConf)
extends SessionServlet[BatchSession](livyConf)
@@ -35,7 +41,9 @@ class BatchSessionServlet(livyConf: LivyConf)
new BatchSession(sessionManager.nextId(), remoteUser(req), proxyUser, livyConf, createRequest)
}

override protected def clientSessionView(session: BatchSession, req: HttpServletRequest): Any = {
override protected[batch] def clientSessionView(
session: BatchSession,
req: HttpServletRequest): Any = {
val logs =
if (hasAccess(session.owner, req)) {
val lines = session.logLines()
@@ -48,7 +56,7 @@ class BatchSessionServlet(livyConf: LivyConf)
} else {
Nil
}
BatchSessionView(session.id, session.state.toString, session.appId, logs)
BatchSessionView(session.id, session.state.toString, session.appId, session.appInfo, logs)
}

}
@@ -40,7 +40,7 @@ import com.cloudera.livy._
import com.cloudera.livy.client.common.HttpMessages._
import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}
import com.cloudera.livy.sessions._
import com.cloudera.livy.utils.{SparkApp, SparkAppListener}
import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkAppListener}

object InteractiveSession {
val LivyReplJars = "livy.repl.jars"
@@ -52,7 +52,8 @@ class InteractiveSession(
owner: String,
override val proxyUser: Option[String],
livyConf: LivyConf,
request: CreateInteractiveRequest)
request: CreateInteractiveRequest,
mockApp: Option[SparkApp] = None) // For unit test.
extends Session(id, owner, livyConf)
with SparkAppListener {

@@ -149,14 +150,16 @@ class InteractiveSession(
.setURI(new URI("rsc:/"))
val client = builder.build().asInstanceOf[RSCClient]

val app = if (livyConf.isRunningOnYarn()) {
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
// (e.g. Reflect YARN application state to session state).
Option(SparkApp.create(uniqueAppTag, None, livyConf, Some(this)))
} else {
// When Livy is running with other cluster manager, SparkApp doesn't provide any additional
// benefit over controlling RSCDriver using RSCClient. Don't use it.
None
val app = mockApp.orElse {
if (livyConf.isRunningOnYarn()) {
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
// (e.g. Reflect YARN application state to session state).
Option(SparkApp.create(uniqueAppTag, None, livyConf, Some(this)))
} else {
// When Livy is running with other cluster manager, SparkApp doesn't provide any additional
// benefit over controlling RSCDriver using RSCClient. Don't use it.
None
}
}
(client, app)
}
@@ -435,4 +438,6 @@ class InteractiveSession(
}
}
}

override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
}
@@ -53,7 +53,7 @@ class InteractiveSessionServlet(livyConf: LivyConf)
createRequest)
}

override protected def clientSessionView(
override protected[interactive] def clientSessionView(
session: InteractiveSession,
req: HttpServletRequest): Any = {
val logs =
@@ -72,7 +72,7 @@ class InteractiveSessionServlet(livyConf: LivyConf)
}

new SessionInfo(session.id, session.appId.orNull, session.owner, session.proxyUser.orNull,
session.state.toString, session.kind.toString, logs.asJava)
session.state.toString, session.kind.toString, session.appInfo.asJavaMap, logs.asJava)
}

private def statementView(statement: Statement): Any = {
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.security.UserGroupInformation

import com.cloudera.livy.{LivyConf, Logging, Utils}
import com.cloudera.livy.utils.AppInfo

object Session {

@@ -57,6 +58,8 @@ abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf) e

def appId: Option[String] = _appId

var appInfo: AppInfo = AppInfo()

def lastActivity: Long = state match {
case SessionState.Error(time) => time
case SessionState.Dead(time) => time
16 changes: 16 additions & 0 deletions server/src/main/scala/com/cloudera/livy/utils/SparkApp.scala
@@ -18,15 +18,31 @@

package com.cloudera.livy.utils

import scala.collection.JavaConverters._

import com.cloudera.livy.LivyConf
import com.cloudera.livy.util.LineBufferedProcess

object AppInfo {
val DRIVER_LOG_URL_NAME = "driverLogUrl"
val SPARK_UI_URL_NAME = "sparkUiUrl"
}

case class AppInfo(var driverLogUrl: Option[String] = None, var sparkUiUrl: Option[String] = None) {
import AppInfo._
def asJavaMap: java.util.Map[String, String] =
Map(DRIVER_LOG_URL_NAME -> driverLogUrl.orNull, SPARK_UI_URL_NAME -> sparkUiUrl.orNull).asJava
}

trait SparkAppListener {
/** Fired when appId is known, even during recovery. */
def appIdKnown(appId: String): Unit = {}

/** Fired when the app state in the cluster changes. */
def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit = {}

/** Fired when the app info is changed. */
def infoChanged(appInfo: AppInfo): Unit = {}
}
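
As a reading aid, here is a minimal, hypothetical sketch (not part of the diff) of how the pieces above fit together: a SparkAppListener implementation keeps the latest AppInfo delivered through infoChanged, exactly as BatchSession and InteractiveSession do in this commit, and asJavaMap exposes the two URLs under the driverLogUrl/sparkUiUrl keys. The LoggingListener name and the example URL are illustrative only.

import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkAppListener}

// Hypothetical listener mirroring the infoChanged override added to the sessions.
class LoggingListener extends SparkAppListener {
  @volatile var appInfo: AppInfo = AppInfo()

  override def appIdKnown(appId: String): Unit =
    println(s"appId known: $appId")

  override def stateChanged(oldState: SparkApp.State, newState: SparkApp.State): Unit =
    println(s"state changed: $oldState -> $newState")

  // Keep the most recent info; SparkYarnApp only fires this when the info actually changes.
  override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
}

object LoggingListenerExample {
  def main(args: Array[String]): Unit = {
    val listener = new LoggingListener
    listener.infoChanged(AppInfo(driverLogUrl = Some("http://nm:8042/node/containerlogs/container_.../stdout")))
    // asJavaMap maps the Options to the driverLogUrl/sparkUiUrl keys; absent values become null.
    println(listener.appInfo.asJavaMap) // e.g. {driverLogUrl=http://..., sparkUiUrl=null}
  }
}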

/**
49 changes: 39 additions & 10 deletions server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
@@ -25,10 +25,12 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try

import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, FinalApplicationStatus, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException

import com.cloudera.livy.{LivyConf, Logging, Utils}
import com.cloudera.livy.util.LineBufferedProcess
@@ -78,7 +80,7 @@ class SparkYarnApp private[utils] (
import SparkYarnApp._

private val appIdPromise: Promise[ApplicationId] = Promise()
private var state: SparkApp.State = SparkApp.State.STARTING
private[utils] var state: SparkApp.State = SparkApp.State.STARTING
private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]

override def log(): IndexedSeq[String] =
@@ -209,16 +211,43 @@ class SparkYarnApp private[utils] (
listener.foreach(_.appIdKnown(appId.toString))

val pollInterval = SparkYarnApp.getYarnPollInterval(livyConf)
var appInfo = AppInfo()
while (isRunning) {
Clock.sleep(pollInterval.toMillis)

// Refresh application state
val appReport = yarnClient.getApplicationReport(appId)
yarnDiagnostics = getYarnDiagnostics(appReport)
changeState(mapYarnState(
appReport.getApplicationId,
appReport.getYarnApplicationState,
appReport.getFinalApplicationStatus))
try {
Clock.sleep(pollInterval.toMillis)

// Refresh application state
val appReport = yarnClient.getApplicationReport(appId)
yarnDiagnostics = getYarnDiagnostics(appReport)
changeState(mapYarnState(
appReport.getApplicationId,
appReport.getYarnApplicationState,
appReport.getFinalApplicationStatus))

val latestAppInfo = {
val attempt =
yarnClient.getApplicationAttemptReport(appReport.getCurrentApplicationAttemptId)
val driverLogUrl =
Try(yarnClient.getContainerReport(attempt.getAMContainerId).getLogUrl)
.toOption
AppInfo(driverLogUrl, Option(appReport.getTrackingUrl))
}

if (appInfo != latestAppInfo) {
listener.foreach(_.infoChanged(latestAppInfo))
appInfo = latestAppInfo
}
} catch {
// This exception might be thrown while the app is starting up. It's transient.
case e: ApplicationAttemptNotFoundException =>
// Workaround YARN-4411: No enum constant FINAL_SAVING from getApplicationAttemptReport()
case e: IllegalArgumentException =>
if (e.getMessage.contains("FINAL_SAVING")) {
debug("Encountered YARN-4411.")
} else {
throw e
}
}
}

debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
@@ -21,13 +21,18 @@ package com.cloudera.livy.server.batch
import java.io.FileWriter
import java.nio.file.{Files, Path}
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse._

import scala.concurrent.duration.Duration

import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar.mock

import com.cloudera.livy.Utils
import com.cloudera.livy.server.BaseSessionServletSpec
import com.cloudera.livy.sessions.SessionState
import com.cloudera.livy.utils.AppInfo

class BatchServletSpec extends BaseSessionServletSpec[BatchSession] {

@@ -107,6 +112,31 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession] {
jpost[Map[String, Any]]("/", createRequest, expectedStatus = SC_BAD_REQUEST) { _ => }
}

it("should show session properties") {
val id = 0
val state = SessionState.Running()
val appId = "appid"
val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
val log = IndexedSeq[String]("log1", "log2")

val session = mock[BatchSession]
when(session.id).thenReturn(id)
when(session.state).thenReturn(state)
when(session.appId).thenReturn(Some(appId))
when(session.appInfo).thenReturn(appInfo)
when(session.logLines()).thenReturn(log)

val req = mock[HttpServletRequest]

val view = servlet.asInstanceOf[BatchSessionServlet].clientSessionView(session, req)
.asInstanceOf[BatchSessionView]

view.id shouldEqual id
view.state shouldEqual state.toString
view.appId shouldEqual Some(appId)
view.appInfo shouldEqual appInfo
view.log shouldEqual log
}
}

}