Skip to content

Commit

Permalink
Add servlet to handle livy client sessions.
Browse files Browse the repository at this point in the history
This is not complete; it allows creating / tearing down sessions and
submitting synchronous / asynchronous jobs, but there's currently no
way to retrieve a job's results (or even monitoring it's state, since
the backend code for that is not implemented).

Also includes a new base test class for the servlets that does away
with a lot of copy & paste.
  • Loading branch information
harishreedharan authored and Marcelo Vanzin committed Jan 12, 2016
1 parent d298921 commit 35c9541
Show file tree
Hide file tree
Showing 11 changed files with 416 additions and 117 deletions.
7 changes: 7 additions & 0 deletions api/src/main/java/com/cloudera/livy/LivyClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ public LivyClientBuilder setAll(Properties props) {
return this;
}

public LivyClientBuilder setIfMissing(String key, String value) {
if (!config.containsKey(key)) {
setConf(key, value);
}
return this;
}

public LivyClient build() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import com.cloudera.livy.client.local.rpc.RpcServer;
import static com.cloudera.livy.client.local.LocalConf.Entry.*;

class LocalClient implements LivyClient {
public class LocalClient implements LivyClient {
private static final Logger LOG = LoggerFactory.getLogger(LocalClient.class);

private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ abstract class SessionServlet[S <: Session](sessionManager: SessionManager[S])
new AsyncResult {
val is = Future {
val session = sessionManager.create(parsedBody)
Created(session,
Created(serializeSession(session),
headers = Map("Location" -> url(getSession, "id" -> session.id.toString))
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.livy.server.client

import java.io.{ByteArrayInputStream, ObjectInputStream}

import org.json4s.JValue
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.write
import org.scalatra._

import com.cloudera.livy.Job
import com.cloudera.livy.server.SessionServlet
import com.cloudera.livy.sessions.{SessionManager, SessionState}
import com.cloudera.livy.spark.client._

class ClientSessionServlet(sessionManager: SessionManager[ClientSession])
extends SessionServlet[ClientSession](sessionManager) {

post("/:id/submit-job") {
sessionManager.get(params("id").toInt) match {
case Some(session) =>
val req = parsedBody.extract[SerializedJob]
if (req.job != null && req.job.length > 0) {
Created(session.submitJob(req.job))
} else {
BadRequest("No job provided.")
}
case None =>
}
}

post("/:id/run-job") {
sessionManager.get(params("id").toInt) match {
case Some(session) =>
val req = parsedBody.extract[SerializedJob]
if (req.job != null && req.job.length > 0) {
val jobId = session.runJob(req.job)
Created(JobSubmitted(jobId))
} else {
BadRequest("No job provided.")
}
case None =>
}
}

post("/:id/add-jar") {
sessionManager.get(params("id").toInt) match {
case Some(session) =>
session.addJar(parsedBody.extract[AddJar].uri)
case None =>
}
}

post("/:id/add-file") {
sessionManager.get(params("id").toInt) match {
case Some(session) =>
session.addFile(parsedBody.extract[AddFile].uri)
case None =>
}
}

post("/:id/job-status") {
sessionManager.get(params("id").toInt) match {
case Some(session) =>
case None =>
}
}

override protected def serializeSession(session: ClientSession): JValue = {
val view = ClientSessionView(session.id, session.state)
parse(write(view))
}

case class ClientSessionView(id: Int, state: SessionState)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.cloudera.livy.server

import org.json4s.{DefaultFormats, Formats, JValue}
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.write
import org.scalatest.{BeforeAndAfterAll, FunSpecLike}
import org.scalatra.test.scalatest.ScalatraSuite

import com.cloudera.livy.LivyConf
import com.cloudera.livy.sessions.{Session, SessionFactory, SessionManager}

abstract class BaseSessionServletSpec[S <: Session] extends ScalatraSuite
with FunSpecLike with BeforeAndAfterAll {

protected implicit def jsonFormats: Formats = DefaultFormats

override protected def withFixture(test: NoArgTest) = {
assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.")
test()
}

override def afterAll(): Unit = {
super.afterAll()
sessionManager.shutdown()
}


def sessionFactory: SessionFactory[S]
def servlet: SessionServlet[S]

val livyConf = new LivyConf()
val sessionManager = new SessionManager(livyConf, sessionFactory)

addServlet(servlet, "/*")

protected def toJson(msg: AnyRef): String = write(msg)

protected def headers: Map[String, String] = Map("Content-Type" -> "application/json")

protected def getJson(uri: String, expectedStatus: Int = 200)(fn: (JValue) => Unit): Unit = {
get(uri)(doTest(expectedStatus, fn))
}

protected def postJson(uri: String, body: AnyRef, expectedStatus: Int = 201)
(fn: (JValue) => Unit): Unit = {
post(uri, body = toJson(body), headers = headers)(doTest(expectedStatus, fn))
}

protected def deleteJson(uri: String, expectedStatus: Int = 200)(fn: (JValue) => Unit): Unit = {
delete(uri)(doTest(expectedStatus, fn))
}

private def doTest(expectedStatus: Int, fn: (JValue) => Unit): Unit = {
status should be (expectedStatus)
header("Content-Type") should include("application/json")
fn(parse(body))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,19 @@ package com.cloudera.livy.server.batch
import java.io.FileWriter
import java.nio.file.{Files, Path}
import java.util.concurrent.TimeUnit
import com.cloudera.livy.sessions.{SessionManager, SessionState}
import com.cloudera.livy.spark.SparkProcessBuilderFactory
import com.cloudera.livy.spark.batch.{BatchSessionProcessFactory, CreateBatchRequest}
import com.cloudera.livy.{LivyConf, Utils}
import org.json4s.JsonAST.{JArray, JInt, JObject, JString}
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.write
import org.json4s.{DefaultFormats, Formats}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpecLike}
import org.scalatra.test.scalatest.ScalatraSuite

import scala.concurrent.duration.Duration

class BatchServletSpec extends ScalatraSuite with FunSpecLike with BeforeAndAfterAll with BeforeAndAfter {
import org.json4s.JsonAST.{JArray, JInt, JObject, JString}

protected implicit def jsonFormats: Formats = DefaultFormats
import com.cloudera.livy.Utils
import com.cloudera.livy.server.BaseSessionServletSpec
import com.cloudera.livy.sessions.SessionState
import com.cloudera.livy.sessions.batch.BatchSession
import com.cloudera.livy.spark.SparkProcessBuilderFactory
import com.cloudera.livy.spark.batch.{BatchSessionProcessFactory, CreateBatchRequest}

override protected def withFixture(test: NoArgTest) = {
assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.")
test()
}
class BatchServletSpec extends BaseSessionServletSpec[BatchSession] {

val script: Path = {
val script = Files.createTempFile("livy-test", ".py")
Expand All @@ -58,80 +50,55 @@ class BatchServletSpec extends ScalatraSuite with FunSpecLike with BeforeAndAfte
script
}

val livyConf = new LivyConf()
val batchFactory = new BatchSessionProcessFactory(new SparkProcessBuilderFactory(livyConf))
val batchManager = new SessionManager(livyConf, batchFactory)
val servlet = new BatchSessionServlet(batchManager)

addServlet(servlet, "/*")

after {
batchManager.shutdown()
override def sessionFactory: BatchSessionProcessFactory = {
new BatchSessionProcessFactory(new SparkProcessBuilderFactory(livyConf))
}
override def servlet = new BatchSessionServlet(sessionManager)

describe("Batch Servlet") {
it("should create and tear down a batch") {
get("/") {
status should equal (200)
header("Content-Type") should include("application/json")
val parsedBody = parse(body)
parsedBody \ "sessions" should equal (JArray(List()))
getJson("/") { data =>
data \ "sessions" should equal (JArray(List()))
}

val createBatchRequest = write(CreateBatchRequest(
file = script.toString
))

post("/", body = createBatchRequest, headers = Map("Content-Type" -> "application/json")) {
status should equal (201)
header("Content-Type") should include("application/json")
postJson("/", CreateBatchRequest(file = script.toString)) { data =>
header("Location") should equal("/0")
val parsedBody = parse(body)
parsedBody \ "id" should equal (JInt(0))
data \ "id" should equal (JInt(0))

val batch = batchManager.get(0)
val batch = sessionManager.get(0)
batch should be (defined)
}

// Wait for the process to finish.
{
val batch = batchManager.get(0).get
val batch = sessionManager.get(0).get
Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS))
(batch.state match {
case SessionState.Success(_) => true
case _ => false
}) should be (true)
}

get("/0") {
status should equal (200)
header("Content-Type") should include("application/json")
val parsedBody = parse(body)
parsedBody \ "id" should equal (JInt(0))
parsedBody \ "state" should equal (JString("success"))
getJson("/0") { data =>
data \ "id" should equal (JInt(0))
data \ "state" should equal (JString("success"))

val batch = batchManager.get(0)
val batch = sessionManager.get(0)
batch should be (defined)
}

get("/0/log?size=1000") {
status should equal (200)
header("Content-Type") should include("application/json")
val parsedBody = parse(body)
parsedBody \ "id" should equal (JInt(0))
(parsedBody \ "log").extract[List[String]] should contain ("hello world")
getJson("/0/log?size=1000") { data =>
data \ "id" should equal (JInt(0))
(data \ "log").extract[List[String]] should contain ("hello world")

val batch = batchManager.get(0)
val batch = sessionManager.get(0)
batch should be (defined)
}

delete("/0") {
status should equal (200)
header("Content-Type") should include("application/json")
val parsedBody = parse(body)
parsedBody should equal (JObject(("msg", JString("deleted"))))
deleteJson("/0") { data =>
data should equal (JObject(("msg", JString("deleted"))))

val batch = batchManager.get(0)
val batch = sessionManager.get(0)
batch should not be defined
}
}
Expand Down
Loading

0 comments on commit 35c9541

Please sign in to comment.