LIVY-126. Add integration tests for batches.
Run a couple of simple apps using the batch API. Some adjustment was
needed to make things work; mainly, "spark.submit.deployMode" is not
supported by the default Spark dependency (1.5.0 from CDH), so we need
to use the old-style "yarn-cluster" master instead; otherwise batches
fail to launch.
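
For reference, the two ways of expressing a YARN cluster-mode submission
in spark-defaults.conf look roughly like this (a sketch; the property names
are standard Spark, but which form is accepted depends on the Spark version
doing the submit):

  # Old style: the deploy mode is encoded in the master URL. This is what
  # the 1.5.0 dependency understands, so it is what MiniCluster now writes.
  spark.master             yarn-cluster

  # New style: master and deploy mode are separate properties. Not
  # recognized by the CDH 1.5.0 dependency, so batches fail to launch.
  spark.master             yarn
  spark.submit.deployMode  cluster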

For the tests to work I also needed explicit access to the HDFS cluster,
which meant breaking RealCluster in the process. I still need to figure out
how to support that kind of functionality when not using a mini cluster.

Closes apache#119
Marcelo Vanzin committed May 2, 2016
1 parent cf73a66 · commit d609977
Showing 9 changed files with 238 additions and 10 deletions.
BaseIntegrationTestSuite.scala
@@ -18,9 +18,11 @@

package com.cloudera.livy.test.framework

import java.io.File
import javax.servlet.http.HttpServletResponse

import scala.concurrent.duration._
import scala.language.postfixOps

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -42,6 +44,11 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
    .registerModule(DefaultScalaModule)
    .registerModule(new SessionKindModule())

+  protected val testLib = sys.props("java.class.path")
+    .split(File.pathSeparator)
+    .find(new File(_).getName().startsWith("livy-test-lib-"))
+    .get

  test("initialize test cluster") {
    cluster = ClusterPool.get.lease()
    httpClient = new AsyncHttpClient()
Cluster.scala
@@ -18,6 +18,8 @@

package com.cloudera.livy.test.framework

import java.io.File

/**
 * A common interface to run tests on a real cluster and a mini cluster.
 */
@@ -26,6 +28,7 @@ trait Cluster {
  def cleanUp(): Unit
  def getYarnRmEndpoint: String
  def upload(srcPath: String, destPath: String): Unit
+  def configDir(): File

  def runLivy(): Unit
  def stopLivy(): Unit
MiniCluster.scala
@@ -181,12 +181,14 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU

  private val tempDir = new File(sys.props("java.io.tmpdir"))
  private var sparkConfDir: File = _
-  private var configDir: File = _
+  private var _configDir: File = _
  private var hdfs: Option[ProcessInfo] = None
  private var yarn: Option[ProcessInfo] = None
  private var livy: Option[ProcessInfo] = None
  private var livyUrl: String = _

+  override def configDir(): File = _configDir
+
  // Explicitly remove the "test-lib" dependency from the classpath of child processes. We
  // want tests to explicitly upload this jar when necessary, to test those code paths.
  private val childClasspath = {
@@ -200,8 +202,7 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU
    sparkConfDir = mkdir("spark-conf")

    val sparkConf = Map(
-      SparkLauncher.SPARK_MASTER -> "yarn",
-      "spark.submit.deployMode" -> "cluster",
+      SparkLauncher.SPARK_MASTER -> "yarn-cluster",
      "spark.executor.instances" -> "1",
      "spark.scheduler.minRegisteredResourcesRatio" -> "0.0",
      "spark.ui.enabled" -> "false",
@@ -214,7 +215,7 @@
    )
    saveProperties(sparkConf, new File(sparkConfDir, "spark-defaults.conf"))

-    configDir = mkdir("hadoop-conf")
+    _configDir = mkdir("hadoop-conf")
    saveProperties(config, new File(configDir, "cluster.conf"))
    hdfs = Some(start(MiniHdfsMain.getClass, new File(configDir, "core-site.xml")))
    yarn = Some(start(MiniYarnMain.getClass, new File(configDir, "yarn-site.xml")))
RealCluster.scala
@@ -52,6 +52,8 @@ class RealCluster(
  private var livyHomePath: Option[String] = Some("/usr/bin/livy")
  private var pathsToCleanUp = ListBuffer.empty[String]

+  override def configDir(): File = throw new UnsupportedOperationException()
+
  def sshClient[T](body: SshClient => SSH.Result[T]): Validated[T] = {
    val sshLogin = PublicKeyLogin(
      config.sshLogin, None, config.sshPubKey :: Nil)
119 changes: 119 additions & 0 deletions integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
@@ -0,0 +1,119 @@
/*
 * Licensed to Cloudera, Inc. under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Cloudera, Inc. licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.livy.test

import java.io.File
import java.util.UUID
import javax.servlet.http.HttpServletResponse._

import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.scalatest.concurrent.Eventually._

import com.cloudera.livy.client.common.HttpMessages._
import com.cloudera.livy.server.batch.CreateBatchRequest
import com.cloudera.livy.sessions.SessionState
import com.cloudera.livy.test.apps._
import com.cloudera.livy.test.framework.BaseIntegrationTestSuite

class BatchIT extends BaseIntegrationTestSuite {

  private var testLibPath: Path = _
  private var conf: Configuration = _
  private var fs: FileSystem = _

  test("upload test lib") {
    // Load the HDFS config from the generated path, if available.
    // TODO: how to do this for remote clusters.
    conf = new Configuration(false)
    cluster.configDir().listFiles().foreach { f =>
      if (f.getName().endsWith(".xml")) {
        conf.addResource(new Path(f.toURI()))
      }
    }

    val hdfsPath = new Path("/testlib-" + UUID.randomUUID().toString() + ".jar")
    fs = FileSystem.get(hdfsPath.toUri(), conf)
    fs.copyFromLocalFile(new Path(new File(testLib).toURI()), hdfsPath)
    testLibPath = fs.makeQualified(hdfsPath)
  }

  test("submit spark app") {
    assume(testLibPath != null, "Test lib not uploaded.")
    val output = "/" + UUID.randomUUID().toString()
    val result = runBatch(classOf[SimpleSparkApp], args = List(output))
    assert(result.state === SessionState.Success().toString)
    assert(fs.isDirectory(new Path(output)))
  }

  test("submit an app that fails") {
    assume(testLibPath != null, "Test lib not uploaded.")
    val output = "/" + UUID.randomUUID().toString()
    val result = runBatch(classOf[FailingApp], args = List(output))
    assert(result.state === SessionState.Error().toString)

    // The file is written to make sure the app actually ran, instead of just failing for
    // some other reason.
    assert(fs.isFile(new Path(output)))
  }

  private def runBatch(klass: Class[_], args: List[String] = Nil): SessionInfo = {
    val request = new CreateBatchRequest()
    request.file = testLibPath.toString()
    request.className = Some(klass.getName())
    request.args = args
    request.conf = Map("spark.yarn.maxAppAttempts" -> "1")

    val response = httpClient.preparePost(s"$livyEndpoint/batches")
      .setBody(mapper.writeValueAsString(request))
      .execute()
      .get()

    assert(response.getStatusCode() === SC_CREATED)

    val batchInfo = mapper.readValue(response.getResponseBodyAsStream(), classOf[SessionInfo])

    val terminalStates = Set(SessionState.Error(), SessionState.Dead(), SessionState.Success())
      .map(_.toString)

    var finished = false
    try {
      eventually(timeout(1 minute), interval(1 second)) {
        val response2 = httpClient.prepareGet(s"$livyEndpoint/batches/${batchInfo.id}")
          .execute()
          .get()
        assert(response2.getStatusCode() === SC_OK)
        val result = mapper.readValue(response2.getResponseBodyAsStream(), classOf[SessionInfo])
        assert(terminalStates.contains(result.state))

        finished = true
        result
      }
    } finally {
      if (!finished) {
        httpClient.prepareDelete(s"$livyEndpoint/batches/${batchInfo.id}").execute()
      }
    }
  }

}
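
Outside the test framework, the submission that runBatch() performs is a
single REST call against Livy; a minimal sketch follows (host, port, and jar
path are illustrative, 8998 is assumed as Livy's default port, and the JSON
fields mirror CreateBatchRequest above):

  POST http://<livy-host>:8998/batches
  Content-Type: application/json

  {
    "file": "hdfs:///testlib-<uuid>.jar",
    "className": "com.cloudera.livy.test.apps.SimpleSparkApp",
    "args": ["/<output-dir>"],
    "conf": {"spark.yarn.maxAppAttempts": "1"}
  }

A 201 (Created) response carries the batch's session info; the test then polls
GET /batches/<id> until the state reaches a terminal value (success, error, or
dead), and issues a DELETE if the batch never finishes within the timeout.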
JobApiIT.scala
@@ -74,12 +74,6 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {

test("upload jar") {
assume(client != null, "Client not active.")

val testLib = sys.props("java.class.path")
.split(File.pathSeparator)
.find(new File(_).getName().startsWith("livy-test-lib-"))
.get

waitFor(client.uploadJar(new File(testLib)))
}

SparkProcessBuilder.scala
@@ -255,6 +255,8 @@ class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
      env.put(key, value)
    }

+    env.asScala.foreach { case (k, v) => info(s" env: $k = $v") }
+
    _redirectOutput.foreach(pb.redirectOutput)
    _redirectError.foreach(pb.redirectError)
    _redirectErrorStream.foreach(pb.redirectErrorStream)
40 changes: 40 additions & 0 deletions test-lib/src/main/java/com/cloudera/livy/test/apps/FailingApp.java
@@ -0,0 +1,40 @@
/*
 * Licensed to Cloudera, Inc. under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Cloudera, Inc. licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.livy.test.apps;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FailingApp {

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      throw new IllegalArgumentException("Missing output path.");
    }
    String output = args[0];

    FileSystem fs = FileSystem.get(new Configuration());
    Path out = new Path(output);
    fs.create(out).close();

    throw new IllegalStateException("This app always fails.");
  }

}
60 changes: 60 additions & 0 deletions test-lib/src/main/java/com/cloudera/livy/test/apps/SimpleSparkApp.java
@@ -0,0 +1,60 @@
/*
 * Licensed to Cloudera, Inc. under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Cloudera, Inc. licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.livy.test.apps;

import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

public class SimpleSparkApp {

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      throw new IllegalArgumentException("Missing output path.");
    }
    String output = args[0];

    JavaSparkContext sc = new JavaSparkContext();
    try {
      List<String> data = Arrays.asList("the", "quick", "brown", "fox", "jumped", "over", "the",
        "lazy", "dog");

      JavaPairRDD<String, Integer> rdd = sc.parallelize(data, 3)
        .mapToPair(new Counter());
      rdd.saveAsTextFile(output);
    } finally {
      sc.close();
    }
  }

  private static class Counter implements PairFunction<String, String, Integer> {

    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
      return new Tuple2<>(s, s.length());
    }

  }

}
