From 24e10b265572b9666d329e0b2f275f5c836a0a7e Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Thu, 12 May 2016 14:41:52 -0700
Subject: [PATCH] LIVY-147. Prepend default FS to user-provided paths when
 needed.

Spark's default behavior is to treat scheme-less URIs as local, which
makes little sense for Livy, since clients most likely don't share the
Livy server's local filesystem.

So, instead, prepend Hadoop's default FS to paths that don't already have
a scheme, so that users don't have to spell out the full URI themselves.
This should also make session requests more backwards compatible with v0.1,
since similar functionality existed there in certain cases.
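
For example, with "fs.defaultFS" set to a hypothetical
"hdfs://namenode:8020", user-provided paths would resolve as follows:

    /user/alice/job.jar   ->  hdfs://namenode:8020/user/alice/job.jar
    hdfs:///data/job.jar  ->  unchanged (already has a scheme)
    file:///tmp/job.jar   ->  unchanged (explicitly local)
    job.jar               ->  rejected (scheme-less paths must be absolute)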

Closes #133
---
 .../java/com/cloudera/livy/LivyClient.java    | 10 ++++--
 .../cloudera/livy/client/http/HttpClient.java |  4 ---
 .../framework/BaseIntegrationTestSuite.scala  | 10 ++++++
 .../com/cloudera/livy/test/BatchIT.scala      | 17 ++++-----
 .../com/cloudera/livy/test/JobApiIT.scala     | 19 +++++++++-
 .../java/com/cloudera/livy/rsc/RSCClient.java |  6 ++--
 .../scala/com/cloudera/livy/LivyConf.scala    |  4 +++
 .../livy/server/batch/BatchSession.scala      |  9 ++---
 .../interactive/InteractiveSession.scala      | 18 ++++++----
 .../InteractiveSessionServlet.scala           |  4 +--
 .../com/cloudera/livy/sessions/Session.scala  | 34 +++++++++++++++++++++++++++++++++---
 .../cloudera/livy/test/jobs/FileReader.java   |  6 +++-
 12 files changed, 104 insertions(+), 37 deletions(-)
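
As a usage sketch, with a hypothetical server URL, jar path, and
fs.defaultFS value, a client can now pass a scheme-less absolute path and
let the Livy server qualify it:

    import java.net.URI;
    import com.cloudera.livy.LivyClient;
    import com.cloudera.livy.LivyClientBuilder;

    LivyClient client = new LivyClientBuilder()
      .setURI(new URI("http://livy-server:8998"))
      .build();
    // "/user/alice/udfs.jar" has no scheme, so the Livy server resolves it
    // against fs.defaultFS (e.g. to hdfs://namenode:8020/user/alice/udfs.jar)
    // instead of rejecting it as a local resource.
    client.addJar(new URI("/user/alice/udfs.jar")).get();
    client.stop(true);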

diff --git a/api/src/main/java/com/cloudera/livy/LivyClient.java b/api/src/main/java/com/cloudera/livy/LivyClient.java
index da08d2eb1..73e318f01 100644
--- a/api/src/main/java/com/cloudera/livy/LivyClient.java
+++ b/api/src/main/java/com/cloudera/livy/LivyClient.java
@@ -71,10 +71,13 @@ public interface LivyClient {
 
   /**
    * Adds a jar file to the running remote context.
-   *
+   * <p>
    * Note that the URL should be reachable by the Spark driver process. If running the driver
    * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
    * on that node (and not on the client machine).
+   * <p>
+   * If the provided URI has no scheme, it's considered to be relative to the default file system
+   * configured in the Livy server.
    *
    * @param uri The location of the jar file.
    * @return A future that can be used to monitor the operation.
@@ -90,10 +93,13 @@ public interface LivyClient {
 
   /**
    * Adds a file to the running remote context.
-   *
+   * <p>
    * Note that the URL should be reachable by the Spark driver process. If running the driver
    * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
    * on that node (and not on the client machine).
+   * <p>
+   * If the provided URI has no scheme, it's considered to be relative to the default file system
+   * configured in the Livy server.
    *
    * @param uri The location of the file.
    * @return A future that can be used to monitor the operation.
diff --git a/client-http/src/main/java/com/cloudera/livy/client/http/HttpClient.java b/client-http/src/main/java/com/cloudera/livy/client/http/HttpClient.java
index 5a8247966..466b43300 100644
--- a/client-http/src/main/java/com/cloudera/livy/client/http/HttpClient.java
+++ b/client-http/src/main/java/com/cloudera/livy/client/http/HttpClient.java
@@ -160,10 +160,6 @@ public Void call() throws Exception {
   }
 
   private Future<?> addResource(final String command, final URI resource) {
-    if (resource.getScheme() == null || resource.getScheme() == "file") {
-      throw new IllegalArgumentException("Local resources are not yet supported: " + resource);
-    }
-
     Callable<Void> task = new Callable<Void>() {
       @Override
       public Void call() throws Exception {
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
index 119f9177c..33051fed9 100644
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
+++ b/integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala
@@ -19,6 +19,7 @@
 package com.cloudera.livy.test.framework
 
 import java.io.File
+import java.util.UUID
 import javax.servlet.http.HttpServletResponse
 
 import scala.concurrent.duration._
@@ -29,6 +30,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.ning.http.client.AsyncHttpClient
+import org.apache.hadoop.fs.Path
 import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 
@@ -69,6 +71,14 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
     }
   }
 
+  /** Uploads a file to HDFS and returns its path, without scheme or authority. */
+  protected def uploadToHdfs(file: File): String = {
+    val hdfsPath = new Path(cluster.hdfsScratchDir(),
+      UUID.randomUUID().toString() + "-" + file.getName())
+    cluster.fs.copyFromLocalFile(new Path(file.toURI()), hdfsPath)
+    hdfsPath.toUri().getPath()
+  }
+
   /** Wrapper around test() to be used by pyspark tests. */
   protected def pytest(desc: String)(testFn: => Unit): Unit = {
     test(desc) {
diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
index be9979910..aba9b2ecb 100644
--- a/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
+++ b/integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala
@@ -37,13 +37,10 @@ import com.cloudera.livy.test.framework.BaseIntegrationTestSuite
 
 class BatchIT extends BaseIntegrationTestSuite {
 
-  private var testLibPath: Path = _
+  private var testLibPath: String = _
 
   test("upload test lib") {
-    val hdfsPath = new Path(cluster.hdfsScratchDir(),
-      "testlib-" + UUID.randomUUID().toString() + ".jar")
-    cluster.fs.copyFromLocalFile(new Path(new File(testLib).toURI()), hdfsPath)
-    testLibPath = cluster.fs.makeQualified(hdfsPath)
+    testLibPath = uploadToHdfs(new File(testLib))
   }
 
   test("submit spark app") {
@@ -68,7 +65,7 @@ class BatchIT extends BaseIntegrationTestSuite {
   pytest("submit a pyspark application") {
     val hdfsPath = uploadResource("pytest.py")
     val output = newOutputPath()
-    val result = runScript(hdfsPath.toString, args = List(output))
+    val result = runScript(hdfsPath, args = List(output))
     assert(result.state === SessionState.Success().toString)
     assert(cluster.fs.isDirectory(new Path(output)))
   }
@@ -77,7 +74,7 @@ class BatchIT extends BaseIntegrationTestSuite {
   // TODO comment in Spark's ApplicationMaster.scala.
   ignore("submit a SparkR application") {
     val hdfsPath = uploadResource("rtest.R")
-    val result = runScript(hdfsPath.toString)
+    val result = runScript(hdfsPath)
     assert(result.state === SessionState.Success().toString)
   }
 
@@ -85,7 +82,7 @@ class BatchIT extends BaseIntegrationTestSuite {
     cluster.hdfsScratchDir().toString() + "/" + UUID.randomUUID().toString()
   }
 
-  private def uploadResource(name: String): Path = {
+  private def uploadResource(name: String): String = {
     val hdfsPath = new Path(cluster.hdfsScratchDir(), UUID.randomUUID().toString() + "-" + name)
     val in = getClass.getResourceAsStream("/" + name)
     val out = cluster.fs.create(hdfsPath)
@@ -95,7 +92,7 @@ class BatchIT extends BaseIntegrationTestSuite {
       in.close()
       out.close()
     }
-    cluster.fs.makeQualified(hdfsPath)
+    hdfsPath.toUri().getPath()
   }
 
   private def runScript(script: String, args: List[String] = Nil): SessionInfo = {
@@ -107,7 +104,7 @@ class BatchIT extends BaseIntegrationTestSuite {
 
   private def runSpark(klass: Class[_], args: List[String] = Nil): SessionInfo = {
     val request = new CreateBatchRequest()
-    request.file = testLibPath.toString()
+    request.file = testLibPath
     request.className = Some(klass.getName())
     request.args = args
     runBatch(request)
diff --git a/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala b/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
index 564d25040..10854acba 100644
--- a/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
+++ b/integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala
@@ -18,15 +18,19 @@
 
 package com.cloudera.livy.test
 
-import java.io.File
+import java.io.{File, FileOutputStream}
 import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
+import java.util.UUID
 import java.util.concurrent.{Future => JFuture, TimeUnit}
+import java.util.jar.JarOutputStream
+import java.util.zip.ZipEntry
 import javax.servlet.http.HttpServletResponse
 
 import scala.util.Try
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import com.cloudera.livy.{LivyClient, LivyClientBuilder, Logging}
@@ -98,6 +102,19 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logg
     assert(result === "hello")
   }
 
+  test("add file from HDFS") {
+    assume(client != null, "Client not active.")
+    val file = Files.createTempFile("filetest2", ".txt")
+    Files.write(file, "hello".getBytes(UTF_8))
+
+    val uri = new URI(uploadToHdfs(file.toFile()))
+    waitFor(client.addFile(uri))
+
+    val task = new FileReader(new File(uri.getPath()).getName(), false)
+    val result = waitFor(client.submit(task))
+    assert(result === "hello")
+  }
+
   test("run simple jobs") {
     assume(client != null, "Client not active.")
 
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
index 7c4f4259f..e7db6d9f3 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
@@ -219,8 +219,10 @@ public synchronized void stop(boolean shutdownContext) {
 
         eventLoopGroup.shutdownGracefully();
       }
-      LOG.debug("Disconnected from context {}, shutdown = {}.", contextInfo.clientId,
-        shutdownContext);
+      if (contextInfo != null) {
+        LOG.debug("Disconnected from context {}, shutdown = {}.", contextInfo.clientId,
+          shutdownContext);
+      }
     }
   }
 
diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala
index 89771d3f7..e7bfc3115 100644
--- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala
+++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala
@@ -22,6 +22,8 @@ import java.io.File
 import java.lang.{Boolean => JBoolean, Long => JLong}
 import java.nio.file.Files
 
+import org.apache.hadoop.conf.Configuration
+
 import com.cloudera.livy.client.common.ClientConf
 import com.cloudera.livy.client.common.ClientConf.ConfEntry
 
@@ -57,6 +59,8 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
 
   private lazy val _superusers = Option(get(SUPERUSERS)).map(_.split("[, ]+").toSeq).getOrElse(Nil)
 
+  lazy val hadoopConf = new Configuration()
+
   /**
    * Create a LivyConf that loads defaults from the system properties and the classpath.
    * @return
diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala
index 67c9b9174..0c1aff0c2 100644
--- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala
@@ -41,9 +41,9 @@ class BatchSession(
     builder.conf(request.conf)
     proxyUser.foreach(builder.proxyUser)
     request.className.foreach(builder.className)
-    request.jars.foreach(builder.jar)
-    request.pyFiles.foreach(builder.pyFile)
-    request.files.foreach(builder.file)
+    resolveURIs(request.jars).foreach(builder.jar)
+    resolveURIs(request.pyFiles).foreach(builder.pyFile)
+    resolveURIs(request.files).foreach(builder.file)
     request.driverMemory.foreach(builder.driverMemory)
     request.driverCores.foreach(builder.driverCores)
     request.executorMemory.foreach(builder.executorMemory)
@@ -55,7 +55,8 @@ class BatchSession(
     builder.redirectOutput(Redirect.PIPE)
     builder.redirectErrorStream(true)
 
-    builder.start(Some(request.file), request.args)
+    val file = resolveURIs(Seq(request.file))(0)
+    builder.start(Some(file), request.args)
   }
 
   protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
index d5e92f08b..c8ee7f457 100644
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
@@ -79,8 +79,9 @@ class InteractiveSession(
     kind match {
       case PySpark() =>
         val pySparkFiles = if (!LivyConf.TEST_MODE) findPySparkArchives() else Nil
+        val allPyFiles = pySparkFiles ++ resolveURIs(request.pyFiles)
         builder.setConf(SparkYarnIsPython, "true")
-        builder.setConf(SparkSubmitPyFiles, (pySparkFiles ++ request.pyFiles).mkString(","))
+        builder.setConf(SparkSubmitPyFiles, allPyFiles.mkString(","))
       case SparkR() =>
         val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
         sparkRArchive.foreach { archive =>
@@ -90,15 +91,18 @@ class InteractiveSession(
     }
     builder.setConf(RSCConf.Entry.SESSION_KIND.key, kind.toString)
 
-    val allJars = livyJars(livyConf) ++ request.jars
+    val allJars = livyJars(livyConf) ++ resolveURIs(request.jars)
 
-    def listToConf(lst: List[String]): Option[String] = {
+    def listToConf(lst: Seq[String]): Option[String] = {
       if (lst.size > 0) Some(lst.mkString(",")) else None
     }
 
+    val archives = resolveURIs(request.archives)
+    val files = resolveURIs(request.files)
+
     val userOpts: Map[Option[String], String] = Map(
-      listToConf(request.archives) -> "spark.yarn.dist.archives",
-      listToConf(request.files) -> "spark.files",
+      listToConf(archives) -> "spark.yarn.dist.archives",
+      listToConf(files) -> "spark.files",
       listToConf(allJars) -> "spark.jars",
       request.driverCores.map(_.toString) -> "spark.driver.cores",
       request.driverMemory.map(_.toString + "b") -> SparkLauncher.DRIVER_MEMORY,
@@ -192,12 +196,12 @@ class InteractiveSession(
 
   def addFile(uri: URI): Unit = {
     recordActivity()
-    client.addFile(uri).get()
+    client.addFile(resolveURI(uri)).get()
   }
 
   def addJar(uri: URI): Unit = {
     recordActivity()
-    client.addJar(uri).get()
+    client.addJar(resolveURI(uri)).get()
   }
 
   def jobStatus(id: Long): Any = {
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
index 819665fc3..b382f0ea6 100644
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
+++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala
@@ -18,7 +18,7 @@
 
 package com.cloudera.livy.server.interactive
 
-import java.net.{URI, URL}
+import java.net.URI
 import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
@@ -28,7 +28,7 @@ import scala.concurrent.duration._
 
 import org.json4s.jackson.Json4sScalaModule
 import org.scalatra._
-import org.scalatra.servlet.{FileUploadSupport, MultipartConfig}
+import org.scalatra.servlet.FileUploadSupport
 
 import com.cloudera.livy.{ExecuteRequest, JobHandle, LivyConf, Logging}
 import com.cloudera.livy.client.common.HttpMessages._
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala b/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
index 346e97fd1..1d62c1b4b 100644
--- a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
+++ b/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
@@ -19,14 +19,13 @@
 package com.cloudera.livy.sessions
 
 import java.io.InputStream
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 import java.security.PrivilegedExceptionAction
 import java.util.UUID
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.{ExecutionContext, Future}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.security.UserGroupInformation
@@ -66,7 +65,7 @@ abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf) e
       if (stagingDir != null) {
         debug(s"Deleting session $id staging directory $stagingDir")
         doAsOwner {
-          val fs = FileSystem.newInstance(new Configuration())
+          val fs = FileSystem.newInstance(livyConf.hadoopConf)
           try {
             fs.delete(stagingDir, true)
           } finally {
@@ -107,7 +106,7 @@ abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf) e
   }
 
   protected def copyResourceToHDFS(dataStream: InputStream, name: String): URI = doAsOwner {
-    val fs = FileSystem.newInstance(new Configuration())
+    val fs = FileSystem.newInstance(livyConf.hadoopConf)
 
     try {
       val filePath = new Path(getStagingDir(fs), name)
@@ -129,6 +128,33 @@ abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf) e
     }
   }
 
+  /**
+   * Prepends the value of the "fs.defaultFS" configuration to any URIs that do not have a
+   * scheme. Scheme-less URIs must at least be absolute paths.
+   *
+   * @throws IllegalArgumentException If an invalid URI is found in the given list.
+   */
+  protected def resolveURIs(uris: Seq[String]): Seq[String] = {
+    uris.filter(_.nonEmpty).map { _uri =>
+      val uri = try {
+        new URI(_uri)
+      } catch {
+        case e: URISyntaxException => throw new IllegalArgumentException(e)
+      }
+      resolveURI(uri).toString()
+    }
+  }
+
+  protected def resolveURI(uri: URI): URI = {
+    val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
+    if (uri.getScheme() == null) {
+      require(uri.getPath().startsWith("/"), s"Path '${uri.getPath()}' is not absolute.")
+      new URI(defaultFS + uri.getPath())
+    } else {
+      uri
+    }
+  }
+
   private def getStagingDir(fs: FileSystem): Path = synchronized {
     if (stagingDir == null) {
       val stagingRoot = Option(livyConf.get(LivyConf.SESSION_STAGING_DIR)).getOrElse {
diff --git a/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java b/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
index 5c29304ba..04ee82018 100644
--- a/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
+++ b/test-lib/src/main/java/com/cloudera/livy/test/jobs/FileReader.java
@@ -21,6 +21,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -53,7 +54,10 @@ public String call(Integer i) throws Exception {
       InputStream in;
       if (isResource) {
         ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-        in = ccl.getResourceAsStream("test.resource");
+        in = ccl.getResourceAsStream(fileName);
+        if (in == null) {
+          throw new IOException("Resource not found: " + fileName);
+        }
       } else {
         in = new FileInputStream(SparkFiles.get(fileName));
       }