Skip to content

Commit

Permalink
[livy] Pass around CreateInteractiveRequest objects
Browse files Browse the repository at this point in the history
  • Loading branch information
Erick Tryzelaar committed May 20, 2015
1 parent 07a3e75 commit 68c44bb
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@

package com.cloudera.hue.livy.server.interactive

import com.cloudera.hue.livy.LivyConf
import com.cloudera.hue.livy.sessions.Kind
import com.cloudera.hue.livy.yarn.Client

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future

trait InteractiveSessionFactory {
def createSession(id: Int, kind: Kind, proxyUser: Option[String] = None): Future[InteractiveSession]
def createSession(id: Int, createInteractiveRequest: CreateInteractiveRequest): Future[InteractiveSession]

def close(): Unit = {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class SessionManager(factory: InteractiveSessionFactory) extends Logging {
sessions.keys
}

def createSession(kind: Kind, proxyUser: Option[String] = None): Future[InteractiveSession] = {
def createSession(createInteractiveRequest: CreateInteractiveRequest): Future[InteractiveSession] = {
val id = _idCounter.getAndIncrement
val session = factory.createSession(id, kind, proxyUser = proxyUser)
val session = factory.createSession(id, createInteractiveRequest)

session.map({ case(session: InteractiveSession) =>
info("created session %s" format session.id)
Expand Down Expand Up @@ -97,6 +97,9 @@ class SessionManager(factory: InteractiveSessionFactory) extends Logging {
}
}

case class CreateInteractiveRequest(kind: Kind,
proxyUser: Option[String] = None)

class SessionNotFound extends Exception

private class GarbageCollector(sessionManager: SessionManager) extends Thread {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ package com.cloudera.hue.livy.server.interactive
import java.lang.ProcessBuilder.Redirect
import java.net.URL

import com.cloudera.hue.livy.sessions.Kind
import com.cloudera.hue.livy.spark.SparkSubmitProcessBuilder
import com.cloudera.hue.livy.spark.SparkSubmitProcessBuilder.AbsolutePath
import com.cloudera.hue.livy.{LivyConf, Logging, Utils}

import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.io.Source

Expand All @@ -38,21 +35,21 @@ object InteractiveSessionProcess extends Logging {
val CONF_LIVY_REPL_CALLBACK_URL = "livy.repl.callback-url"
val CONF_LIVY_REPL_DRIVER_CLASS_PATH = "livy.repl.driverClassPath"

def create(livyConf: LivyConf, id: Int, kind: Kind, proxyUser: Option[String] = None): InteractiveSession = {
val process = startProcess(livyConf, id, kind, proxyUser)
new InteractiveSessionProcess(id, kind, proxyUser, process)
def create(livyConf: LivyConf, id: Int, createInteractiveRequest: CreateInteractiveRequest): InteractiveSession = {
val process = startProcess(livyConf, id, createInteractiveRequest)
new InteractiveSessionProcess(id, createInteractiveRequest, process)
}

// Loop until we've started a process with a valid port.
private def startProcess(livyConf: LivyConf, id: Int, kind: Kind, proxyUser: Option[String]): Process = {
private def startProcess(livyConf: LivyConf, id: Int, createInteractiveRequest: CreateInteractiveRequest): Process = {

val builder = new SparkSubmitProcessBuilder(livyConf)

builder.className("com.cloudera.hue.livy.repl.Main")

sys.env.get("LIVY_REPL_JAVA_OPTS").foreach(builder.driverJavaOptions)
livyConf.getOption(CONF_LIVY_REPL_DRIVER_CLASS_PATH).foreach(builder.driverClassPath)
proxyUser.foreach(builder.proxyUser)
createInteractiveRequest.proxyUser.foreach(builder.proxyUser)

livyConf.getOption(CONF_LIVY_REPL_CALLBACK_URL).foreach { case callbackUrl =>
builder.env("LIVY_CALLBACK_URL", f"$callbackUrl/sessions/$id/callback")
Expand All @@ -63,7 +60,7 @@ object InteractiveSessionProcess extends Logging {
builder.redirectOutput(Redirect.PIPE)
builder.redirectError(Redirect.INHERIT)

builder.start(AbsolutePath(livyJar(livyConf)), List(kind.toString))
builder.start(AbsolutePath(livyJar(livyConf)), List(createInteractiveRequest.kind.toString))
}

private def livyJar(conf: LivyConf): String = {
Expand All @@ -74,9 +71,8 @@ object InteractiveSessionProcess extends Logging {
}

private class InteractiveSessionProcess(id: Int,
kind: Kind,
proxyUser: Option[String],
process: Process) extends InteractiveWebSession(id, kind, proxyUser) {
createInteractiveRequest: CreateInteractiveRequest,
process: Process) extends InteractiveWebSession(id, createInteractiveRequest) {

val stdoutThread = new Thread {
override def run() = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
package com.cloudera.hue.livy.server.interactive

import com.cloudera.hue.livy.LivyConf
import com.cloudera.hue.livy.sessions.Kind

import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.{ExecutionContext, Future}

class InteractiveSessionProcessFactory(livyConf: LivyConf) extends InteractiveSessionFactory {

implicit def executor: ExecutionContext = ExecutionContext.global

override def createSession(id: Int, kind: Kind, proxyUser: Option[String] = None): Future[InteractiveSession] = {
override def createSession(id: Int, createInteractiveRequest: CreateInteractiveRequest): Future[InteractiveSession] = {
Future {
InteractiveSessionProcess.create(livyConf, id, kind, proxyUser)
InteractiveSessionProcess.create(livyConf, id, createInteractiveRequest)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit

import com.cloudera.hue.livy.Logging
import com.cloudera.hue.livy.msgs.ExecuteRequest
import InteractiveSession.SessionFailedToStart
import com.cloudera.hue.livy.server.interactive.InteractiveSession.SessionFailedToStart
import com.cloudera.hue.livy.sessions._
import com.fasterxml.jackson.core.JsonParseException
import org.json4s.JsonAST.JString
Expand Down Expand Up @@ -66,11 +66,11 @@ class InteractiveSessionServlet(sessionManager: SessionManager)
}

post("/") {
val createSessionRequest = parsedBody.extract[CreateSessionRequest]
val createInteractiveRequest = parsedBody.extract[CreateInteractiveRequest]

new AsyncResult {
val is = {
val sessionFuture = sessionManager.createSession(createSessionRequest.lang, createSessionRequest.proxyUser)
val sessionFuture = sessionManager.createSession(createInteractiveRequest)

sessionFuture.map { case session =>
Created(session,
Expand Down Expand Up @@ -196,7 +196,6 @@ class InteractiveSessionServlet(sessionManager: SessionManager)
}
}

private case class CreateSessionRequest(lang: Kind, proxyUser: Option[String])
private case class CallbackRequest(url: String)

private object Serializers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.net.URL
import com.cloudera.hue.livy.msgs.ExecuteRequest
import com.cloudera.hue.livy.repl.python.PythonSession
import com.cloudera.hue.livy.repl.scala.SparkSession
import com.cloudera.hue.livy.sessions.{Kind, PySpark, Spark, State}
import com.cloudera.hue.livy.sessions.{PySpark, Spark, State}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
Expand All @@ -32,26 +32,28 @@ object InteractiveSessionThread {
val LIVY_HOME = System.getenv("LIVY_HOME")
val LIVY_REPL = LIVY_HOME + "/bin/livy-repl"

def create(id: Int, kind: Kind): InteractiveSession = {
val session = kind match {
def create(id: Int, createInteractiveRequest: CreateInteractiveRequest): InteractiveSession = {
val session = createInteractiveRequest.kind match {
case Spark() =>
SparkSession.create()
case PySpark() =>
PythonSession.createPySpark()
}
new InteractiveSessionThread(id, kind, session)
new InteractiveSessionThread(id, createInteractiveRequest, session)
}
}

private class InteractiveSessionThread(val id: Int,
val kind: Kind,
session: com.cloudera.hue.livy.repl.Session) extends InteractiveSession {
createInteractiveRequest: CreateInteractiveRequest,
session: com.cloudera.hue.livy.repl.Session) extends InteractiveSession {

protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global

private var executedStatements = 0
private var statements_ = new ArrayBuffer[Statement]

override def kind = createInteractiveRequest.kind

override def proxyUser: Option[String] = None

override def lastActivity: Long = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
package com.cloudera.hue.livy.server.interactive

import com.cloudera.hue.livy.LivyConf
import com.cloudera.hue.livy.sessions.Kind

import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.{ExecutionContext, Future}

class InteractiveSessionThreadFactory(livyConf: LivyConf) extends InteractiveSessionFactory {

implicit def executor: ExecutionContext = ExecutionContext.global

override def createSession(id: Int, kind: Kind, proxyUser: Option[String] = None): Future[InteractiveSession] = {
override def createSession(id: Int, createInteractiveRequest: CreateInteractiveRequest): Future[InteractiveSession] = {
Future {
InteractiveSessionThread.create(id, kind)
InteractiveSessionThread.create(id, createInteractiveRequest)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ package com.cloudera.hue.livy.server.interactive
import java.lang.ProcessBuilder.Redirect
import java.util.concurrent.TimeUnit

import com.cloudera.hue.livy.sessions.Error
import com.cloudera.hue.livy.spark.SparkSubmitProcessBuilder
import com.cloudera.hue.livy.spark.SparkSubmitProcessBuilder.AbsolutePath
import com.cloudera.hue.livy.{LineBufferedProcess, Utils, LivyConf}
import com.cloudera.hue.livy.sessions.{Kind, Error}
import com.cloudera.hue.livy.yarn.{Client, Job}
import com.cloudera.hue.livy.{LineBufferedProcess, LivyConf, Utils}

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}

object InteractiveSessionYarn {
protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global

private val CONF_LIVY_JAR = "livy.yarn.jar"
private lazy val regex = """Application report for (\w+)""".r.unanchored

def create(livyConf: LivyConf, client: Client, id: Int, kind: Kind, proxyUser: Option[String] = None): InteractiveSession = {
def create(livyConf: LivyConf, client: Client, id: Int, createInteractiveRequest: CreateInteractiveRequest): InteractiveSession = {
val callbackUrl = System.getProperty("livy.server.callback-url")
val url = f"$callbackUrl/sessions/$id/callback"

Expand All @@ -45,12 +45,12 @@ object InteractiveSessionYarn {
builder.master("yarn-cluster")
builder.className("com.cloudera.hue.livy.repl.Main")
builder.driverJavaOptions(f"-Dlivy.repl.callback-url=$url -Dlivy.repl.port=0")
proxyUser.foreach(builder.proxyUser)
createInteractiveRequest.proxyUser.foreach(builder.proxyUser)

builder.redirectOutput(Redirect.PIPE)
builder.redirectErrorStream(redirect = true)

val process = builder.start(AbsolutePath(livyJar(livyConf)), List(kind.toString))
val process = builder.start(AbsolutePath(livyJar(livyConf)), List(createInteractiveRequest.kind.toString))

val job = Future {
val proc = new LineBufferedProcess(process)
Expand All @@ -62,7 +62,7 @@ object InteractiveSessionYarn {
job
}

new InteractiveSessionYarn(id, kind, proxyUser, job)
new InteractiveSessionYarn(id, createInteractiveRequest, job)
}

private def livyJar(livyConf: LivyConf) = {
Expand All @@ -75,9 +75,8 @@ object InteractiveSessionYarn {
}

private class InteractiveSessionYarn(id: Int,
kind: Kind,
proxyUser: Option[String],
job: Future[Job]) extends InteractiveWebSession(id, kind, proxyUser) {
createInteractiveRequest: CreateInteractiveRequest,
job: Future[Job]) extends InteractiveWebSession(id, createInteractiveRequest) {
job.onFailure { case _ =>
_state = Error()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@
package com.cloudera.hue.livy.server.interactive

import com.cloudera.hue.livy.LivyConf
import com.cloudera.hue.livy.sessions.Kind
import com.cloudera.hue.livy.yarn.Client

import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.{ExecutionContext, Future}

class InteractiveSessionYarnFactory(livyConf: LivyConf) extends InteractiveSessionFactory {

implicit def executor: ExecutionContext = ExecutionContext.global

val client = new Client(livyConf)

override def createSession(id: Int, kind: Kind, proxyUser: Option[String] = None): Future[InteractiveSession] = {
override def createSession(id: Int, createInteractiveRequest: CreateInteractiveRequest): Future[InteractiveSession] = {
Future {
InteractiveSessionYarn.create(livyConf, client, id, kind, proxyUser)
InteractiveSessionYarn.create(livyConf, client, id, createInteractiveRequest)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ import dispatch._
import org.json4s.jackson.Serialization.write
import org.json4s.{DefaultFormats, Formats}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.concurrent.{Future, _}

class InteractiveWebSession(val id: Int,
val kind: Kind,
val proxyUser: Option[String]) extends InteractiveSession with Logging {
class InteractiveWebSession(val id: Int, createInteractiveRequest: CreateInteractiveRequest) extends InteractiveSession with Logging {

protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global
protected implicit def jsonFormats: Formats = DefaultFormats
Expand All @@ -47,6 +44,10 @@ class InteractiveWebSession(val id: Int,
private[this] var _executedStatements = 0
private[this] var _statements = IndexedSeq[Statement]()

override def kind = createInteractiveRequest.kind

override def proxyUser = createInteractiveRequest.proxyUser

override def url: Option[URL] = _url

override def url_=(url: URL) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package com.cloudera.hue.livy.server.interactive

import com.cloudera.hue.livy.LivyConf
import com.cloudera.hue.livy.server.interactive.InteractiveSessionProcess
import com.cloudera.hue.livy.sessions.Spark
import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}

Expand All @@ -28,5 +27,5 @@ class InteractiveSessionProcessSpec extends BaseSessionSpec with FunSpecLike wit
val livyConf = new LivyConf()
livyConf.set("livy.repl.driverClassPath", sys.props("java.class.path"))

def createSession() = InteractiveSessionProcess.create(livyConf, 0, Spark())
def createSession() = InteractiveSessionProcess.create(livyConf, 0, CreateInteractiveRequest(kind = Spark()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class InteractiveSessionServletSpec extends ScalatraSuite with FunSpecLike {
}

class MockInteractiveSessionFactory() extends InteractiveSessionFactory {
override def createSession(id: Int, kind: Kind, proxyUser: Option[String]): Future[InteractiveSession] = {
override def createSession(id: Int, createInteractiveRequest: CreateInteractiveRequest): Future[InteractiveSession] = {
Future.successful(new MockInteractiveSession(id))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@

package com.cloudera.hue.livy.server.interactive

import com.cloudera.hue.livy.server.interactive.BaseSessionSpec
import com.cloudera.hue.livy.sessions.Spark
import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}

class InteractiveSessionThreadSpec extends BaseSessionSpec with FunSpecLike with Matchers with BeforeAndAfter {

def createSession() = InteractiveSessionThread.create(0, Spark())
def createSession() = InteractiveSessionThread.create(0, CreateInteractiveRequest(kind = Spark()))
}

0 comments on commit 68c44bb

Please sign in to comment.