Skip to content

Commit

Permalink
[LIVY-41] Let users access sessions by session name
Browse files Browse the repository at this point in the history
This commit  enables Livy users to access sessions either by names or by auto-generated sessiond id's.

It also prevents users from creating sessions that have the same name.

This commit keeps API change  minimal. These are places that API change
is needed:
- `Session` and its sub-classes adds a new field, `name`.
- `RecoveryMetadata` and its sub-classes adds a new field, `name`.
- `SessionManager` adds a new method `getSession(name: String)` which looks sessions up by name.

Task-url: https://issues.apache.org/jira/browse/LIVY-41

Author: Fathi Salmi, Meisam(mfathisalmi) <[email protected]>
Author: Meisam Fathi <[email protected]>
Author: Fathi Salmi, Meisam <[email protected]>
Author: Fathi, Meisam <[email protected]>

Closes apache#48 from meisam/LIVY-41-rebased.
  • Loading branch information
meisam authored and Marcelo Vanzin committed Feb 5, 2019
1 parent e5489d0 commit def6318
Show file tree
Hide file tree
Showing 23 changed files with 289 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private CreateClientRequest() {
public static class SessionInfo implements ClientMessage {

public final int id;
public final String name;
public final String appId;
public final String owner;
public final String proxyUser;
Expand All @@ -61,9 +62,10 @@ public static class SessionInfo implements ClientMessage {
public final Map<String, String> appInfo;
public final List<String> log;

public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
String kind, Map<String, String> appInfo, List<String> log) {
public SessionInfo(int id, String name, String appId, String owner, String proxyUser,
String state, String kind, Map<String, String> appInfo, List<String> log) {
this.id = id;
this.name = name;
this.appId = appId;
this.owner = owner;
this.proxyUser = proxyUser;
Expand All @@ -74,7 +76,7 @@ public SessionInfo(int id, String appId, String owner, String proxyUser, String
}

private SessionInfo() {
this(-1, null, null, null, null, null, null, null);
this(-1, null, null, null, null, null, null, null, null);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
val session = mock(classOf[InteractiveSession])
val id = sessionManager.nextId()
when(session.id).thenReturn(id)
when(session.name).thenReturn(None)
when(session.appId).thenReturn(None)
when(session.appInfo).thenReturn(AppInfo())
when(session.state).thenReturn(SessionState.Idle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
}

def startBatch(
name: Option[String],
file: String,
className: Option[String],
args: List[String],
sparkConf: Map[String, String]): BatchSession = {
val r = new CreateBatchRequest()
r.file = file
r.name = name
r.className = className
r.args = args
r.conf = Map("spark.yarn.maxAppAttempts" -> "1") ++ sparkConf
Expand All @@ -264,12 +266,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
}

def startSession(
name: Option[String],
kind: Kind,
sparkConf: Map[String, String],
heartbeatTimeoutInSecond: Int): InteractiveSession = {
val r = new CreateInteractiveRequest()
r.kind = kind
r.conf = sparkConf
r.name = name
r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond

val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
private def withScript[R]
(scriptPath: String, args: List[String], sparkConf: Map[String, String] = Map.empty)
(f: (LivyRestClient#BatchSession) => R): R = {
val s = livyClient.startBatch(scriptPath, None, args, sparkConf)
val s = livyClient.startBatch(None, scriptPath, None, args, sparkConf)
withSession(s)(f)
}

private def withTestLib[R]
(testClass: Class[_], args: List[String], sparkConf: Map[String, String] = Map.empty)
(f: (LivyRestClient#BatchSession) => R): R = {
val s = livyClient.startBatch(testLibPath, Some(testClass.getName()), args, sparkConf)
val s = livyClient.startBatch(None, testLibPath, Some(testClass.getName()), args, sparkConf)
withSession(s)(f)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
waitForIdle: Boolean = true,
heartbeatTimeoutInSecond: Int = 0)
(f: (LivyRestClient#InteractiveSession) => R): R = {
withSession(livyClient.startSession(kind, sparkConf, heartbeatTimeoutInSecond)) { s =>
withSession(livyClient.startSession(None, kind, sparkConf, heartbeatTimeoutInSecond)) { s =>
if (waitForIdle) {
s.verifySessionIdle()
}
Expand Down
13 changes: 10 additions & 3 deletions server/src/main/scala/org/apache/livy/server/SessionServlet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,23 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
private def doWithSession(fn: (S => Any),
allowAll: Boolean,
checkFn: Option[(String, String) => Boolean]): Any = {
val sessionId = params("id").toInt
sessionManager.get(sessionId) match {
val idOrNameParam: String = params("id")
val session = if (idOrNameParam.forall(_.isDigit)) {
val sessionId = idOrNameParam.toInt
sessionManager.get(sessionId)
} else {
val sessionName = idOrNameParam
sessionManager.get(sessionName)
}
session match {
case Some(session) =>
if (allowAll || checkFn.map(_(session.owner, remoteUser(request))).getOrElse(false)) {
fn(session)
} else {
Forbidden()
}
case None =>
NotFound(ResponseMessage(s"Session '$sessionId' not found."))
NotFound(ResponseMessage(s"Session '$idOrNameParam' not found."))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessB
@JsonIgnoreProperties(ignoreUnknown = true)
case class BatchRecoveryMetadata(
id: Int,
name: Option[String],
appId: Option[String],
appTag: String,
owner: String,
Expand All @@ -53,6 +54,7 @@ object BatchSession extends Logging {

def create(
id: Int,
name: Option[String],
request: CreateBatchRequest,
livyConf: LivyConf,
accessManager: AccessManager,
Expand Down Expand Up @@ -110,6 +112,7 @@ object BatchSession extends Logging {

new BatchSession(
id,
name,
appTag,
SessionState.Starting,
livyConf,
Expand All @@ -126,6 +129,7 @@ object BatchSession extends Logging {
mockApp: Option[SparkApp] = None): BatchSession = {
new BatchSession(
m.id,
m.name,
m.appTag,
SessionState.Recovering,
livyConf,
Expand All @@ -140,27 +144,33 @@ object BatchSession extends Logging {

class BatchSession(
id: Int,
name: Option[String],
appTag: String,
initialState: SessionState,
livyConf: LivyConf,
owner: String,
override val proxyUser: Option[String],
sessionStore: SessionStore,
sparkApp: BatchSession => SparkApp)
extends Session(id, owner, livyConf) with SparkAppListener {
extends Session(id, name, owner, livyConf) with SparkAppListener {
import BatchSession._

protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global

private[this] var _state: SessionState = initialState
private val app = sparkApp(this)

private var app: Option[SparkApp] = None

override def state: SessionState = _state

override def logLines(): IndexedSeq[String] = app.log()
override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(IndexedSeq.empty[String])

override def start(): Unit = {
app = Option(sparkApp(this))
}

override def stopSession(): Unit = {
app.kill()
app.foreach(_.kill())
}

override def appIdKnown(appId: String): Unit = {
Expand All @@ -187,5 +197,5 @@ class BatchSession(
override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }

override def recoveryMetadata: RecoveryMetadata =
BatchRecoveryMetadata(id, appId, appTag, owner, proxyUser)
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.livy.utils.AppInfo

case class BatchSessionView(
id: Long,
name: Option[String],
state: String,
appId: Option[String],
appInfo: AppInfo,
Expand All @@ -42,8 +43,11 @@ class BatchSessionServlet(

override protected def createSession(req: HttpServletRequest): BatchSession = {
val createRequest = bodyAs[CreateBatchRequest](req)
val sessionId = sessionManager.nextId()
val sessionName = createRequest.name
BatchSession.create(
sessionManager.nextId(),
sessionId,
sessionName,
createRequest,
livyConf,
accessManager,
Expand All @@ -66,7 +70,8 @@ class BatchSessionServlet(
} else {
Nil
}
BatchSessionView(session.id, session.state.toString, session.appId, session.appInfo, logs)
BatchSessionView(session.id, session.name, session.state.toString, session.appId,
session.appInfo, logs)
}

}
Loading

0 comments on commit def6318

Please sign in to comment.