diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala index bca80ad53..4df224ba2 100644 --- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala +++ b/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala @@ -21,6 +21,8 @@ package com.cloudera.livy.test.framework import java.io.File import java.util.UUID +import scala.concurrent._ +import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.control.NonFatal @@ -30,6 +32,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.scalatest._ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with BeforeAndAfterAll { + import scala.concurrent.ExecutionContext.Implicits.global + var cluster: Cluster = _ var httpClient: AsyncHttpClient = _ var livyClient: LivyRestClient = _ @@ -50,6 +54,14 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with Befo appReport.getDiagnostics() } + protected def restartLivy(): Unit = { + val f = future { + cluster.stopLivy() + cluster.runLivy() + } + Await.result(f, 3 minutes) + } + /** Uploads a file to HDFS and returns just its path. */ protected def uploadToHdfs(file: File): String = { val hdfsPath = new Path(cluster.hdfsScratchDir(), diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala index 7c0bbbebc..3669ccfb1 100644 --- a/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala +++ b/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala @@ -125,9 +125,7 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll { withTestLib(classOf[SimpleSparkApp], List(output2, "false")) { s2 => s2.verifySessionRunning() - // Restart Livy. - cluster.stopLivy() - cluster.runLivy() + restartLivy() // Verify previous active session still appears after restart. s2.verifySessionRunning() diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala index 120107cf0..b4e8c1dff 100644 --- a/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala +++ b/integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala @@ -153,9 +153,7 @@ class InteractiveIT extends BaseIntegrationTestSuite { val stmt1 = s.run("1") stmt1.verifyResult("res0: Int = 1") - // Restart Livy. - cluster.stopLivy() - cluster.runLivy() + restartLivy() // Verify session still exists. s.verifySessionIdle() @@ -165,9 +163,7 @@ class InteractiveIT extends BaseIntegrationTestSuite { s.stop() - // Restart Livy. - cluster.stopLivy() - cluster.runLivy() + restartLivy() // Verify deleted session doesn't show up after recovery. s.verifySessionDoesNotExist()