Skip to content

Commit

Permalink
[LIVY-245][RSC] Add support shared variables across Jobs
Browse files Browse the repository at this point in the history
Currently we cannot share variables across different Jobs in Livy, so here propose to add a cache layer in RSC to store shared objects. This cache followed LRU, the least not used will be removed when exceeding limits.

This work is based on alex-the-man 's work.

Author: jerryshao <[email protected]>

Closes apache#19 from jerryshao/LIVY-245.
  • Loading branch information
jerryshao committed Oct 20, 2017
1 parent 1bbefe6 commit bef5b4f
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 15 deletions.
13 changes: 10 additions & 3 deletions api/src/main/java/org/apache/livy/JobContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.livy;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.NoSuchElementException;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
Expand All @@ -47,6 +45,15 @@ public interface JobContext {
/** Returns the JavaStreamingContext which has already been created. */
JavaStreamingContext streamingctx();

/** Get shared object */
<E> E getSharedObject(String name) throws NoSuchElementException;

/** Set shared object, it will replace the old one if already existed */
<E> void setSharedObject(String name, E object);

/** Remove shared object from cache */
<E> E removeSharedObject(String name);

/**
* Creates the SparkStreaming context.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logg
assert(result === "hello")
}

test("share variables across jobs") {
assume(client2 != null, "Client not active.")
waitFor(client2.submit(new SharedVariableCounter("x"))) shouldBe 0
waitFor(client2.submit(new SharedVariableCounter("x"))) shouldBe 1
}

scalaTest("run scala jobs") {
assume(client2 != null, "Client not active.")

Expand All @@ -173,6 +179,11 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logg
val result = waitFor(client2.submit(job))
assert(result === job.value)
}

(0 until 2).foreach { i =>
val result = waitFor(client2.submit(new ScalaSharedVariableCounter("test")))
assert(i === result)
}
}

protected def scalaTest(desc: String)(testFn: => Unit): Unit = {
Expand Down
28 changes: 28 additions & 0 deletions repl/src/main/resources/fake_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import print_function
import ast
from collections import OrderedDict
import datetime
import decimal
import io
Expand Down Expand Up @@ -100,6 +101,8 @@ def __init__(self):
self.streaming_ctx = None
self.local_tmp_dir_path = local_tmp_dir_path
self.spark_session = global_dict.get('spark')
self.shared_variables = OrderedDict()
self.max_var_size = 100

def sc(self):
return self.sc
Expand Down Expand Up @@ -149,6 +152,31 @@ def stop(self):
def spark_session(self):
return self.spark_session

def get_shared_object(self, name):
with self.lock:
try:
var = self.shared_variables[name]
del self.shared_variables[name]
self.shared_variables[name] = var
except:
var = None

return var

def set_shared_object(self, name, object):
with self.lock:
self.shared_variables[name] = object

while len(self.shared_variables) > self.max_var_size:
self.popitem(last=False)

def remove_shared_object(self, name):
with self.lock:
try:
del self.shared_variables[name]
except:
pass


class PySparkJobProcessorImpl(object):
def processBypassJob(self, serialized_job):
Expand Down
2 changes: 1 addition & 1 deletion repl/src/main/scala/org/apache/livy/repl/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class Session(
private var _state: SessionState = SessionState.NotStarted()

// Number of statements kept in driver's memory
private val numRetainedStatements = livyConf.getInt(RSCConf.Entry.RETAINED_STATEMENT_NUMBER)
private val numRetainedStatements = livyConf.getInt(RSCConf.Entry.RETAINED_STATEMENTS)

private val _statements = new JLinkedHashMap[Int, Statement] {
protected override def removeEldestEntry(eldest: Entry[Int, Statement]): Boolean = {
Expand Down
2 changes: 1 addition & 1 deletion repl/src/test/scala/org/apache/livy/repl/SessionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class SessionSpec extends FunSpec with Eventually with LivyBaseUnitTestSuite wit
}

it("should remove old statements when reaching threshold") {
rscConf.set(RSCConf.Entry.RETAINED_STATEMENT_NUMBER, 2)
rscConf.set(RSCConf.Entry.RETAINED_STATEMENTS, 2)
session = new Session(rscConf, new SparkConf())
session.start()

Expand Down
7 changes: 4 additions & 3 deletions rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public static enum Entry implements ConfEntry {
JOB_CANCEL_TRIGGER_INTERVAL("job-cancel.trigger-interval", "100ms"),
JOB_CANCEL_TIMEOUT("job-cancel.timeout", "30s"),

RETAINED_STATEMENT_NUMBER("retained-statements", 100);
RETAINED_STATEMENTS("retained-statements", 100),
RETAINED_SHARE_VARIABLES("retained.share-variables", 100);

private final String key;
private final Object dflt;
Expand Down Expand Up @@ -157,7 +158,7 @@ public String findLocalAddress() throws IOException {
put(RSCConf.Entry.TEST_STUCK_START_DRIVER.key, DepConf.TEST_STUCK_START_DRIVER);
put(RSCConf.Entry.JOB_CANCEL_TRIGGER_INTERVAL.key, DepConf.JOB_CANCEL_TRIGGER_INTERVAL);
put(RSCConf.Entry.JOB_CANCEL_TIMEOUT.key, DepConf.JOB_CANCEL_TIMEOUT);
put(RSCConf.Entry.RETAINED_STATEMENT_NUMBER.key, DepConf.RETAINED_STATEMENT_NUMBER);
put(RSCConf.Entry.RETAINED_STATEMENTS.key, DepConf.RETAINED_STATEMENTS);
}});

// Maps deprecated key to DeprecatedConf with the same key.
Expand All @@ -183,7 +184,7 @@ static enum DepConf implements DeprecatedConf {
TEST_STUCK_START_DRIVER("test.do_not_use.stuck_start_driver", "0.4"),
JOB_CANCEL_TRIGGER_INTERVAL("job_cancel.trigger_interval", "0.4"),
JOB_CANCEL_TIMEOUT("job_cancel.timeout", "0.4"),
RETAINED_STATEMENT_NUMBER("retained_statements", "0.4");
RETAINED_STATEMENTS("retained_statements", "0.4");

private final String key;
private final String version;
Expand Down
51 changes: 51 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/driver/JobContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.livy.rsc.driver;

import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
Expand All @@ -26,6 +29,7 @@
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.livy.JobContext;
import org.apache.livy.rsc.RSCConf;
import org.apache.livy.rsc.Utils;

class JobContextImpl implements JobContext {
Expand All @@ -35,17 +39,29 @@ class JobContextImpl implements JobContext {
private final RSCDriver driver;
private final SparkEntries sparkEntries;

// Map to store shared variables across different jobs.
private final LinkedHashMap<String, Object> sharedVariables;

public JobContextImpl(SparkEntries sparkEntries, File localTmpDir, RSCDriver driver) {
this.sparkEntries = sparkEntries;

this.localTmpDir = localTmpDir;
this.driver = driver;
final int retainedVariables = driver.livyConf.getInt(RSCConf.Entry.RETAINED_SHARE_VARIABLES);
this.sharedVariables = new LinkedHashMap<String, Object>() {
@Override
protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
return size() > retainedVariables;
}
};
}

@Override
public JavaSparkContext sc() {
return sparkEntries.sc();
}

@SuppressWarnings("unchecked")
@Override
public Object sparkSession() throws Exception {
return sparkEntries.sparkSession();
Expand All @@ -67,6 +83,41 @@ public synchronized JavaStreamingContext streamingctx(){
return streamingctx;
}

@SuppressWarnings("unchecked")
@Override
public Object getSharedObject(String name) throws NoSuchElementException {
Object obj;
synchronized (sharedVariables) {
// Remove the entry and insert again to achieve LRU.
obj = sharedVariables.remove(name);
if (obj == null) {
throw new NoSuchElementException("Cannot find shared variable named " + name);
}
sharedVariables.put(name, obj);
}

return obj;

}

@Override
public void setSharedObject(String name, Object object) {
synchronized (sharedVariables) {
sharedVariables.put(name, object);
}
}

@SuppressWarnings("unchecked")
@Override
public Object removeSharedObject(String name) {
Object obj;
synchronized (sharedVariables) {
obj = sharedVariables.remove(name);
}

return obj;
}

@Override
public synchronized void createStreamingContext(long batchDuration) {
Utils.checkState(streamingctx == null, "Streaming context is not null.");
Expand Down
64 changes: 57 additions & 7 deletions rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,7 @@
import org.apache.livy.LivyClientBuilder;
import org.apache.livy.client.common.Serializer;
import org.apache.livy.rsc.rpc.RpcException;
import org.apache.livy.test.jobs.Echo;
import org.apache.livy.test.jobs.Failure;
import org.apache.livy.test.jobs.FileReader;
import org.apache.livy.test.jobs.GetCurrentUser;
import org.apache.livy.test.jobs.SQLGetTweets;
import org.apache.livy.test.jobs.Sleeper;
import org.apache.livy.test.jobs.SmallCount;
import org.apache.livy.test.jobs.*;
import static org.apache.livy.rsc.RSCConf.Entry.*;

public class TestSparkClient {
Expand All @@ -79,6 +73,7 @@ private Properties createConf(boolean local) {
conf.put(LIVY_JARS.key(), "");
conf.put("spark.repl.enableHiveContext", "true");
conf.put("spark.sql.catalogImplementation", "hive");
conf.put(RETAINED_SHARE_VARIABLES.key(), "2");
return conf;
}

Expand Down Expand Up @@ -114,6 +109,61 @@ public void call(LivyClient client) throws Exception {
});
}

@Test
public void testSharedVariable() throws Exception {
runTest(true, new TestFunction() {
@Override
void call(LivyClient client) throws Exception {
for (int i = 0; i < 2; i ++) {
JobHandle<Integer> handler = client.submit(new SharedVariableCounter("test"));
assertEquals(Integer.valueOf(i), handler.get(TIMEOUT, TimeUnit.SECONDS));
}
}
});
}

private static class SharedVariableTest implements Job<Void> {

@Override
public Void call(JobContext jc) throws Exception {
jc.setSharedObject("1", 1);
jc.setSharedObject("2", 2);

Integer obj = jc.getSharedObject("1");
assertEquals(obj, Integer.valueOf(1));

jc.setSharedObject("3", 3);

Exception e = null;
try {
jc.getSharedObject("2");
} catch (NoSuchElementException exp) {
e = exp;
}

assertNotNull(e);

obj = jc.removeSharedObject("2");
assertNull(obj);

obj = jc.removeSharedObject("3");
assertEquals(obj, Integer.valueOf(3));

return null;
}
}

@Test
public void testRemoveSharedVariableWithLRU() throws Exception {
runTest(true, new TestFunction() {
@Override
void call(LivyClient client) throws Exception {
JobHandle<Void> handler = client.submit(new SharedVariableTest());
handler.get(TIMEOUT, TimeUnit.SECONDS);
}
});
}

@Test
public void testJobFailure() throws Exception {
runTest(true, new TestFunction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ class ScalaJobContext private[livy] (context: JobContext) {

def sparkSession[E]: E = context.sparkSession()

/** Set shared object, it will replace the old one if already existed */
def setSharedVariable[E](name: String, obj: E): Unit = context.setSharedObject(name, obj)

/** Get shared object */
def getSharedVariable[E](name: String): E = context.getSharedObject(name)

/** Remove shared object from cache */
def removeSharedVariable[E](name: String): E = context.removeSharedObject(name)

/**
* Creates the SparkStreaming context.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.test.jobs;

import java.util.NoSuchElementException;

import org.apache.livy.Job;
import org.apache.livy.JobContext;

public class SharedVariableCounter implements Job<Integer> {

private final String name;

public SharedVariableCounter(String name) {
this.name = name;
}

@Override
public Integer call(JobContext jc) {
Integer value = -1;

try {
value = jc.getSharedObject(name);
} catch (NoSuchElementException e) {
jc.setSharedObject(name, value);
}

Integer newValue = value + 1;
jc.setSharedObject(name, newValue);

return newValue;
}
}
Loading

0 comments on commit bef5b4f

Please sign in to comment.