LIVY-147. Prepend default FS to user-provided paths when needed.
Spark's default behavior is to treat scheme-less URIs as local, which
doesn't make much sense for Livy, since clients most likely don't share
Livy's local filesystem.

So, instead, prepend Hadoop's default FS to paths that don't already have
a scheme, so that users don't have to work that out themselves. This should
also make session requests more backwards compatible with v0.1, since
similar functionality existed there in certain cases.

Closes apache#133
Marcelo Vanzin committed May 13, 2016
1 parent 174851d commit 24e10b2
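
To illustrate, here is a minimal standalone sketch of the resolution rule — not the commit's code itself (the real logic lives in Session.resolveURI below and reads fs.defaultFS from the server's Hadoop configuration); the namenode address is a made-up placeholder:

import java.net.URI

object ResolveExample {
  // Placeholder value; the Livy server reads this from fs.defaultFS.
  val defaultFS = "hdfs://namenode:8020"

  // Scheme-less absolute paths get the default FS prepended; URIs that
  // already carry a scheme (hdfs:, s3a:, file:, ...) pass through untouched.
  def resolve(uri: URI): URI =
    if (uri.getScheme == null) {
      require(uri.getPath.startsWith("/"), s"Path '${uri.getPath}' is not absolute.")
      new URI(defaultFS + uri.getPath)
    } else {
      uri
    }

  def main(args: Array[String]): Unit = {
    println(resolve(new URI("/user/alice/app.jar"))) // hdfs://namenode:8020/user/alice/app.jar
    println(resolve(new URI("file:/tmp/app.jar")))   // unchanged: explicitly local
  }
}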
Showing 12 changed files with 105 additions and 37 deletions.
10 changes: 8 additions & 2 deletions api/src/main/java/com/cloudera/livy/LivyClient.java
@@ -71,10 +71,13 @@ public interface LivyClient {

/**
* Adds a jar file to the running remote context.
*
* <p>
* Note that the URI should be reachable by the Spark driver process. If running the driver
* in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
* on that node (and not on the client machine).
* <p>
* If the provided URI has no scheme, it's considered to be relative to the default file system
* configured in the Livy server.
*
* @param uri The location of the jar file.
* @return A future that can be used to monitor the operation.
@@ -90,10 +93,13 @@ public interface LivyClient {

/**
* Adds a file to the running remote context.
*
* <p>
* Note that the URI should be reachable by the Spark driver process. If running the driver
* in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
* on that node (and not on the client machine).
* <p>
* If the provided URI has no scheme, it's considered to be relative to the default file system
* configured in the Livy server.
*
* @param uri The location of the file.
* @return A future that can be used to monitor the operation.
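
As a usage illustration of the documented behavior, a sketch of a client call that now works with a scheme-less URI (server URL and jar path are placeholders, and error handling is elided):

import java.net.URI

import com.cloudera.livy.LivyClientBuilder

object AddJarExample {
  def main(args: Array[String]): Unit = {
    val client = new LivyClientBuilder()
      .setURI(new URI("http://livy-server:8998")) // placeholder Livy server URL
      .build()
    try {
      // No scheme here: with this change, the Livy server resolves the path
      // against its fs.defaultFS instead of treating it as a local file.
      client.addJar(new URI("/user/alice/libs/mylib.jar")).get()
    } finally {
      client.stop(true)
    }
  }
}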
@@ -160,10 +160,6 @@ public Void call() throws Exception {
}

private Future<?> addResource(final String command, final URI resource) {
- if (resource.getScheme() == null || resource.getScheme() == "file") {
-   throw new IllegalArgumentException("Local resources are not yet supported: " + resource);
- }
-
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
@@ -19,6 +19,7 @@
package com.cloudera.livy.test.framework

import java.io.File
import java.util.UUID
import javax.servlet.http.HttpServletResponse

import scala.concurrent.duration._
@@ -29,6 +30,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.ning.http.client.AsyncHttpClient
import org.apache.hadoop.fs.Path
import org.scalatest._
import org.scalatest.concurrent.Eventually._

@@ -69,6 +71,14 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
}
}

/** Uploads a file to HDFS and returns just its path. */
protected def uploadToHdfs(file: File): String = {
val hdfsPath = new Path(cluster.hdfsScratchDir(),
UUID.randomUUID().toString() + "-" + file.getName())
cluster.fs.copyFromLocalFile(new Path(file.toURI()), hdfsPath)
hdfsPath.toUri().getPath()
}

/** Wrapper around test() to be used by pyspark tests. */
protected def pytest(desc: String)(testFn: => Unit): Unit = {
test(desc) {
@@ -37,13 +37,10 @@ import com.cloudera.livy.test.framework.BaseIntegrationTestSuite

class BatchIT extends BaseIntegrationTestSuite {

- private var testLibPath: Path = _
+ private var testLibPath: String = _

test("upload test lib") {
-   val hdfsPath = new Path(cluster.hdfsScratchDir(),
-     "testlib-" + UUID.randomUUID().toString() + ".jar")
-   cluster.fs.copyFromLocalFile(new Path(new File(testLib).toURI()), hdfsPath)
-   testLibPath = cluster.fs.makeQualified(hdfsPath)
+   testLibPath = uploadToHdfs(new File(testLib))
}

test("submit spark app") {
@@ -68,7 +65,7 @@
pytest("submit a pyspark application") {
val hdfsPath = uploadResource("pytest.py")
val output = newOutputPath()
- val result = runScript(hdfsPath.toString, args = List(output))
+ val result = runScript(hdfsPath, args = List(output))
assert(result.state === SessionState.Success().toString)
assert(cluster.fs.isDirectory(new Path(output)))
}
@@ -77,15 +74,15 @@
// TODO comment in Spark's ApplicationMaster.scala.
ignore("submit a SparkR application") {
val hdfsPath = uploadResource("rtest.R")
- val result = runScript(hdfsPath.toString)
+ val result = runScript(hdfsPath)
assert(result.state === SessionState.Success().toString)
}

private def newOutputPath(): String = {
cluster.hdfsScratchDir().toString() + "/" + UUID.randomUUID().toString()
}

- private def uploadResource(name: String): Path = {
+ private def uploadResource(name: String): String = {
val hdfsPath = new Path(cluster.hdfsScratchDir(), UUID.randomUUID().toString() + "-" + name)
val in = getClass.getResourceAsStream("/" + name)
val out = cluster.fs.create(hdfsPath)
@@ -95,7 +92,7 @@ class BatchIT extends BaseIntegrationTestSuite {
in.close()
out.close()
}
- cluster.fs.makeQualified(hdfsPath)
+ hdfsPath.toUri().getPath()
}

private def runScript(script: String, args: List[String] = Nil): SessionInfo = {
@@ -107,7 +104,7 @@

private def runSpark(klass: Class[_], args: List[String] = Nil): SessionInfo = {
val request = new CreateBatchRequest()
- request.file = testLibPath.toString()
+ request.file = testLibPath
request.className = Some(klass.getName())
request.args = args
runBatch(request)
@@ -18,15 +18,19 @@

package com.cloudera.livy.test

- import java.io.File
+ import java.io.{File, FileOutputStream}
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.{Future => JFuture, TimeUnit}
import java.util.jar.JarOutputStream
import java.util.zip.ZipEntry
import javax.servlet.http.HttpServletResponse

import scala.util.Try

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll

import com.cloudera.livy.{LivyClient, LivyClientBuilder, Logging}
@@ -98,6 +102,19 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
assert(result === "hello")
}

test("add file from HDFS") {
assume(client != null, "Client not active.")
val file = Files.createTempFile("filetest2", ".txt")
Files.write(file, "hello".getBytes(UTF_8))

val uri = new URI(uploadToHdfs(file.toFile()))
waitFor(client.addFile(uri))

val task = new FileReader(new File(uri.getPath()).getName(), false)
val result = waitFor(client.submit(task))
assert(result === "hello")
}

test("run simple jobs") {
assume(client != null, "Client not active.")

6 changes: 4 additions & 2 deletions rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
@@ -219,8 +219,10 @@ public synchronized void stop(boolean shutdownContext) {

eventLoopGroup.shutdownGracefully();
}
- LOG.debug("Disconnected from context {}, shutdown = {}.", contextInfo.clientId,
-   shutdownContext);
+ if (contextInfo != null) {
+   LOG.debug("Disconnected from context {}, shutdown = {}.", contextInfo.clientId,
+     shutdownContext);
+ }
}
}

4 changes: 4 additions & 0 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
@@ -22,6 +22,8 @@ import java.io.File
import java.lang.{Boolean => JBoolean, Long => JLong}
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration

import com.cloudera.livy.client.common.ClientConf
import com.cloudera.livy.client.common.ClientConf.ConfEntry

@@ -57,6 +59,8 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {

private lazy val _superusers = Option(get(SUPERUSERS)).map(_.split("[, ]+").toSeq).getOrElse(Nil)

lazy val hadoopConf = new Configuration()

/**
* Create a LivyConf that loads defaults from the system properties and the classpath.
* @return
@@ -41,9 +41,9 @@ class BatchSession(
builder.conf(request.conf)
proxyUser.foreach(builder.proxyUser)
request.className.foreach(builder.className)
- request.jars.foreach(builder.jar)
- request.pyFiles.foreach(builder.pyFile)
- request.files.foreach(builder.file)
+ resolveURIs(request.jars).foreach(builder.jar)
+ resolveURIs(request.pyFiles).foreach(builder.pyFile)
+ resolveURIs(request.files).foreach(builder.file)
request.driverMemory.foreach(builder.driverMemory)
request.driverCores.foreach(builder.driverCores)
request.executorMemory.foreach(builder.executorMemory)
@@ -55,7 +55,8 @@
builder.redirectOutput(Redirect.PIPE)
builder.redirectErrorStream(true)

- builder.start(Some(request.file), request.args)
+ val file = resolveURIs(Seq(request.file))(0)
+ builder.start(Some(file), request.args)
}

protected implicit def executor: ExecutionContextExecutor = ExecutionContext.global
@@ -79,8 +79,9 @@ class InteractiveSession(
kind match {
case PySpark() =>
val pySparkFiles = if (!LivyConf.TEST_MODE) findPySparkArchives() else Nil
+ val allPyFiles = pySparkFiles ++ resolveURIs(request.pyFiles)
builder.setConf(SparkYarnIsPython, "true")
- builder.setConf(SparkSubmitPyFiles, (pySparkFiles ++ request.pyFiles).mkString(","))
+ builder.setConf(SparkSubmitPyFiles, allPyFiles.mkString(","))
case SparkR() =>
val sparkRArchive = if (!LivyConf.TEST_MODE) findSparkRArchive() else None
sparkRArchive.foreach { archive =>
@@ -90,15 +91,18 @@
}
builder.setConf(RSCConf.Entry.SESSION_KIND.key, kind.toString)

- val allJars = livyJars(livyConf) ++ request.jars
+ val allJars = livyJars(livyConf) ++ resolveURIs(request.jars)

- def listToConf(lst: List[String]): Option[String] = {
+ def listToConf(lst: Seq[String]): Option[String] = {
if (lst.size > 0) Some(lst.mkString(",")) else None
}

+ val archives = resolveURIs(request.archives)
+ val files = resolveURIs(request.files)

val userOpts: Map[Option[String], String] = Map(
- listToConf(request.archives) -> "spark.yarn.dist.archives",
- listToConf(request.files) -> "spark.files",
+ listToConf(archives) -> "spark.yarn.dist.archives",
+ listToConf(files) -> "spark.files",
listToConf(allJars) -> "spark.jars",
request.driverCores.map(_.toString) -> "spark.driver.cores",
request.driverMemory.map(_.toString + "b") -> SparkLauncher.DRIVER_MEMORY,
@@ -192,12 +196,12 @@

def addFile(uri: URI): Unit = {
recordActivity()
- client.addFile(uri).get()
+ client.addFile(resolveURI(uri)).get()
}

def addJar(uri: URI): Unit = {
recordActivity()
- client.addJar(uri).get()
+ client.addJar(resolveURI(uri)).get()
}

def jobStatus(id: Long): Any = {
@@ -18,7 +18,7 @@

package com.cloudera.livy.server.interactive

- import java.net.{URI, URL}
+ import java.net.URI
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest

@@ -28,7 +28,7 @@ import scala.concurrent.duration._

import org.json4s.jackson.Json4sScalaModule
import org.scalatra._
- import org.scalatra.servlet.{FileUploadSupport, MultipartConfig}
+ import org.scalatra.servlet.FileUploadSupport

import com.cloudera.livy.{ExecuteRequest, JobHandle, LivyConf, Logging}
import com.cloudera.livy.client.common.HttpMessages._
35 changes: 31 additions & 4 deletions server/src/main/scala/com/cloudera/livy/sessions/Session.scala
@@ -19,14 +19,13 @@
package com.cloudera.livy.sessions

import java.io.InputStream
- import java.net.URI
+ import java.net.{URI, URISyntaxException}
import java.security.PrivilegedExceptionAction
import java.util.UUID
import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}

- import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.security.UserGroupInformation
@@ -66,7 +65,7 @@ abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf) extends Logging {
if (stagingDir != null) {
debug(s"Deleting session $id staging directory $stagingDir")
doAsOwner {
- val fs = FileSystem.newInstance(new Configuration())
+ val fs = FileSystem.newInstance(livyConf.hadoopConf)
try {
fs.delete(stagingDir, true)
} finally {
@@ -107,7 +106,7 @@
}

protected def copyResourceToHDFS(dataStream: InputStream, name: String): URI = doAsOwner {
- val fs = FileSystem.newInstance(new Configuration())
+ val fs = FileSystem.newInstance(livyConf.hadoopConf)

try {
val filePath = new Path(getStagingDir(fs), name)
@@ -129,6 +128,34 @@ abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf) extends Logging {
}
}

/**
* Prepends the value of the "fs.defaultFS" configuration to any URIs that do not have a
* scheme. URIs are required to at least be absolute paths.
*
* @throws IllegalArgumentException If an invalid URI is found in the given list.
*/
protected def resolveURIs(uris: Seq[String]): Seq[String] = {
val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
uris.filter(_.nonEmpty).map { _uri =>
val uri = try {
new URI(_uri)
} catch {
case e: URISyntaxException => throw new IllegalArgumentException(e)
}
resolveURI(uri).toString()
}
}

protected def resolveURI(uri: URI): URI = {
val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
if (uri.getScheme() == null) {
require(uri.getPath().startsWith("/"), s"Path '${uri.getPath()}' is not absolute.")
new URI(defaultFS + uri.getPath())
} else {
uri
}
}
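
A short behavioral sketch of the contract above — a standalone approximation rather than the commit's code, with a placeholder default FS value — showing the empty-entry filtering and error handling:

import java.net.{URI, URISyntaxException}

object ResolveURIsSketch {
  val defaultFS = "hdfs://namenode:8020" // placeholder for fs.defaultFS

  def resolveURIs(uris: Seq[String]): Seq[String] =
    uris.filter(_.nonEmpty).map { s =>
      val uri =
        try new URI(s)
        catch { case e: URISyntaxException => throw new IllegalArgumentException(e) }
      if (uri.getScheme == null) {
        // Relative paths fail this check with IllegalArgumentException.
        require(uri.getPath.startsWith("/"), s"Path '${uri.getPath}' is not absolute.")
        defaultFS + uri.getPath
      } else {
        s
      }
    }

  def main(args: Array[String]): Unit = {
    // Empty entries are dropped; scheme-carrying URIs pass through unchanged.
    println(resolveURIs(Seq("/data/in.txt", "s3a://bucket/lib.jar", "")))
    // -> List(hdfs://namenode:8020/data/in.txt, s3a://bucket/lib.jar)
  }
}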

private def getStagingDir(fs: FileSystem): Path = synchronized {
if (stagingDir == null) {
val stagingRoot = Option(livyConf.get(LivyConf.SESSION_STAGING_DIR)).getOrElse {
@@ -21,6 +21,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -53,7 +54,10 @@ public String call(Integer i) throws Exception {
InputStream in;
if (isResource) {
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
in = ccl.getResourceAsStream("test.resource");
in = ccl.getResourceAsStream(fileName);
if (in == null) {
throw new IOException("Resource not found: " + fileName);
}
} else {
in = new FileInputStream(SparkFiles.get(fileName));
}