Skip to content

Commit

Permalink
[Core] Support default actor lifetime. (ray-project#21283)
Browse files Browse the repository at this point in the history
Support specifying a default lifetime for actors whose lifetime is not set explicitly at creation time. This is a job-level configuration item.
#### API Change
The Python API looks like:
```python
  ray.init(job_config=JobConfig(default_actor_lifetime="detached"))
```

Java API looks like:
```java
  System.setProperty("ray.job.default-actor-lifetime", defaultActorLifetime.name());
  Ray.init();
```

One example usage is:
```python
  ray.init(job_config=JobConfig(default_actor_lifetime="detached"))
  a1 = A.options(lifetime="non_detached").remote()   # a1 is a non-detached actor.
  a2 = A.remote()  # a2 is a detached actor, because it falls back to the job-level default.
```

Co-authored-by: Kai Yang <[email protected]>
Co-authored-by: Qing Wang <[email protected]>
  • Loading branch information
3 people authored Jan 22, 2022
1 parent b00385f commit a37d9a2
Show file tree
Hide file tree
Showing 20 changed files with 328 additions and 46 deletions.
23 changes: 12 additions & 11 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,18 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
bundle_id.second);
placement_group_scheduling_strategy->set_placement_group_capture_child_tasks(false);
}
ray::core::ActorCreationOptions actor_options{create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/false,
name,
ray_namespace,
/*is_asyncio=*/false,
scheduling_strategy};
ray::core::ActorCreationOptions actor_options{
create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/std::make_optional<bool>(false),
name,
ray_namespace,
/*is_asyncio=*/false,
scheduling_strategy};
ActorID actor_id;
auto status = core_worker.CreateActor(BuildRayFunction(invocation), invocation.args,
actor_options, "", &actor_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
/** The options for creating actor. */
public class ActorCreationOptions extends BaseTaskOptions {
public final String name;
public final ActorLifetime lifetime;
public ActorLifetime lifetime;
public final int maxRestarts;
public final List<String> jvmOptions;
public final int maxConcurrency;
Expand Down Expand Up @@ -46,7 +46,7 @@ private ActorCreationOptions(
/** The inner class for building ActorCreationOptions. */
public static class Builder {
private String name;
private ActorLifetime lifetime = ActorLifetime.NON_DETACHED;
private ActorLifetime lifetime = null;
private Map<String, Double> resources = new HashMap<>();
private int maxRestarts = 0;
private List<String> jvmOptions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ private BaseActorHandle createActorImpl(
if (functionDescriptor.getLanguage() != Language.JAVA && options != null) {
Preconditions.checkState(options.jvmOptions == null || options.jvmOptions.size() == 0);
}

BaseActorHandle actor = taskSubmitter.createActor(functionDescriptor, functionArgs, options);
return actor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.ray.api.id.ActorId;
import io.ray.api.id.JobId;
import io.ray.api.id.UniqueId;
import io.ray.api.options.ActorLifetime;
import io.ray.api.runtimecontext.ResourceValue;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.context.NativeWorkerContext;
Expand Down Expand Up @@ -126,6 +127,10 @@ public void start() {
runtimeEnvInfoBuilder.setSerializedRuntimeEnv("{}");
}
jobConfigBuilder.setRuntimeEnvInfo(runtimeEnvInfoBuilder.build());
jobConfigBuilder.setDefaultActorLifetime(
rayConfig.defaultActorLifetime == ActorLifetime.DETACHED
? JobConfig.ActorLifetime.DETACHED
: JobConfig.ActorLifetime.NON_DETACHED);
serializedJobConfig = jobConfigBuilder.build().toByteArray();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigValue;
import io.ray.api.id.JobId;
import io.ray.api.options.ActorLifetime;
import io.ray.runtime.generated.Common.WorkerType;
import io.ray.runtime.util.NetworkUtil;
import java.io.File;
Expand Down Expand Up @@ -51,6 +52,8 @@ public class RayConfig {

public int startupToken;

public final ActorLifetime defaultActorLifetime;

public static class LoggerConf {
public final String loggerName;
public final String fileName;
Expand Down Expand Up @@ -139,6 +142,9 @@ public RayConfig(Config config) {
namespace = null;
}

defaultActorLifetime = config.getEnum(ActorLifetime.class, "ray.job.default-actor-lifetime");
Preconditions.checkState(defaultActorLifetime != null);

// jvm options for java workers of this job.
jvmOptionsForJavaWorker = config.getStringList("ray.job.jvm-options");

Expand Down
6 changes: 6 additions & 0 deletions java/runtime/src/main/resources/ray.default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ ray {
/// Jobs in different namespaces cannot access each other.
/// If it's not specified, a randomized value will be used instead.
namespace: ""

// The default lifetime of actors in this job.
// If the lifetime of an actor is not specified explicitly at runtime, this
// default value will be applied.
// The available values are `NON_DETACHED` and `DETACHED`.
default-actor-lifetime: NON_DETACHED
}

// Configurations about raylet
Expand Down
118 changes: 118 additions & 0 deletions java/test/src/main/java/io/ray/test/DefaultActorLifetimeTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.ray.test;

import io.ray.api.ActorHandle;
import io.ray.api.Ray;
import io.ray.api.options.ActorLifetime;
import io.ray.runtime.exception.RayActorException;
import io.ray.runtime.util.SystemUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Verifies that the job-level default actor lifetime is applied to actors that are created
 * without an explicit lifetime, and that an explicit lifetime overrides the default.
 *
 * <p>Each case starts a Ray job, lets an owner actor create a child actor, kills the owner
 * process and then checks whether the child survived.
 */
@Test(groups = "cluster")
public class DefaultActorLifetimeTest {

  /** System property that selects the default actor lifetime of the job. */
  private static final String DEFAULT_ACTOR_LIFETIME_KEY = "ray.job.default-actor-lifetime";

  /** System property that controls how many Java workers share one process. */
  private static final String WORKERS_PER_PROCESS_KEY = "ray.job.num-java-workers-per-process";

  /** Actor that creates a child actor, so that the child's owner can be killed by the test. */
  private static class OwnerActor {
    private ActorHandle<ChildActor> childActor;

    /**
     * Creates the child actor and waits until it answers.
     *
     * @param childActorLifetime the lifetime to set explicitly, or {@code null} to create the
     *     child without specifying a lifetime.
     * @return the child handle, or {@code null} if the child did not answer "ok".
     */
    public ActorHandle<ChildActor> createChildActor(ActorLifetime childActorLifetime) {
      if (childActorLifetime == null) {
        childActor = Ray.actor(ChildActor::new).remote();
      } else {
        childActor = Ray.actor(ChildActor::new).setLifetime(childActorLifetime).remote();
      }
      if ("ok".equals(childActor.task(ChildActor::ready).remote().get())) {
        return childActor;
      }
      return null;
    }

    int getPid() {
      return SystemUtil.pid();
    }

    String ready() {
      return "ok";
    }
  }

  /** Trivial actor whose only purpose is to report that it is alive. */
  private static class ChildActor {
    String ready() {
      return "ok";
    }
  }

  /**
   * Runs one combination of job-level default lifetime and explicit child lifetime.
   *
   * @param defaultActorLifetime the value for the default-lifetime system property, or {@code
   *     null} to leave the property untouched.
   * @param childActorLifetime the lifetime passed explicitly when creating the child, or {@code
   *     null} to rely on the default.
   */
  @Test(
      groups = {"cluster"},
      dataProvider = "parameters")
  public void testDefaultActorLifetime(
      ActorLifetime defaultActorLifetime, ActorLifetime childActorLifetime)
      throws IOException, InterruptedException {
    // Remember the previous values so this test does not leak configuration into other tests
    // running in the same JVM.
    final String previousDefaultLifetime = System.getProperty(DEFAULT_ACTOR_LIFETIME_KEY);
    final String previousWorkersPerProcess = System.getProperty(WORKERS_PER_PROCESS_KEY);

    if (defaultActorLifetime != null) {
      System.setProperty(DEFAULT_ACTOR_LIFETIME_KEY, defaultActorLifetime.name());
    }
    try {
      System.setProperty(WORKERS_PER_PROCESS_KEY, "1");
      Ray.init();

      // 1. Create the owner and let it create the child actor.
      ActorHandle<OwnerActor> owner = Ray.actor(OwnerActor::new).remote();
      ActorHandle<ChildActor> child =
          owner.task(OwnerActor::createChildActor, childActorLifetime).remote().get();
      // TestNG's assertEquals takes (actual, expected).
      Assert.assertEquals(child.task(ChildActor::ready).remote().get(), "ok");
      int ownerPid = owner.task(OwnerActor::getPid).remote().get();

      // 2. Kill the owner and make sure it is dead.
      Process killer = new ProcessBuilder("kill", "-9", String.valueOf(ownerPid)).start();
      Assert.assertEquals(killer.waitFor(), 0, "Failed to kill owner process " + ownerPid);
      Supplier<Boolean> isOwnerDead =
          () -> {
            try {
              owner.task(OwnerActor::ready).remote().get();
              return false;
            } catch (RayActorException e) {
              return true;
            }
          };
      Assert.assertTrue(TestUtils.waitForCondition(isOwnerDead, 3000));

      // 3. Assert the child state.
      Supplier<Boolean> isChildDead =
          () -> {
            try {
              child.task(ChildActor::ready).remote().get();
              return false;
            } catch (RayActorException e) {
              return true;
            }
          };
      // An explicit child lifetime wins over the job-level default.
      ActorLifetime actualLifetime =
          childActorLifetime != null ? childActorLifetime : defaultActorLifetime;
      Assert.assertNotNull(actualLifetime);
      if (actualLifetime == ActorLifetime.DETACHED) {
        // Give the cluster time to (wrongly) tear the child down before checking it is alive.
        TimeUnit.SECONDS.sleep(5);
        Assert.assertFalse(isChildDead.get());
      } else {
        Assert.assertTrue(TestUtils.waitForCondition(isChildDead, 5000));
      }
    } finally {
      try {
        Ray.shutdown();
      } finally {
        restoreProperty(DEFAULT_ACTOR_LIFETIME_KEY, previousDefaultLifetime);
        restoreProperty(WORKERS_PER_PROCESS_KEY, previousWorkersPerProcess);
      }
    }
  }

  /** Resets a system property to the given previous value, clearing it if that is null. */
  private static void restoreProperty(String key, String previousValue) {
    if (previousValue == null) {
      System.clearProperty(key);
    } else {
      System.setProperty(key, previousValue);
    }
  }

  /**
   * Produces every combination of a default lifetime (DETACHED, NON_DETACHED) and an explicit
   * child lifetime (unset, DETACHED, NON_DETACHED).
   */
  @DataProvider
  public static Object[][] parameters() {
    Object[] defaultEnums = new Object[] {ActorLifetime.DETACHED, ActorLifetime.NON_DETACHED};
    Object[] enums = new Object[] {null, ActorLifetime.DETACHED, ActorLifetime.NON_DETACHED};
    Object[][] params = new Object[defaultEnums.length * enums.length][2];
    for (int i = 0; i < params.length; ++i) {
      params[i][0] = defaultEnums[i / enums.length];
      params[i][1] = enums[i % enums.length];
    }
    return params;
  }
}
1 change: 0 additions & 1 deletion python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ from libcpp.memory cimport (
shared_ptr,
unique_ptr
)

from ray.includes.common cimport (
CBuffer,
CRayObject,
Expand Down
15 changes: 13 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ from libcpp.memory cimport (
make_unique,
unique_ptr,
)
from ray.includes.optional cimport (
optional,
nullopt,
make_optional,
)

from libcpp.string cimport string as c_string
from libcpp.utility cimport pair
from libcpp.unordered_map cimport unordered_map
Expand Down Expand Up @@ -1500,7 +1506,7 @@ cdef class CoreWorker:
resources,
placement_resources,
int32_t max_concurrency,
c_bool is_detached,
is_detached,
c_string name,
c_string ray_namespace,
c_bool is_asyncio,
Expand All @@ -1519,6 +1525,7 @@ cdef class CoreWorker:
CActorID c_actor_id
c_vector[CConcurrencyGroup] c_concurrency_groups
CSchedulingStrategy c_scheduling_strategy
optional[c_bool] is_detached_optional = nullopt

self.python_scheduling_strategy_to_c(
scheduling_strategy, &c_scheduling_strategy)
Expand All @@ -1533,13 +1540,17 @@ cdef class CoreWorker:
prepare_actor_concurrency_groups(
concurrency_groups_dict, &c_concurrency_groups)

if is_detached is not None:
is_detached_optional = make_optional[c_bool](
True if is_detached else False)

with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_restarts, max_task_retries, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached, name,
dynamic_worker_options, is_detached_optional, name,
ray_namespace,
is_asyncio,
c_scheduling_strategy,
Expand Down
8 changes: 5 additions & 3 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,13 +713,15 @@ def _remote(self,
f"ray.get_actor('{name}', namespace='{namespace}')")

if lifetime is None:
detached = False
detached = None
elif lifetime == "detached":
detached = True
elif lifetime == "non_detached":
detached = False
else:
raise ValueError(
"actor `lifetime` argument must be either `None` or 'detached'"
)
"actor `lifetime` argument must be one of 'detached', "
"'non_detached' and 'None'.")

# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
Expand Down
6 changes: 4 additions & 2 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ from libc.stdint cimport uint8_t, int32_t, uint64_t, int64_t, uint32_t
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector as c_vector
from libcpp.pair cimport pair as c_pair

from ray.includes.optional cimport (
optional,
)
from ray.includes.unique_ids cimport (
CActorID,
CJobID,
Expand Down Expand Up @@ -265,7 +267,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options,
c_bool is_detached, c_string &name, c_string &ray_namespace,
optional[c_bool] is_detached, c_string &name, c_string &ray_namespace,
c_bool is_asyncio,
const CSchedulingStrategy &scheduling_strategy,
c_string serialized_runtime_env,
Expand Down
19 changes: 18 additions & 1 deletion python/ray/job_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class JobConfig:
runtime_env (dict): A runtime environment dictionary (see
``runtime_env.py`` for detailed documentation).
client_job (bool): A boolean represent the source of the job.
default_actor_lifetime (str): The default value of actor lifetime.
"""

def __init__(self,
Expand All @@ -26,7 +27,8 @@ def __init__(self,
runtime_env=None,
client_job=False,
metadata=None,
ray_namespace=None):
ray_namespace=None,
default_actor_lifetime="non_detached"):
self.num_java_workers_per_process = num_java_workers_per_process
self.jvm_options = jvm_options or []
self.code_search_path = code_search_path or []
Expand All @@ -39,6 +41,7 @@ def __init__(self,
self.metadata = metadata or {}
self.ray_namespace = ray_namespace
self.set_runtime_env(runtime_env)
self.set_default_actor_lifetime(default_actor_lifetime)

def set_metadata(self, key: str, value: str) -> None:
self.metadata[key] = value
Expand Down Expand Up @@ -66,6 +69,18 @@ def set_ray_namespace(self, ray_namespace: str) -> None:
self.ray_namespace = ray_namespace
self._cached_pb = None

def set_default_actor_lifetime(self, default_actor_lifetime: str) -> None:
    """Set the default lifetime of actors created by this job.

    Args:
        default_actor_lifetime: Either "detached" or "non_detached".

    Raises:
        ValueError: If the value is neither "detached" nor "non_detached".
    """
    if default_actor_lifetime == "detached":
        self._default_actor_lifetime = \
            gcs_utils.JobConfig.ActorLifetime.DETACHED
    elif default_actor_lifetime == "non_detached":
        self._default_actor_lifetime = \
            gcs_utils.JobConfig.ActorLifetime.NON_DETACHED
    else:
        raise ValueError(
            "Default actor lifetime must be one of `detached`, `non_detached`"
        )
    # Drop the cached protobuf (as set_ray_namespace does) so that
    # get_proto_job_config() rebuilds it with the new lifetime.
    self._cached_pb = None

def _validate_runtime_env(self):
# TODO(edoakes): this is really unfortunate, but JobConfig is imported
# all over the place so this causes circular imports. We should remove
Expand Down Expand Up @@ -96,6 +111,8 @@ def get_proto_job_config(self):
parsed_env.serialize()
pb.runtime_env_info.runtime_env_eager_install = eager_install

if self._default_actor_lifetime is not None:
pb.default_actor_lifetime = self._default_actor_lifetime
self._cached_pb = pb

return self._cached_pb
Expand Down
Loading

0 comments on commit a37d9a2

Please sign in to comment.