Skip to content

Commit

Permalink
[Java] Refine tests and fix single-process mode (ray-project#4265)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulchen authored Mar 7, 2019
1 parent 39eed24 commit f0465bc
Show file tree
Hide file tree
Showing 23 changed files with 127 additions and 108 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ matrix:
- JDK='Oracle JDK 8'
- PYTHON=3.5 PYTHONWARNINGS=ignore
- RAY_USE_CMAKE=1
- RAY_INSTALL_JAVA=1
install:
- ./ci/travis/install-dependencies.sh
- export PATH="$HOME/miniconda/bin:$PATH"
- ./ci/travis/install-ray.sh
script:
- ./java/test.sh

Expand Down
5 changes: 5 additions & 0 deletions java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public void shutdown() {
public MockObjectStore getObjectStore() {
return store;
}

@Override
public Worker getWorker() {
return ((MockRayletClient) rayletClient).getCurrentWorker();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.ray.api.RuntimeContext;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.config.WorkerMode;
import org.ray.runtime.task.TaskSpec;

public class RuntimeContextImpl implements RuntimeContext {
Expand All @@ -22,8 +21,10 @@ public UniqueId getCurrentDriverId() {

@Override
public UniqueId getCurrentActorId() {
Preconditions.checkState(runtime.rayConfig.workerMode == WorkerMode.WORKER);
return runtime.getWorker().getCurrentActorId();
Worker worker = runtime.getWorker();
Preconditions.checkState(worker != null && !worker.getCurrentActorId().isNil(),
"This method should only be called from an actor.");
return worker.getCurrentActorId();
}

@Override
Expand Down
3 changes: 1 addition & 2 deletions java/runtime/src/main/java/org/ray/runtime/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public void loop() {
* Execute a task.
*/
public void execute(TaskSpec spec) {
LOGGER.info("Executing task {}", spec.taskId);
LOGGER.debug("Executing task {}", spec);
UniqueId returnId = spec.returnIds[0];
ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
Expand Down Expand Up @@ -123,7 +122,7 @@ public void execute(TaskSpec spec) {
maybeLoadCheckpoint(result, returnId);
currentActor = result;
}
LOGGER.info("Finished executing task {}", spec.taskId);
LOGGER.debug("Finished executing task {}", spec.taskId);
} catch (Exception e) {
LOGGER.error("Error executing task " + spec, e);
if (!spec.isActorCreationTask()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,10 @@ public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
ArrayList<ObjectStoreData> rets = new ArrayList<>();
for (byte[] id : objectIds) {
try {
Constructor<ObjectStoreData> constructor = ObjectStoreData.class.getConstructor(
byte[].class, byte[].class);
Constructor<?> constructor = ObjectStoreData.class.getDeclaredConstructors()[0];
constructor.setAccessible(true);
rets.add(constructor.newInstance(metadata.get(new UniqueId(id)),
data.get(new UniqueId(id))));
rets.add((ObjectStoreData) constructor.newInstance(data.get(new UniqueId(id)),
metadata.get(new UniqueId(id))));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MockRayletClient implements RayletClient {
private final ExecutorService exec;
private final Deque<Worker> idleWorkers;
private final Map<UniqueId, Worker> actorWorkers;
private final ThreadLocal<Worker> currentWorker;

public MockRayletClient(RayDevRuntime runtime, int numberThreads) {
this.runtime = runtime;
Expand All @@ -48,6 +49,7 @@ public MockRayletClient(RayDevRuntime runtime, int numberThreads) {
exec = Executors.newFixedThreadPool(numberThreads);
idleWorkers = new LinkedList<>();
actorWorkers = new HashMap<>();
currentWorker = new ThreadLocal<>();
}

public synchronized void onObjectPut(UniqueId id) {
Expand All @@ -60,29 +62,36 @@ public synchronized void onObjectPut(UniqueId id) {
}
}

public Worker getCurrentWorker() {
return currentWorker.get();
}

/**
* Get a worker from the worker pool to run the given task.
*/
private Worker getWorker(TaskSpec task) {
if (task.isActorTask()) {
return actorWorkers.get(task.actorId);
}
Worker worker;
if (idleWorkers.size() > 0) {
worker = idleWorkers.pop();
if (task.isActorTask()) {
worker = actorWorkers.get(task.actorId);
} else {
worker = new Worker(runtime);
}
if (task.isActorCreationTask()) {
actorWorkers.put(task.actorCreationId, worker);
if (idleWorkers.size() > 0) {
worker = idleWorkers.pop();
} else {
worker = new Worker(runtime);
}
if (task.isActorCreationTask()) {
actorWorkers.put(task.actorCreationId, worker);
}
}
currentWorker.set(worker);
return worker;
}

/**
* Return the worker to the worker pool.
*/
private void returnWorker(Worker worker) {
currentWorker.remove();
idleWorkers.push(worker);
}

Expand All @@ -105,9 +114,7 @@ public synchronized void submitTask(TaskSpec task) {
new byte[]{}, new byte[]{});
}
} finally {
if (!task.isActorCreationTask() && !task.isActorTask()) {
returnWorker(worker);
}
returnWorker(worker);
}
});
} else {
Expand Down
2 changes: 1 addition & 1 deletion java/runtime/src/main/resources/ray.default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ ray {
// ----------------------------
dev-runtime {
// Number of threads that you process tasks
execution-parallelism: 5
execution-parallelism: 10
}

}
37 changes: 13 additions & 24 deletions java/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,25 @@

# Cause the script to exit if a single command fails.
set -e

# Show explicitly which commands are currently running.
set -x

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
$ROOT_DIR/../build.sh -l java

pushd $ROOT_DIR/../java
echo "Compiling Java code."
mvn clean install -Dmaven.test.skip
check_style=$(mvn checkstyle:check)
echo "${check_style}"
[[ ${check_style} =~ "BUILD FAILURE" ]] && exit 1

# test raylet
mvn test | tee mvn_test
if [ `grep -c "BUILD FAILURE" mvn_test` -eq '0' ]; then
rm mvn_test
echo "Tests passed under CLUSTER mode!"
else
rm mvn_test
exit 1
fi
# test raylet under SINGLE_PROCESS mode
mvn test -Dray.run-mode=SINGLE_PROCESS | tee dev_mvn_test
if [ `grep -c "BUILD FAILURE" dev_mvn_test` -eq '0' ]; then
rm dev_mvn_test
echo "Tests passed under SINGLE_PROCESS mode!"
else
rm dev_mvn_test
exit 1
fi

echo "Checking code format."
mvn checkstyle:check

echo "Running tests under cluster mode."
ENABLE_MULTI_LANGUAGE_TESTS=1 mvn test

echo "Running tests under single-process mode."
mvn test -Dray.run-mode=SINGLE_PROCESS

set +x
set +e

popd
2 changes: 1 addition & 1 deletion java/test/src/main/java/org/ray/api/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class TestUtils {
public static void skipTestUnderSingleProcess() {
AbstractRayRuntime runtime = (AbstractRayRuntime)Ray.internal();
if (runtime.getRayConfig().runMode == RunMode.SINGLE_PROCESS) {
throw new SkipException("Skip case.");
throw new SkipException("This test doesn't work under single-process mode.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,9 @@ public int getPid() {
}
}

@Override
public void beforeEachCase() {
TestUtils.skipTestUnderSingleProcess();
}

@Test
public void testActorReconstruction() throws InterruptedException, IOException {
TestUtils.skipTestUnderSingleProcess();
ActorCreationOptions options = new ActorCreationOptions(new HashMap<>(), 1);
RayActor<Counter> actor = Ray.createActor(Counter::new, options);
// Call increase 3 times.
Expand Down Expand Up @@ -130,6 +126,8 @@ public void checkpointExpired(UniqueId actorId, UniqueId checkpointId) {

@Test
public void testActorCheckpointing() throws IOException, InterruptedException {
TestUtils.skipTestUnderSingleProcess();

ActorCreationOptions options = new ActorCreationOptions(new HashMap<>(), 1);
RayActor<CheckpointableCounter> actor = Ray.createActor(CheckpointableCounter::new, options);
// Call increase 3 times.
Expand All @@ -138,8 +136,6 @@ public void testActorCheckpointing() throws IOException, InterruptedException {
}
// Assert that the actor wasn't resumed from a checkpoint.
Assert.assertFalse(Ray.call(CheckpointableCounter::wasResumedFromCheckpoint, actor).get());

// Kill the actor process.
int pid = Ray.call(CheckpointableCounter::getPid, actor).get();
Runtime.getRuntime().exec("kill -9 " + pid);
// Wait for the actor to be killed.
Expand Down
2 changes: 2 additions & 0 deletions java/test/src/main/java/org/ray/api/test/ActorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.annotation.RayRemote;
import org.ray.api.exception.UnreconstructableException;
import org.ray.api.id.UniqueId;
Expand Down Expand Up @@ -90,6 +91,7 @@ public void testForkingActorHandle() {

@Test
public void testUnreconstructableActorObject() throws InterruptedException {
TestUtils.skipTestUnderSingleProcess();
RayActor<Counter> counter = Ray.createActor(Counter::new, 100);
// Call an actor method.
RayObject value = Ray.call(Counter::getValue, counter);
Expand Down
25 changes: 9 additions & 16 deletions java/test/src/main/java/org/ray/api/test/BaseTest.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package org.ray.api.test;

import java.lang.reflect.Method;
import org.ray.api.Ray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

public class BaseTest {

private static final Logger LOGGER = LoggerFactory.getLogger(BaseTest.class);

@BeforeMethod
public void setUp() {
public void setUpBase(Method method) {
LOGGER.info("===== Running test: "
+ method.getDeclaringClass().getName() + "." + method.getName());
System.setProperty("ray.home", "../..");
System.setProperty("ray.resources", "CPU:4,RES-A:4");
beforeInitRay();
Ray.init();
beforeEachCase();
}

@AfterMethod
public void tearDown() {
public void tearDownBase() {
// TODO(qwang): This is double check to check that the socket file is removed actually.
// We could not enable this until `systemInfo` enabled.
//File rayletSocketFIle = new File(Ray.systemInfo().rayletSocketName());
Ray.shutdown();
afterShutdownRay();

//remove raylet socket file
//rayletSocketFIle.delete();
Expand All @@ -31,15 +35,4 @@ public void tearDown() {
System.clearProperty("ray.resources");
}

protected void beforeInitRay() {

}

protected void afterShutdownRay() {

}

protected void beforeEachCase() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ public class ClientExceptionTest extends BaseTest {

private static final Logger LOGGER = LoggerFactory.getLogger(ClientExceptionTest.class);

@Override
public void beforeEachCase() {
TestUtils.skipTestUnderSingleProcess();
}

@Test
public void testWaitAndCrash() {
TestUtils.skipTestUnderSingleProcess();
UniqueId randomId = UniqueId.randomId();
RayObject<String> notExisting = new RayObjectImpl(randomId);

Expand Down
10 changes: 5 additions & 5 deletions java/test/src/main/java/org/ray/api/test/FailureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,29 @@ private static void assertTaskFailedWithRayTaskException(RayObject<?> rayObject)
}
}

@Override
public void beforeEachCase() {
TestUtils.skipTestUnderSingleProcess();
}

@Test
public void testNormalTaskFailure() {
TestUtils.skipTestUnderSingleProcess();
assertTaskFailedWithRayTaskException(Ray.call(FailureTest::badFunc));
}

@Test
public void testActorCreationFailure() {
TestUtils.skipTestUnderSingleProcess();
RayActor<BadActor> actor = Ray.createActor(BadActor::new, true);
assertTaskFailedWithRayTaskException(Ray.call(BadActor::badMethod, actor));
}

@Test
public void testActorTaskFailure() {
TestUtils.skipTestUnderSingleProcess();
RayActor<BadActor> actor = Ray.createActor(BadActor::new, false);
assertTaskFailedWithRayTaskException(Ray.call(BadActor::badMethod, actor));
}

@Test
public void testWorkerProcessDying() {
TestUtils.skipTestUnderSingleProcess();
try {
Ray.call(FailureTest::badFunc2).get();
Assert.fail("This line shouldn't be reached.");
Expand All @@ -90,6 +89,7 @@ public void testWorkerProcessDying() {

@Test
public void testActorProcessDying() {
TestUtils.skipTestUnderSingleProcess();
RayActor<BadActor> actor = Ray.createActor(BadActor::new, false);
try {
Ray.call(BadActor::badMethod2, actor).get();
Expand Down
Loading

0 comments on commit f0465bc

Please sign in to comment.