Skip to content

Commit

Permalink
LIVY-88. Replace net.databinder.dispatch with com.ning.async-http-client
Browse files Browse the repository at this point in the history
Closes apache#87
  • Loading branch information
zjffdu authored and Marcelo Vanzin committed Mar 17, 2016
1 parent 011cc98 commit 761599f
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 76 deletions.
9 changes: 9 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
<artifactId>json4s-core_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand All @@ -104,6 +109,10 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.ning</groupId>
<artifactId>async-http-client</artifactId>
</dependency>
</dependencies>

</project>
17 changes: 16 additions & 1 deletion core/src/main/scala/com/cloudera/livy/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@

package com.cloudera.livy

import java.io.{File, FileInputStream, InputStreamReader}
import java.io.{Closeable, File, FileInputStream, InputStreamReader}
import java.util.Properties

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.TimeoutException
import scala.concurrent.duration.Duration

import com.ning.http.client.Response
import org.json4s._
import org.json4s.jackson.JsonMethods._


object Utils {
def getPropertiesFromFile(file: File): Map[String, String] = {
val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
Expand Down Expand Up @@ -95,4 +100,14 @@ object Utils {
}
}

def usingResource[A <: Closeable, B](resource: A)(f: A => B): B = {
try {
f(resource)
} finally {
resource.close()
}
}

def toJson(r: Response): JValue =
parse(StringInput(r.getResponseBody), true)
}
20 changes: 7 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
<hadoop.version>2.6.0-cdh5.5.0</hadoop.version>
<spark.version>1.5.0-cdh5.5.0</spark.version>
<commons-codec.version>1.9</commons-codec.version>
<dispatch.version>0.11.2</dispatch.version>
<guava.version>14.0.1</guava.version>
<httpclient.version>4.5</httpclient.version>
<httpcore.version>4.4.1</httpcore.version>
Expand All @@ -58,6 +57,7 @@
<metrics.version>3.1.0</metrics.version>
<mockito.version>1.9.5</mockito.version>
<netty.version>4.0.23.Final</netty.version>
<async-http-client.version>1.9.33</async-http-client.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<py4j.version>0.8.2.1</py4j.version>
<scala.binary.version>2.10</scala.binary.version>
Expand Down Expand Up @@ -496,18 +496,6 @@
<version>${snappy-java.version}</version>
</dependency>

<dependency>
<groupId>net.databinder.dispatch</groupId>
<artifactId>dispatch-core_${scala.binary.version}</artifactId>
<version>${dispatch.version}</version>
</dependency>

<dependency>
<groupId>net.databinder.dispatch</groupId>
<artifactId>dispatch-json4s-jackson_${scala.binary.version}</artifactId>
<version>${dispatch.version}</version>
</dependency>

<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
Expand All @@ -520,6 +508,12 @@
<version>1.7.0</version>
</dependency>

