Skip to content

Commit

Permalink
[Core] Ensure "get_if_exists" takes effect in the decorator. (ray-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
suquark authored Apr 28, 2022
1 parent 561e169 commit b0f00a1
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 36 deletions.
61 changes: 25 additions & 36 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,6 @@ def method(self):

class ActorOptionWrapper:
def remote(self, *args, **kwargs):
# Handle the get-or-create case.
if updated_options.get("get_if_exists"):
return self._get_or_create_impl(args, kwargs)

# Normal create case.
return actor_cls._remote(args=args, kwargs=kwargs, **updated_options)

def bind(self, *args, **kwargs):
Expand All @@ -581,27 +576,6 @@ def bind(self, *args, **kwargs):
updated_options,
)

def _get_or_create_impl(self, args, kwargs):
name = updated_options["name"]
try:
return ray.get_actor(
name, namespace=updated_options.get("namespace")
)
except ValueError:
# Attempt to create it (may race with other attempts).
try:
return actor_cls._remote(
args=args,
kwargs=kwargs,
**updated_options,
)
except ValueError:
# We lost the creation race, ignore.
pass
return ray.get_actor(
name, namespace=updated_options.get("namespace")
)

return ActorOptionWrapper()

@_tracing_actor_creation
Expand Down Expand Up @@ -667,6 +641,31 @@ def _remote(self, args=None, kwargs=None, **actor_options):
Returns:
A handle to the newly created actor.
"""
name = actor_options.get("name")
namespace = actor_options.get("namespace")
if name is not None:
if not isinstance(name, str):
raise TypeError(f"name must be None or a string, got: '{type(name)}'.")
elif name == "":
raise ValueError("Actor name cannot be an empty string.")
if namespace is not None:
ray._private.utils.validate_namespace(namespace)

# Handle the get-or-create case.
if actor_options.get("get_if_exists"):
try:
return ray.get_actor(name, namespace=namespace)
except ValueError:
# Attempt to create it (may race with other attempts).
updated_options = actor_options.copy()
updated_options["get_if_exists"] = False # prevent infinite loop
try:
return self._remote(args, kwargs, **updated_options)
except ValueError:
# We lost the creation race, ignore.
pass
return ray.get_actor(name, namespace=namespace)

# We pop the "concurrency_groups" coming from "@ray.remote" here. We no longer
# need it in "_remote()".
actor_options.pop("concurrency_groups", None)
Expand Down Expand Up @@ -701,8 +700,6 @@ def _remote(self, args=None, kwargs=None, **actor_options):

# TODO(suquark): cleanup these fields
max_concurrency = actor_options["max_concurrency"]
name = actor_options["name"]
namespace = actor_options["namespace"]
lifetime = actor_options["lifetime"]
runtime_env = actor_options["runtime_env"]
placement_group = actor_options["placement_group"]
Expand All @@ -718,14 +715,6 @@ def _remote(self, args=None, kwargs=None, **actor_options):
worker = ray.worker.global_worker
worker.check_connected()

if name is not None:
if not isinstance(name, str):
raise TypeError(f"name must be None or a string, got: '{type(name)}'.")
elif name == "":
raise ValueError("Actor name cannot be an empty string.")
if namespace is not None:
ray._private.utils.validate_namespace(namespace)

# Check whether the name is already taken.
# TODO(edoakes): this check has a race condition because two drivers
# could pass the check and then create the same named actor. We should
Expand Down
24 changes: 24 additions & 0 deletions python/ray/tests/test_get_or_create_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,34 @@ def pid(self):
assert ray.get(b.ping.remote()) == "ok"
assert ray.get(b.pid.remote()) == ray.get(a.pid.remote())

with pytest.raises(TypeError):
Actor.options(name=object(), get_if_exists=True).remote()

with pytest.raises(TypeError):
Actor.options(name="x", namespace=object(), get_if_exists=True).remote()

with pytest.raises(ValueError):
Actor.options(num_cpus=1, get_if_exists=True).remote()


def test_shared_actor(shutdown_only):
ray.init(num_cpus=1)

@ray.remote(name="x", namespace="test", get_if_exists=True)
class SharedActor:
def ping(self):
return "ok"

def pid(self):
return os.getpid()

a = SharedActor.remote()
b = SharedActor.remote()
assert ray.get(a.ping.remote()) == "ok"
assert ray.get(b.ping.remote()) == "ok"
assert ray.get(b.pid.remote()) == ray.get(a.pid.remote())


def test_no_verbose_output():
script = """
import ray
Expand Down

0 comments on commit b0f00a1

Please sign in to comment.