Skip to content

Commit

Permalink
Support file/jar upload.
Browse files Browse the repository at this point in the history
- Modified the servlet to add upload support
- Add 2 new APIs: `uploadFile` and `uploadJar` which upload the files from the client to Livy
- Added tests for the client and the servlet.

Closes apache#46
  • Loading branch information
harishreedharan committed Feb 3, 2016
1 parent deb195e commit 4274573
Show file tree
Hide file tree
Showing 25 changed files with 308 additions and 42 deletions.
15 changes: 15 additions & 0 deletions api/src/main/java/com/cloudera/livy/LivyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.cloudera.livy;

import java.io.File;
import java.net.URI;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -57,6 +58,13 @@ public interface LivyClient {
*/
void stop();

/**
* Upload a jar to be added to the Spark application classpath
* @param jar The local file to be uploaded
* @return A future that can be used to monitor this operation
*/
Future<?> uploadJar(File jar);

/**
* Adds a jar file to the running remote context.
*
Expand All @@ -69,6 +77,13 @@ public interface LivyClient {
*/
Future<?> addJar(URI uri);

/**
* Upload a file to be passed to the Spark application
* @param file The local file to be uploaded
* @return A future that can be used to monitor this operation
*/
Future<?> uploadFile(File file);

/**
* Adds a file to the running remote context.
*
Expand Down
11 changes: 11 additions & 0 deletions api/src/test/java/com/cloudera/livy/TestClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.cloudera.livy;

import java.io.File;
import java.net.URI;
import java.util.Properties;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -60,11 +61,21 @@ public void stop() {
throw new UnsupportedOperationException();
}

@Override
public Future<?> uploadJar(File jar) {
throw new UnsupportedOperationException();
}

@Override
public Future<?> addJar(URI uri) {
throw new UnsupportedOperationException();
}

@Override
public Future<?> uploadFile(File file) {
throw new UnsupportedOperationException();
}

@Override
public Future<?> addFile(URI uri) {
throw new UnsupportedOperationException();
Expand Down
5 changes: 5 additions & 0 deletions client-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.1</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.cloudera.livy.client.http;

import java.io.File;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
Expand Down Expand Up @@ -112,16 +113,35 @@ public synchronized void stop() {
}
}

public Future<?> uploadJar(File jar) {
return uploadResource(jar, "upload-jar", "jar");
}

@Override
public Future<?> addJar(URI uri) {
return addResource("add-jar", uri);
}

public Future<?> uploadFile(File file) {
return uploadResource(file, "upload-file", "file");
}

@Override
public Future<?> addFile(URI uri) {
return addResource("add-file", uri);
}

private Future<?> uploadResource(final File file, final String command, final String paramName) {
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
conn.post(file, Void.class, paramName, "/%d/%s", sessionId, command);
return null;
}
};
return executor.submit(task);
}

private Future<?> addResource(final String command, final URI resource) {
if (resource.getScheme() == null || resource.getScheme() == "file") {
throw new IllegalArgumentException("Local resources are not yet supported: " + resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@

package com.cloudera.livy.client.http;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
Expand Down Expand Up @@ -96,11 +97,11 @@ synchronized void close() throws IOException {
}

synchronized <V> V delete(Class<V> retType, String uri, Object... uriParams) throws Exception {
return sendRequest(new HttpDelete(), retType, uri, uriParams);
return sendJSONRequest(new HttpDelete(), retType, uri, uriParams);
}

synchronized <V> V get(Class<V> retType, String uri, Object... uriParams) throws Exception {
return sendRequest(new HttpGet(), retType, uri, uriParams);
return sendJSONRequest(new HttpGet(), retType, uri, uriParams);
}

synchronized <V> V post(
Expand All @@ -111,22 +112,44 @@ synchronized <V> V post(
HttpPost post = new HttpPost();
byte[] bodyBytes = mapper.writeValueAsBytes(body);
post.setEntity(new ByteArrayEntity(bodyBytes));
return sendJSONRequest(post, retType, uri, uriParams);
}

synchronized <V> V post(
File f,
Class<V> retType,
String paramName,
String uri,
Object... uriParams) throws Exception {
HttpPost post = new HttpPost();
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
builder.addPart(paramName, new FileBody(f));
post.setEntity(builder.build());
return sendRequest(post, retType, uri, uriParams);
}

private <V> V sendRequest(
private <V> V sendJSONRequest(
HttpRequestBase req,
Class<V> retType,
String uri,
Object... uriParams) throws Exception {
req.setURI(new URI(server.getScheme(), null, server.getHost(), server.getPort(),
uriRoot + String.format(uri, uriParams), null, null));
req.setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON);
req.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
req.setHeader(HttpHeaders.CONTENT_ENCODING, "UTF-8");
return sendRequest(req, retType, uri, uriParams);
}

private <V> V sendRequest(
HttpRequestBase req,
Class<V> retType,
String uri,
Object... uriParams) throws Exception {
req.setURI(new URI(server.getScheme(), null, server.getHost(), server.getPort(),
uriRoot + String.format(uri, uriParams), null, null));
req.setURI(new URI(server.getScheme(), null, server.getHost(), server.getPort(),
uriRoot + String.format(uri, uriParams), null, null));

CloseableHttpResponse res = client.execute(req);
try {
try (CloseableHttpResponse res = client.execute(req)) {
int status = (res.getStatusLine().getStatusCode() / 100) * 100;
HttpEntity entity = res.getEntity();
if (status == HttpStatus.SC_OK) {
Expand All @@ -140,8 +163,6 @@ private <V> V sendRequest(
throw new IOException(String.format("%s: %s", res.getStatusLine().getReasonPhrase(),
error));
}
} finally {
res.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@

package com.cloudera.livy.client.http

import java.io.{File, InputStream}
import java.net.{InetAddress, URI}
import java.nio.file.{Files, Paths}
import java.util.concurrent.{ExecutionException, Future => JFuture, TimeoutException, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
import javax.servlet.ServletContext

import org.mockito.ArgumentCaptor

import scala.concurrent.{ExecutionContext, Future}

import org.mockito.Matchers.{eq => meq, _}
Expand Down Expand Up @@ -123,6 +127,11 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll {
verify(session, times(1)).addJar(meq(juri))
}

withClient("should upload files and jars") {
uploadAndVerify("file")
uploadAndVerify("jar")
}

withClient("should time out handle get() call") {
// JobHandleImpl does exponential backoff checking the result of a job. Given an initial
// wait of 100ms, 4 iterations should result in a wait of 800ms, so the handle should at that
Expand All @@ -148,6 +157,24 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll {

}

private def uploadAndVerify(cmd: String): Unit = {
val f = File.createTempFile("uploadTestFile", cmd)
val expectedStr = "Test data"
val expectedData = expectedStr.getBytes()
Files.write(Paths.get(f.getAbsolutePath), expectedData)
val b = new Array[Byte](expectedData.length)
val captor = ArgumentCaptor.forClass(classOf[InputStream])
if (cmd == "file") {
client.uploadFile(f).get(TIMEOUT_S, TimeUnit.SECONDS)
verify(session, times(1)).addFile(captor.capture(), meq(f.getName))
} else {
client.uploadJar(f).get(TIMEOUT_S, TimeUnit.SECONDS)
verify(session, times(1)).addJar(captor.capture(), meq(f.getName))
}
captor.getValue.read(b)
assert(expectedStr === new String(b))
}

private def runJob(sync: Boolean, genStatusFn: Long => Seq[JobStatus]): (Long, JFuture[Long]) = {
val jobId = java.lang.Long.valueOf(ID_GENERATOR.incrementAndGet())
when(session.submitJob(any(classOf[Array[Byte]]))).thenReturn(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,21 @@ public void stop() {
}
}

@Override
public Future<?> uploadJar(File jar) {
throw new UnsupportedOperationException("Use addJar to add the jar to the remote context!");
}

@Override
public Future<?> addJar(URI uri) {
return run(new AddJarJob(uri.toString()));
}

@Override
public Future<?> uploadFile(File file) {
throw new UnsupportedOperationException("Use addFile to add the file to the remote context!");
}

@Override
public Future<?> addFile(URI uri) {
return run(new AddFileJob(uri.toString()));
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/com/cloudera/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ object LivyConf {
val SPARK_HOME_KEY = "livy.server.spark-home"
val SPARK_SUBMIT_KEY = "livy.server.spark-submit"
val IMPERSONATION_ENABLED_KEY = "livy.impersonation.enabled"
val LIVY_HOME_KEY = "livy.home"

sealed trait SessionKind
case class Process() extends SessionKind
Expand Down Expand Up @@ -108,6 +109,8 @@ class LivyConf(loadDefaults: Boolean) {
/** Return the location of the spark home directory */
def sparkHome(): Option[String] = getOption(SPARK_HOME_KEY).orElse(sys.env.get("SPARK_HOME"))

def livyHome(): Option[String] = getOption(LIVY_HOME_KEY).orElse(sys.env.get("LIVY_HOME"))

/** Return the path to the spark-submit executable. */
def sparkSubmit(): String = {
getOption(SPARK_SUBMIT_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ package com.cloudera.livy.sessions

abstract class SessionFactory[S <: Session, R] {

protected var livyHome: String = null

def create(id: Int, createRequest: R): S

def setLivyHome(livyHome: String): Unit = {
this.livyHome = livyHome
}

def close(): Unit = {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.cloudera.livy.sessions

import java.nio.file.Files
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

Expand All @@ -30,7 +31,7 @@ object SessionManager {
val SESSION_TIMEOUT = "livy.server.session.timeout"
}

class SessionManager[S <: Session, R](livyConf: LivyConf, factory: SessionFactory[S, R])
class SessionManager[S <: Session, R](val livyConf: LivyConf, factory: SessionFactory[S, R])
extends Logging {

private implicit def executor: ExecutionContext = ExecutionContext.global
Expand All @@ -41,6 +42,19 @@ class SessionManager[S <: Session, R](livyConf: LivyConf, factory: SessionFactor
private[this] final val sessionTimeout =
TimeUnit.MILLISECONDS.toNanos(livyConf.getLong(SessionManager.SESSION_TIMEOUT, 1000 * 60 * 60))

val livyHome = livyConf.livyHome().getOrElse {
val isTest = sys.env.get("livy.test").map(_ == "true").isDefined
if (isTest) {
Files.createTempDirectory("livyTemp").toUri.toString
} else {
throw new RuntimeException("livy.home must be specified!")
}
}

factory.setLivyHome(livyHome)

logger.info(s"Live Home = $livyHome")

private[this] final val garbageCollector = new GarbageCollector

garbageCollector.setDaemon(true)
Expand Down
Loading

0 comments on commit 4274573

Please sign in to comment.