<dependency>
<groupId>com.ning</groupId>
<artifactId>async-http-client</artifactId>
<version>${async-http-client.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
5 changes: 0 additions & 5 deletions repl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>net.databinder.dispatch</groupId>
<artifactId>dispatch-core_${scala.binary.version}</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
29 changes: 15 additions & 14 deletions repl/src/main/scala/com/cloudera/livy/repl/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@ package com.cloudera.livy.repl

import java.util.concurrent.TimeUnit
import javax.servlet.ServletContext
import javax.servlet.http.HttpServletResponse

import scala.annotation.tailrec
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import dispatch._
import com.ning.http.client.AsyncHttpClient
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.Serialization.write
import org.scalatra.LifeCycle
import org.scalatra.servlet.ScalatraListener

import com.cloudera.livy.{LivyConf, Logging, WebServer}
import com.cloudera.livy.{LivyConf, Logging, Utils, WebServer}
import com.cloudera.livy.repl.python.PythonInterpreter
import com.cloudera.livy.repl.scalaRepl.SparkInterpreter
import com.cloudera.livy.repl.sparkr.SparkRInterpreter
Expand Down Expand Up @@ -86,8 +87,6 @@ object Main extends Logging {
server.join()
} finally {
server.stop()
// Make sure to close all our outstanding http requests.
Http.shutdown()
}
}
}
Expand Down Expand Up @@ -135,17 +134,19 @@ class ScalatraBootstrap extends LifeCycle with Logging {

// Wait for our url to be discovered.
val replUrl = waitForReplUrl()

var req = url(callbackUrl).setContentType("application/json", "UTF-8")
req = req << write(Map("url" -> replUrl))

info(s"Calling $callbackUrl...")
val rep = Http(req OK as.String)
rep.onFailure {
case _ => System.exit(1)
val response = Utils.usingResource(new AsyncHttpClient()) { client =>
client.preparePost(callbackUrl)
.setHeader("Content-Type", "application/json;charset=UTF-8")
.setBody(write(Map("url" -> replUrl)))
.execute().get()
}
response.getStatusCode match {
case HttpServletResponse.SC_OK => Future.successful(())
case statusCode =>
info("callback fail, " + response.getResponseBody)
System.exit(1)
}

Await.result(rep, Duration(10, TimeUnit.SECONDS))
} catch {
case e: Throwable =>
error("Exception is thrown in notifyCallback()", e)
Expand Down
10 changes: 0 additions & 10 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>net.databinder.dispatch</groupId>
<artifactId>dispatch-core_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>net.databinder.dispatch</groupId>
<artifactId>dispatch-json4s-jackson_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>com.cloudera.livy</groupId>
<artifactId>livy-repl</artifactId>
Expand Down
3 changes: 0 additions & 3 deletions server/src/main/scala/com/cloudera/livy/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ object Main extends Logging {
server.join()
} finally {
server.stop()

// Make sure to close all our outstanding http requests.
dispatch.Http.shutdown()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ abstract class SessionServlet[S <: Session](livyConf: LivyConf)
}

error {
case e: dispatch.StatusCode => ActionResult(ResponseStatus(e.code), e.getMessage, Map.empty)
case e: IllegalArgumentException => BadRequest(e.getMessage)
case e =>
SessionServlet.error("internal error", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ import scala.annotation.tailrec
import scala.concurrent.{Future, _}
import scala.concurrent.duration.Duration

import dispatch._
import com.ning.http.client.AsyncHttpClient
import org.json4s._
import org.json4s.{DefaultFormats, Formats, JValue}
import org.json4s.JsonAST.{JNull, JString}
import org.json4s.jackson.Serialization.write

import com.cloudera.livy.{ExecuteRequest, LivyConf, Utils}
import com.cloudera.livy.{ExecuteRequest, LivyConf}
import com.cloudera.livy.sessions._
import com.cloudera.livy.sessions.interactive.Statement
import com.cloudera.livy.utils.SparkProcessBuilder
import com.cloudera.livy.Utils

object InteractiveSession {
val LivyReplDriverClassPath = "livy.repl.driverClassPath"
Expand Down Expand Up @@ -167,19 +169,23 @@ class InteractiveSession(
_state match {
case SessionState.Idle() =>
_state = SessionState.Busy()

Http(svc.DELETE OK as.String).either() match {
case (Right(_) | Left(_: ConnectException)) =>
// Make sure to eat any connection errors because the repl shut down before it sent
// out an OK.
Future {
try {
Utils.usingResource(new AsyncHttpClient()) { client =>
client.prepareDelete(url.get.toString).execute()
}
synchronized {
_state = SessionState.Dead()
}

Future.successful(())

case Left(t: Throwable) =>
Future.failed(t)
} catch {
case e: ConnectException =>
synchronized {
_state = SessionState.Dead()
}
Future.successful(())
case ex => Future.failed(ex)
}
}
case SessionState.NotStarted() =>
Future {
Expand Down Expand Up @@ -243,12 +249,17 @@ class InteractiveSession(
_state = SessionState.Busy()
recordActivity()

val req = (svc / "execute").setContentType("application/json", "UTF-8") << write(content)

val future = Http(req OK as.json4s.Json).map { case resp: JValue =>
parseResponse(resp).getOrElse {
val future = Future {
val response = Utils.usingResource(new AsyncHttpClient()) { client =>
client.preparePost(url.get.toString + "/execute")
.setHeader("Content-Type", "application/json;charset=UTF-8")
.setBody(write(content))
.execute().get
}
val jsonResponse = Utils.toJson(response)
parseResponse(jsonResponse).getOrElse {
// The result isn't ready yet. Loop until it is.
val id = (resp \ "id").extract[Int]
val id = (jsonResponse \ "id").extract[Int]
waitForStatement(id)
}
}
Expand All @@ -266,17 +277,14 @@ class InteractiveSession(
Utils.waitUntil({ () => state != oldState }, atMost)
}

private def svc = {
val url = _url.head
dispatch.url(url.toString)
}

@tailrec
private def waitForStatement(id: Int): JValue = {
val req = (svc / "history" / id).setContentType("application/json", "UTF-8")
val resp = Await.result(Http(req OK as.json4s.Json), Duration.Inf)

parseResponse(resp) match {
val response = Utils.usingResource(new AsyncHttpClient()) { client =>
client.prepareGet(url.get.toString + "/history/" + id)
.setHeader("Content-Type", "application/json;charset=UTF-8")
.execute().get
}
parseResponse(Utils.toJson(response)) match {
case Some(result) => result
case None =>
Thread.sleep(1000)
Expand Down Expand Up @@ -305,10 +313,13 @@ class InteractiveSession(
}

private def replErroredOut() = {
val req = svc.setContentType("application/json", "UTF-8")
val response = Await.result(Http(req OK as.json4s.Json), Duration.Inf)
val response = Utils.usingResource(new AsyncHttpClient()) { client =>
client.prepareGet(url.get.toString)
.setHeader("Content-Type", "application/json;charset=UTF-8")
.execute().get
}

response \ "state" match {
Utils.toJson(response) \ "state" match {
case JString("error") => true
case _ => false
}
Expand Down Expand Up @@ -372,7 +383,6 @@ class InteractiveSession(
}
}
}

// Error out the job if the process errors out.
Future {
if (process.waitFor() == 0) {
Expand Down

0 comments on commit 761599f

Please sign in to comment.