LIVY-134. Fix real cluster support for integration tests.

This change brings real cluster support back to the integration tests
by fixing and simplifying the code that starts Livy on a remote
machine. Since the tests now need it, there is also support for
downloading the Hadoop configuration from the cluster, so that tests
can write directly to HDFS. At the moment, that means kerberized
clusters do not work.

The change also cleans up a lot of unused features of the test
framework: unused methods are gone, and the ClusterPool type was
removed, since right now there is only ever a single cluster. I also
removed the "withClue" calls, since log fetching is now done
differently and logs end up in local files in the build directory.

Closes apache#126
Marcelo Vanzin committed May 9, 2016
1 parent 3ae23e6 commit 304cc46
Showing 10 changed files with 402 additions and 422 deletions.
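
For anyone running these tests: which cluster implementation gets used is decided by the cluster.spec system property, which names a properties file (see the Cluster object in the diff below). A minimal sketch of such a file; cluster.type is the only key Cluster itself reads, and whatever extra keys a RealCluster consumes are not shown here:

```
# real-cluster.conf, passed as -Dcluster.spec=/path/to/real-cluster.conf
# Omitting cluster.spec (or setting it to "default") selects the mini cluster.
cluster.type = real
```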
@@ -68,38 +68,37 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
   protected def rtest(desc: String)(testFn: => Unit): Unit = {
     test(desc) {
       assume(cluster.isRealSpark(), "SparkR tests require a real Spark installation.")
       assume(cluster.hasSparkR(), "Spark under test does not support R.")
       testFn
     }
   }
 
   test("initialize test cluster") {
-    cluster = ClusterPool.get.lease()
+    cluster = Cluster.get()
     httpClient = new AsyncHttpClient()
     livyClient = new LivyRestClient(httpClient, livyEndpoint)
   }
 
   class LivyRestClient(httpClient: AsyncHttpClient, livyEndpoint: String) {
 
     def startSession(kind: Kind): Int = {
-      withClue(cluster.getLivyLog()) {
-        val requestBody = new CreateInteractiveRequest()
-        requestBody.kind = kind
+      val requestBody = new CreateInteractiveRequest()
+      requestBody.kind = kind
 
-        val rep = httpClient.preparePost(s"$livyEndpoint/sessions")
-          .setBody(mapper.writeValueAsString(requestBody))
-          .execute()
-          .get()
+      val rep = httpClient.preparePost(s"$livyEndpoint/sessions")
+        .setBody(mapper.writeValueAsString(requestBody))
+        .execute()
+        .get()
 
-        val sessionId: Int = withClue(rep.getResponseBody) {
-          rep.getStatusCode should equal(HttpServletResponse.SC_CREATED)
-          val newSession = mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
-          newSession should contain key ("id")
+      val sessionId: Int = withClue(rep.getResponseBody) {
+        rep.getStatusCode should equal(HttpServletResponse.SC_CREATED)
+        val newSession = mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
+        newSession should contain key ("id")
 
-          newSession("id").asInstanceOf[Int]
-        }
-
-        sessionId
+        newSession("id").asInstanceOf[Int]
       }
+
+      sessionId
     }
 
     /** Stops a session. If an id < 0 is provided, do nothing. */
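
For orientation, startSession above is a single REST call: it POSTs the JSON-serialized CreateInteractiveRequest to /sessions, asserts an HTTP 201 (SC_CREATED) reply, and pulls the new session's id out of the response JSON. Roughly, with illustrative field values:

```
POST {livyEndpoint}/sessions
{"kind": "spark"}

HTTP/1.1 201 Created
{"id": 0, "state": "starting", "kind": "spark"}
```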
@@ -130,43 +129,39 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
     }
 
     def runStatement(sessionId: Int, stmt: String): Int = {
-      withClue(cluster.getLivyLog()) {
-        val requestBody = Map("code" -> stmt)
-        val rep = httpClient.preparePost(s"$livyEndpoint/sessions/$sessionId/statements")
-          .setBody(mapper.writeValueAsString(requestBody))
-          .execute()
-          .get()
-
-        val stmtId: Int = withClue(rep.getResponseBody) {
-          rep.getStatusCode should equal(HttpServletResponse.SC_CREATED)
-          val newStmt = mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
-          newStmt should contain key ("id")
-
-          newStmt("id").asInstanceOf[Int]
-        }
-        stmtId
+      val requestBody = Map("code" -> stmt)
+      val rep = httpClient.preparePost(s"$livyEndpoint/sessions/$sessionId/statements")
+        .setBody(mapper.writeValueAsString(requestBody))
+        .execute()
+        .get()
+
+      val stmtId: Int = withClue(rep.getResponseBody) {
+        rep.getStatusCode should equal(HttpServletResponse.SC_CREATED)
+        val newStmt = mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
+        newStmt should contain key ("id")
+
+        newStmt("id").asInstanceOf[Int]
       }
+      stmtId
     }
 
     def getStatementResult(sessionId: Int, stmtId: Int): String = {
-      withClue(cluster.getLivyLog()) {
-        val rep = httpClient.prepareGet(s"$livyEndpoint/sessions/$sessionId/statements/$stmtId")
-          .execute()
-          .get()
-
-        val stmtResult = withClue(rep.getResponseBody) {
-          rep.getStatusCode should equal(HttpServletResponse.SC_OK)
-          val newStmt = mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
-          newStmt should contain key ("output")
-          val output = newStmt("output").asInstanceOf[Map[String, Any]]
-          output should contain key ("data")
-          val data = output("data").asInstanceOf[Map[String, Any]]
-          data should contain key ("text/plain")
-          data("text/plain").asInstanceOf[String]
-        }
+      val rep = httpClient.prepareGet(s"$livyEndpoint/sessions/$sessionId/statements/$stmtId")
+        .execute()
+        .get()
+
+      val stmtResult = withClue(rep.getResponseBody) {
+        rep.getStatusCode should equal(HttpServletResponse.SC_OK)
+        val newStmt = mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
+        newStmt should contain key ("output")
+        val output = newStmt("output").asInstanceOf[Map[String, Any]]
+        output should contain key ("data")
+        val data = output("data").asInstanceOf[Map[String, Any]]
+        data should contain key ("text/plain")
+        data("text/plain").asInstanceOf[String]
+      }
 
-        stmtResult
-      }
+      stmtResult
     }
   }
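
Put together, the three client methods support the basic run-a-statement flow the suites build on. A hypothetical sketch (the real tests also poll for session readiness and statement completion, which is elided here; Spark() is one of the Kind values):

```scala
// Hypothetical flow using the LivyRestClient above; polling and error
// handling are omitted for brevity.
val sessionId = livyClient.startSession(Spark())
val stmtId = livyClient.runStatement(sessionId, "1 + 1")
// getStatementResult returns the statement's text/plain output.
livyClient.getStatementResult(sessionId, stmtId) should include ("2")
```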

@@ -18,23 +18,129 @@
 
 package com.cloudera.livy.test.framework
 
-import java.io.File
+import java.io._
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+import com.cloudera.livy.Logging
 
 /**
  * An common interface to run test on real cluster and mini cluster.
  */
 trait Cluster {
   def deploy(): Unit
   def cleanUp(): Unit
-  def getYarnRmEndpoint: String
-  def upload(srcPath: String, destPath: String): Unit
   def configDir(): File
   def isRealSpark(): Boolean
   def hasSparkR(): Boolean
 
   def runLivy(): Unit
   def stopLivy(): Unit
   def livyEndpoint: String
-  def getLivyLog(): String
+  def hdfsScratchDir(): Path
+
+  def doAsClusterUser[T](task: => T): T
+
+  lazy val hadoopConf = {
+    val conf = new Configuration(false)
+    configDir().listFiles().foreach { f =>
+      if (f.getName().endsWith(".xml")) {
+        conf.addResource(new Path(f.toURI()))
+      }
+    }
+    conf
+  }
+
+  lazy val fs = doAsClusterUser {
+    FileSystem.get(hadoopConf)
+  }
+
 }
+
+object Cluster extends Logging {
+  private val CLUSTER_TYPE = "cluster.type"
+
+  private lazy val config = {
+    sys.props.get("cluster.spec")
+      .filter { path => path.nonEmpty && path != "default" }
+      .map { path =>
+        val in = Option(getClass.getClassLoader.getResourceAsStream(path))
+          .getOrElse(new FileInputStream(path))
+        val p = new Properties()
+        val reader = new InputStreamReader(in, UTF_8)
+        try {
+          p.load(reader)
+        } finally {
+          reader.close()
+        }
+        p.asScala.toMap
+      }
+      .getOrElse(Map(CLUSTER_TYPE -> "mini"))
+  }
+
+  private lazy val cluster = {
+    var _cluster: Cluster = null
+    try {
+      _cluster = config.get(CLUSTER_TYPE) match {
+        case Some("real") => new RealCluster(config)
+        case Some("mini") => new MiniCluster(config)
+        case t => throw new Exception(s"Unknown or unset cluster.type $t")
+      }
+      Runtime.getRuntime.addShutdownHook(new Thread {
+        override def run(): Unit = {
+          info("Shutting down cluster pool.")
+          _cluster.cleanUp()
+        }
+      })
+      _cluster.deploy()
+    } catch {
+      case e: Throwable =>
+        error("Failed to initialize cluster.", e)
+        Option(_cluster).foreach { c =>
+          Try(c.cleanUp()).recover { case e =>
+            error("Furthermore, failed to clean up cluster after failure.", e)
+          }
+        }
+        throw e
+    }
+    _cluster
+  }
+
+  def get(): Cluster = cluster
+}
+
+trait ClusterUtils {
+
+  protected def saveProperties(props: Map[String, String], dest: File): Unit = {
+    val jprops = new Properties()
+    props.foreach { case (k, v) => jprops.put(k, v) }
+
+    val tempFile = new File(dest.getAbsolutePath() + ".tmp")
+    val out = new OutputStreamWriter(new FileOutputStream(tempFile), UTF_8)
+    try {
+      jprops.store(out, "Configuration")
+    } finally {
+      out.close()
+    }
+    tempFile.renameTo(dest)
+  }
+
+  protected def loadProperties(file: File): Map[String, String] = {
+    val in = new InputStreamReader(new FileInputStream(file), UTF_8)
+    val props = new Properties()
+    try {
+      props.load(in)
+    } finally {
+      in.close()
+    }
+    props.asScala.toMap
+  }
+
+  def runCommand(cmd: String): String
+}
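
The hadoopConf/fs pair is what gives tests direct HDFS access, per the commit message: every *.xml file in the downloaded configuration directory is loaded into a Configuration, and fs is a FileSystem built from it while running as the cluster user. A hypothetical helper showing how a test could stage data using only the trait members above (stageFile itself is not part of this change):

```scala
import java.io.File

import org.apache.hadoop.fs.Path

// Hypothetical helper: copy a local file into the per-run HDFS scratch
// directory, running as the cluster user so permissions line up.
def stageFile(cluster: Cluster, local: File): Path = cluster.doAsClusterUser {
  val dest = new Path(cluster.hdfsScratchDir(), local.getName())
  cluster.fs.copyFromLocalFile(new Path(local.toURI()), dest)
  dest
}
```

Also worth noting: ClusterUtils.saveProperties writes to a ".tmp" sibling and then renames it over the destination, so a concurrent reader never observes a half-written file; rename within the same directory is effectively atomic on POSIX filesystems, though File.renameTo's return value is ignored here.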

One of the changed files was deleted outright; its diff is not shown.

