Skip to content

Commit

Permalink
[Java] Enable direct call by default. (ray-project#7408)
Browse files Browse the repository at this point in the history
* WIP

* Address comments.

* Linting

* Fix

* Fix

* Fix test

* Fix

* Fix single process ci

* Fix ut

* Update java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java

* Address comments

* Fix linting

* Minor update comments.

* Fix streaming CI
  • Loading branch information
jovany-wang authored Mar 13, 2020
1 parent 6993a47 commit f4656d8
Show file tree
Hide file tree
Showing 20 changed files with 34 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public class ActorCreationOptions extends BaseTaskOptions {
public final int maxConcurrency;

private ActorCreationOptions(Map<String, Double> resources, int maxReconstructions,
boolean useDirectCall, String jvmOptions, int maxConcurrency) {
super(resources, useDirectCall);
String jvmOptions, int maxConcurrency) {
super(resources);
this.maxReconstructions = maxReconstructions;
this.jvmOptions = jvmOptions;
this.maxConcurrency = maxConcurrency;
Expand All @@ -32,7 +32,6 @@ public static class Builder {

private Map<String, Double> resources = new HashMap<>();
private int maxReconstructions = NO_RECONSTRUCTION;
private boolean useDirectCall = DEFAULT_USE_DIRECT_CALL;
private String jvmOptions = null;
private int maxConcurrency = 1;

Expand All @@ -46,24 +45,15 @@ public Builder setMaxReconstructions(int maxReconstructions) {
return this;
}

// Since direct call is not fully supported yet (see issue #5559),
// users are not allowed to set the option to true.
// TODO (kfstorm): uncomment when direct call is ready.
// public Builder setUseDirectCall(boolean useDirectCall) {
// this.useDirectCall = useDirectCall;
// return this;
// }

public Builder setJvmOptions(String jvmOptions) {
this.jvmOptions = jvmOptions;
return this;
}

// The max number of concurrent calls to allow for this actor.
//
// This only works with direct actor calls. The max concurrency defaults to 1
// for threaded execution. Note that the execution order is not guaranteed
// when max_concurrency > 1.
// The max concurrency defaults to 1 for threaded execution.
// Note that the execution order is not guaranteed when max_concurrency > 1.
public Builder setMaxConcurrency(int maxConcurrency) {
if (maxConcurrency <= 0) {
throw new IllegalArgumentException("maxConcurrency must be greater than 0.");
Expand All @@ -75,7 +65,7 @@ public Builder setMaxConcurrency(int maxConcurrency) {

public ActorCreationOptions createActorCreationOptions() {
return new ActorCreationOptions(
resources, maxReconstructions, useDirectCall, jvmOptions, maxConcurrency);
resources, maxReconstructions, jvmOptions, maxConcurrency);
}
}

Expand Down
10 changes: 1 addition & 9 deletions java/api/src/main/java/org/ray/api/options/BaseTaskOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,21 @@
* The options class for RayCall or ActorCreation.
*/
public abstract class BaseTaskOptions {
// DO NOT set this environment variable. It's only used for test purposes.
// Please use `setUseDirectCall` instead.
public static final boolean DEFAULT_USE_DIRECT_CALL = "1"
.equals(System.getenv("DEFAULT_USE_DIRECT_CALL"));

public final Map<String, Double> resources;

public final boolean useDirectCall;

public BaseTaskOptions() {
resources = new HashMap<>();
useDirectCall = DEFAULT_USE_DIRECT_CALL;
}

public BaseTaskOptions(Map<String, Double> resources, boolean useDirectCall) {
public BaseTaskOptions(Map<String, Double> resources) {
for (Map.Entry<String, Double> entry : resources.entrySet()) {
if (entry.getValue().compareTo(0.0) <= 0) {
throw new IllegalArgumentException(String.format("Resource capacity should be " +
"positive, but got resource %s = %f.", entry.getKey(), entry.getValue()));
}
}
this.resources = resources;
this.useDirectCall = useDirectCall;
}

}
15 changes: 3 additions & 12 deletions java/api/src/main/java/org/ray/api/options/CallOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
*/
public class CallOptions extends BaseTaskOptions {

private CallOptions(Map<String, Double> resources, boolean useDirectCall) {
super(resources, useDirectCall);
private CallOptions(Map<String, Double> resources) {
super(resources);
}

/**
Expand All @@ -18,23 +18,14 @@ private CallOptions(Map<String, Double> resources, boolean useDirectCall) {
public static class Builder {

private Map<String, Double> resources = new HashMap<>();
private boolean useDirectCall = DEFAULT_USE_DIRECT_CALL;

public Builder setResources(Map<String, Double> resources) {
this.resources = resources;
return this;
}

// Since direct call is not fully supported yet (see issue #5559),
// users are not allowed to set the option to true.
// TODO (kfstorm): uncomment when direct call is ready.
// public Builder setUseDirectCall(boolean useDirectCall) {
// this.useDirectCall = useDirectCall;
// return this;
// }

public CallOptions createCallOptions() {
return new CallOptions(resources, useDirectCall);
return new CallOptions(resources);
}
}
}
14 changes: 3 additions & 11 deletions java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.ray.api.options.CallOptions;
import org.ray.api.runtime.RayRuntime;
import org.ray.api.runtimecontext.RuntimeContext;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.context.RuntimeContextImpl;
import org.ray.runtime.context.WorkerContext;
Expand Down Expand Up @@ -166,7 +165,7 @@ public Callable wrapCallable(Callable callable) {
private RayObject callNormalFunction(FunctionDescriptor functionDescriptor,
Object[] args, int numReturns, CallOptions options) {
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage(), /*isDirectCall*/false);
.wrap(args, functionDescriptor.getLanguage());
List<ObjectId> returnIds = taskSubmitter.submitTask(functionDescriptor,
functionArgs, numReturns, options);
Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1);
Expand All @@ -180,7 +179,7 @@ private RayObject callNormalFunction(FunctionDescriptor functionDescriptor,
private RayObject callActorFunction(RayActor rayActor,
FunctionDescriptor functionDescriptor, Object[] args, int numReturns) {
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage(), isDirectCall(rayActor));
.wrap(args, functionDescriptor.getLanguage());
List<ObjectId> returnIds = taskSubmitter.submitActorTask(rayActor,
functionDescriptor, functionArgs, numReturns, null);
Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1);
Expand All @@ -194,21 +193,14 @@ private RayObject callActorFunction(RayActor rayActor,
private RayActor createActorImpl(FunctionDescriptor functionDescriptor,
Object[] args, ActorCreationOptions options) {
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage(), /*isDirectCall*/false);
.wrap(args, functionDescriptor.getLanguage());
if (functionDescriptor.getLanguage() != Language.JAVA && options != null) {
Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions));
}
RayActor actor = taskSubmitter.createActor(functionDescriptor, functionArgs, options);
return actor;
}

private boolean isDirectCall(RayActor rayActor) {
if (rayActor instanceof NativeRayActor) {
return ((NativeRayActor) rayActor).isDirectCallActor();
}
return false;
}

public WorkerContext getWorkerContext() {
return workerContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.ray.api.RayActor;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.context.NativeWorkerContext;
import org.ray.runtime.functionmanager.FunctionManager;
Expand Down Expand Up @@ -137,9 +136,6 @@ public void setResource(String resourceName, double capacity, UniqueId nodeId) {

@Override
public void killActor(RayActor<?> actor) {
if (!((NativeRayActor) actor).isDirectCallActor()) {
throw new UnsupportedOperationException("Only direct call actors can be killed.");
}
nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ public Language getLanguage() {
return Language.forNumber(nativeGetLanguage(nativeCoreWorkerPointer, actorId));
}

public boolean isDirectCallActor() {
return nativeIsDirectCallActor(nativeCoreWorkerPointer, actorId);
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(toBytes());
Expand Down Expand Up @@ -119,9 +115,6 @@ protected void finalize() {
private static native int nativeGetLanguage(
long nativeCoreWorkerPointer, byte[] actorId);

private static native boolean nativeIsDirectCallActor(
long nativeCoreWorkerPointer, byte[] actorId);

static native List<String> nativeGetActorCreationTaskFunctionDescriptor(
long nativeCoreWorkerPointer, byte[] actorId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@

import java.util.ArrayList;
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.RayObject;
import org.ray.api.id.ObjectId;
import org.ray.api.runtime.RayRuntime;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
import org.ray.runtime.generated.Common.Language;
import org.ray.runtime.object.NativeRayObject;
import org.ray.runtime.object.ObjectSerializer;
Expand All @@ -32,27 +28,17 @@ public class ArgumentsBuilder {
/**
* Convert real function arguments to task spec arguments.
*/
public static List<FunctionArg> wrap(Object[] args, Language language, boolean isDirectCall) {
public static List<FunctionArg> wrap(Object[] args, Language language) {
List<FunctionArg> ret = new ArrayList<>();
for (Object arg : args) {
ObjectId id = null;
NativeRayObject value = null;
if (arg instanceof RayObject) {
if (isDirectCall) {
throw new IllegalArgumentException(
"Passing RayObject to a direct call actor is not supported.");
}
id = ((RayObject) arg).getId();
} else {
value = ObjectSerializer.serialize(arg);
if (!isDirectCall && value.data.length > LARGEST_SIZE_PASS_BY_VALUE) {
RayRuntime runtime = Ray.internal();
if (runtime instanceof RayMultiWorkerNativeRuntime) {
runtime = ((RayMultiWorkerNativeRuntime) runtime).getCurrentRuntime();
}
id = ((AbstractRayRuntime) runtime).getObjectStore()
.putRaw(value);
value = null;
if (value.data.length > LARGEST_SIZE_PASS_BY_VALUE) {
// Do nothing since we are not support pass by reference in direct call.
}
}
if (language == Language.PYTHON) {
Expand Down
3 changes: 0 additions & 3 deletions java/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ echo "Running tests under cluster mode."
# bazel test //java:all_tests --action_env=ENABLE_MULTI_LANGUAGE_TESTS=1 --test_output="errors" || cluster_exit_code=$?
ENABLE_MULTI_LANGUAGE_TESTS=1 run_testng java -cp $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output $ROOT_DIR/testng.xml

echo "Running tests under cluster mode with direct actor call turned on."
ENABLE_MULTI_LANGUAGE_TESTS=1 DEFAULT_USE_DIRECT_CALL=1 run_testng java -cp $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output $ROOT_DIR/testng.xml

echo "Running tests under single-process mode."
# bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --test_output="errors" || single_exit_code=$?
run_testng java -Dray.run-mode="SINGLE_PROCESS" -cp $ROOT_DIR/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output $ROOT_DIR/testng.xml
Expand Down
26 changes: 5 additions & 21 deletions java/test/src/main/java/org/ray/api/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.function.Supplier;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.runtime.RayRuntime;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
Expand All @@ -20,8 +19,12 @@ public static class LargeObject implements Serializable {

private static final int WAIT_INTERVAL_MS = 5;

public static boolean isSingleProcessMode() {
return getRuntime().getRayConfig().runMode == RunMode.SINGLE_PROCESS;
}

public static void skipTestUnderSingleProcess() {
if (getRuntime().getRayConfig().runMode == RunMode.SINGLE_PROCESS) {
if (isSingleProcessMode()) {
throw new SkipException("This test doesn't work under single-process mode.");
}
}
Expand All @@ -32,25 +35,6 @@ public static void skipTestUnderClusterMode() {
}
}

public static boolean isDirectActorCallEnabled() {
return ActorCreationOptions.DEFAULT_USE_DIRECT_CALL;
}

public static void skipTestIfDirectActorCallEnabled() {
skipTestIfDirectActorCallEnabled(true);
}

private static void skipTestIfDirectActorCallEnabled(boolean enabled) {
if (enabled == ActorCreationOptions.DEFAULT_USE_DIRECT_CALL) {
throw new SkipException(String.format("This test doesn't work when direct actor call is %s.",
enabled ? "enabled" : "disabled"));
}
}

public static void skipTestIfDirectActorCallDisabled() {
skipTestIfDirectActorCallEnabled(false);
}

/**
* Wait until the given condition is met.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public String countDown() {
}

public void testConcurrentCall() {
TestUtils.skipTestIfDirectActorCallDisabled();
TestUtils.skipTestUnderSingleProcess();

ActorCreationOptions op = new ActorCreationOptions.Builder()
.setMaxConcurrency(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ public void testActorReconstruction() throws InterruptedException, IOException {
// Wait for the actor to be killed.
TimeUnit.SECONDS.sleep(1);

// Try calling increase on this actor again and check the value is now 4.
int value = actor.call(Counter::increase).get();
Assert.assertEquals(value, options.useDirectCall ? 1 : 4);
Assert.assertEquals(value, 1);

Assert.assertTrue(actor.call(Counter::wasCurrentActorReconstructed).get());

Expand Down
15 changes: 6 additions & 9 deletions java/test/src/main/java/org/ray/api/test/ActorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,12 @@ public void testCreateAndCallActor() {
}

/**
* Test getting a direct object (an object that is returned by a direct-call task) twice from the
* object store.
* Test getting an object twice from the local memory store.
*
* Direct objects are stored in core worker's local memory. And it will be removed after the first
* Objects are stored in core worker's local memory. And it will be removed after the first
* get. To enable getting it twice, we cache the object in `RayObjectImpl`.
*
* NOTE(hchen): this test will run for non-direct actors as well, which doesn't have the above
* issue and should also succeed.
*/
public void testGetDirectObjectTwice() {
public void testGetObjectTwice() {
RayActor<Counter> actor = Ray.createActor(Counter::new, 1);
RayObject<Integer> result = actor.call(Counter::getValue);
Assert.assertEquals(result.get(), Integer.valueOf(1));
Expand Down Expand Up @@ -122,11 +118,12 @@ public void testPassActorAsParameter() {
.get());
}

// TODO(qwang): Will re-enable this test case once ref counting is supported in Java.
@Test(enabled = false)
public void testUnreconstructableActorObject() throws InterruptedException {
TestUtils.skipTestUnderSingleProcess();

// The UnreconstructableException is created by raylet.
// TODO (kfstorm): This should be supported by direct actor call.
TestUtils.skipTestIfDirectActorCallEnabled();
RayActor<Counter> counter = Ray.createActor(Counter::new, 100);
// Call an actor method.
RayObject value = counter.call(Counter::getValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public boolean hang() throws InterruptedException {

public void testKillActor() {
TestUtils.skipTestUnderSingleProcess();
TestUtils.skipTestIfDirectActorCallDisabled();
RayActor<HangActor> actor = Ray.createActor(HangActor::new);
Assert.assertTrue(actor.call(HangActor::alive).get());
RayObject<Boolean> result = actor.call(HangActor::hang);
Expand Down
8 changes: 4 additions & 4 deletions java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ public void testDeleteObjects() {
final boolean result = TestUtils.waitForCondition(() ->
!TestUtils.getRuntime().getObjectStore()
.wait(ImmutableList.of(helloId.getId()), 1, 0).get(0), 50);
if (TestUtils.isDirectActorCallEnabled()) {
// Direct call will not delete object from im-memory store.
Assert.assertFalse(result);
} else {
if (TestUtils.isSingleProcessMode()) {
Assert.assertTrue(result);
} else {
// The object will not be deleted under cluster mode.
Assert.assertFalse(result);
}
}

Expand Down
Loading

0 comments on commit f4656d8

Please sign in to comment.