Skip to content

Commit

Permalink
[core] Renamed _max_retries in actor methods to max_task_retries. (ra…
Browse files Browse the repository at this point in the history
…y-project#41761)

Renamed _max_retries in actor methods to max_task_retries.

Also added tests for threaded actors

Signed-off-by: Ruiyang Wang [email protected]
  • Loading branch information
rynewang authored Jan 23, 2024
1 parent 762c25f commit a217f2e
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 77 deletions.
13 changes: 6 additions & 7 deletions doc/source/ray-core/fault_tolerance/actors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,15 @@ You can set ``retry_exceptions`` in the `@ray.method(retry_exceptions=...)` deco

Retry behavior depends on the value you set ``retry_exceptions`` to:
- ``retry_exceptions == False`` (default): No retries for user exceptions.
- ``retry_exceptions == True``: Ray retries a method on user exception up to ``max_retries`` times.
- ``retry_exceptions`` is a list of exceptions: Ray retries a method on user exception up to ``max_retries`` times, only if the method raises an exception from these specific classes.
- ``retry_exceptions == True``: Ray retries a method on user exception up to ``max_task_retries`` times.
- ``retry_exceptions`` is a list of exceptions: Ray retries a method on user exception up to ``max_task_retries`` times, only if the method raises an exception from these specific classes.

``max_task_retries`` applies to both exceptions and actor crashes. Ray searches for the first non-default value of ``max_task_retries`` in this order:

.. - The method call's value, for example, `actor.method.options(_max_retries=2)`. Ray ignores this value if you didn't set it.
.. - The method definition's value, for example, `@ray.method(_max_retries=2)`. Ray ignores this value if you didn't set it.
``max_task_retries`` applies to both exceptions and actor crashes. A Ray actor can set this option to apply to all of its methods. A method can also set an overriding option for itself. Ray searches for the first non-default value of ``max_task_retries`` in this order:

- The method call's value, for example, `actor.method.options(max_task_retries=2)`. Ray ignores this value if you don't set it.
- The method definition's value, for example, `@ray.method(max_task_retries=2)`. Ray ignores this value if you don't set it.
- The actor creation call's value, for example, `Actor.options(max_task_retries=2)`. Ray ignores this value if you didn't set it.
- The Actor class definition's value, for example, `@ray.remote(max_task_retries=2)` decorator. Ray ignores this value if you didn't set it.
- The default value,`0`.

For example, if a method sets `max_retries=5` and `retry_exceptions=True`, and the actor sets `max_restarts=2`, Ray executes the method up to 6 times: once for the initial invocation, and 5 additional retries. The 6 invocations may include 2 actor crashes. After the 6th invocation, a `ray.get` call to the result Ray ObjectRef raises the exception raised in the last invocation, or `ray.exceptions.RayActorError` if the actor crashed in the last invocation.
For example, if a method sets `max_task_retries=5` and `retry_exceptions=True`, and the actor sets `max_restarts=2`, Ray executes the method up to 6 times: once for the initial invocation, and 5 additional retries. The 6 invocations may include 2 actor crashes. After the 6th invocation, a `ray.get` call to the result Ray ObjectRef raises the exception raised in the last invocation, or `ray.exceptions.RayActorError` if the actor crashed in the last invocation.
4 changes: 2 additions & 2 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4190,7 +4190,7 @@ cdef class CoreWorker:
method_meta.decorators,
method_meta.signatures,
method_meta.num_returns,
method_meta.max_retries,
method_meta.max_task_retries,
method_meta.retry_exceptions,
method_meta.generator_backpressure_num_objects, # noqa
actor_method_cpu,
Expand All @@ -4203,7 +4203,7 @@ cdef class CoreWorker:
{}, # method decorators
{}, # method signatures
{}, # method num_returns
{}, # method max_retries
{}, # method max_task_retries
{}, # method retry_exceptions
{}, # generator_backpressure_num_objects
0, # actor method cpu
Expand Down
77 changes: 39 additions & 38 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def bar(self):
valid_kwargs = [
"num_returns",
"concurrency_group",
"_max_retries",
"max_task_retries",
"retry_exceptions",
"_generator_backpressure_num_objects",
]
Expand All @@ -92,8 +92,8 @@ def bar(self):
def annotate_method(method):
if "num_returns" in kwargs:
method.__ray_num_returns__ = kwargs["num_returns"]
if "_max_retries" in kwargs:
method.__ray_max_retries__ = kwargs["_max_retries"]
if "max_task_retries" in kwargs:
method.__ray_max_task_retries__ = kwargs["max_task_retries"]
if "retry_exceptions" in kwargs:
method.__ray_retry_exceptions__ = kwargs["retry_exceptions"]
if "concurrency_group" in kwargs:
Expand Down Expand Up @@ -123,7 +123,7 @@ class ActorMethod:
invocation should return. If None is given, it uses
DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS for a normal actor task
and "streaming" for a generator task (when `is_generator` is True).
_max_retries: [Internal] Number of retries on method failure.
_max_task_retries: Number of retries on method failure.
_retry_exceptions: Boolean of whether you want to retry all user-raised
exceptions, or a list of allowlist exceptions to retry.
_is_generator: True if a given method is a Python generator.
Expand All @@ -144,7 +144,7 @@ def __init__(
actor,
method_name,
num_returns: Optional[Union[int, str]],
_max_retries: int,
max_task_retries: int,
retry_exceptions: Union[bool, list, tuple],
is_generator: bool,
generator_backpressure_num_objects: int,
Expand All @@ -162,7 +162,7 @@ def __init__(
else:
self._num_returns = ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS

self._max_retries = _max_retries
self._max_task_retries = max_task_retries
self._retry_exceptions = retry_exceptions
self._is_generator = is_generator
self._generator_backpressure_num_objects = generator_backpressure_num_objects
Expand Down Expand Up @@ -271,18 +271,17 @@ def _remote(
kwargs=None,
name="",
num_returns=None,
_max_retries=None,
max_task_retries=None,
retry_exceptions=None,
concurrency_group=None,
_generator_backpressure_num_objects=None,
):
if num_returns is None:
num_returns = self._num_returns
max_retries = _max_retries
if max_retries is None:
max_retries = self._max_retries
if max_retries is None:
max_retries = 0
if max_task_retries is None:
max_task_retries = self._max_task_retries
if max_task_retries is None:
max_task_retries = 0
if retry_exceptions is None:
retry_exceptions = self._retry_exceptions
if _generator_backpressure_num_objects is None:
Expand All @@ -302,7 +301,7 @@ def invocation(args, kwargs):
kwargs=kwargs,
name=name,
num_returns=num_returns,
max_retries=max_retries,
max_task_retries=max_task_retries,
retry_exceptions=retry_exceptions,
concurrency_group_name=concurrency_group,
generator_backpressure_num_objects=(
Expand All @@ -321,7 +320,7 @@ def __getstate__(self):
"actor": self._actor_ref(),
"method_name": self._method_name,
"num_returns": self._num_returns,
"max_retries": self._max_retries,
"max_task_retries": self._max_task_retries,
"retry_exceptions": self._retry_exceptions,
"decorator": self._decorator,
"is_generator": self._is_generator,
Expand All @@ -333,7 +332,7 @@ def __setstate__(self, state):
state["actor"],
state["method_name"],
state["num_returns"],
state["max_retries"],
state["max_task_retries"],
state["retry_exceptions"],
state["is_generator"],
state["generator_backpressure_num_objects"],
Expand All @@ -354,7 +353,7 @@ class _ActorClassMethodMetadata(object):
signatures: The signatures of the methods.
num_returns: The default number of return values for
each actor method.
max_retries: Number of retries on method failure.
max_task_retries: Number of retries on method failure.
retry_exceptions: Boolean of whether you want to retry all user-raised
exceptions, or a list of allowlist exceptions to retry, for each method.
"""
Expand Down Expand Up @@ -392,7 +391,7 @@ def create(cls, modified_class, actor_creation_function_descriptor):
self.decorators = {}
self.signatures = {}
self.num_returns = {}
self.max_retries = {}
self.max_task_retries = {}
self.retry_exceptions = {}
self.method_is_generator = {}
self.generator_backpressure_num_objects = {}
Expand Down Expand Up @@ -420,12 +419,13 @@ def create(cls, modified_class, actor_creation_function_descriptor):
else:
self.num_returns[method_name] = None

# Only contains entries from `@ray.method(_max_retries=...)`
# Only contains entries from `@ray.method(max_task_retries=...)`
# Ray may not populate the others with max_task_retries here because you may
# have set in `actor.method.options(_max_retries=...)`. So Ray always stores
# both max_retries and max_task_retries, and favors the former.
if hasattr(method, "__ray_max_retries__"):
self.max_retries[method_name] = method.__ray_max_retries__
# have set in `actor.method.options(max_task_retries=...)`. So Ray always
# stores max_task_retries both from the method and from the actor, and
# favors the former.
if hasattr(method, "__ray_max_task_retries__"):
self.max_task_retries[method_name] = method.__ray_max_task_retries__

if hasattr(method, "__ray_retry_exceptions__"):
self.retry_exceptions[method_name] = method.__ray_retry_exceptions__
Expand Down Expand Up @@ -722,16 +722,17 @@ def options(self, **actor_options):
A value of -1 indicates that an actor should be restarted
indefinitely.
max_task_retries: How many times to
retry an actor task if the task fails due to a system error,
retry an actor task if the task fails due to a runtime error,
e.g., the actor has died. If set to -1, the system will
retry the failed task until the task succeeds, or the actor
has reached its max_restarts limit. If set to `n > 0`, the
system will retry the failed task up to n times, after which the
task will throw a `RayActorError` exception upon :obj:`ray.get`.
Note that Python exceptions are not considered system errors
and don't trigger retries. [Internal use: You can override this number
with the method's "_max_retries" option at @ray.method decorator or
at .option() time.]
Note that Python exceptions may trigger retries *only if*
`retry_exceptions` is set for the method, in that case when
`max_task_retries` runs out the task will rethrow the exception from
the task. You can override this number with the method's
`max_task_retries` option in `@ray.method` decorator or in `.option()`.
max_pending_calls: Set the max number of pending calls
allowed on the actor handle. When this value is exceeded,
PendingCallsLimitExceeded will be raised for further tasks.
Expand Down Expand Up @@ -1164,7 +1165,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
meta.method_meta.decorators,
meta.method_meta.signatures,
meta.method_meta.num_returns,
meta.method_meta.max_retries,
meta.method_meta.max_task_retries,
meta.method_meta.retry_exceptions,
meta.method_meta.generator_backpressure_num_objects,
actor_method_cpu,
Expand Down Expand Up @@ -1210,7 +1211,7 @@ class ActorHandle:
invocation side, whereas a regular decorator can be used to change
the behavior on the execution side.
_ray_method_signatures: The signatures of the actor methods.
_ray_method_max_retries: Max number of retries on method failure.
_ray_method_max_task_retries: Max number of retries on method failure.
_ray_method_num_returns: The default number of return values for
each method.
_ray_method_retry_exceptions: The default value of boolean of whether you want
Expand All @@ -1237,7 +1238,7 @@ def __init__(
method_decorators,
method_signatures,
method_num_returns: Dict[str, int],
method_max_retries: Dict[str, int],
method_max_task_retries: Dict[str, int],
method_retry_exceptions: Dict[str, Union[bool, list, tuple]],
method_generator_backpressure_num_objects: Dict[str, int],
actor_method_cpus: int,
Expand All @@ -1253,7 +1254,7 @@ def __init__(
self._ray_method_decorators = method_decorators
self._ray_method_signatures = method_signatures
self._ray_method_num_returns = method_num_returns
self._ray_method_max_retries = method_max_retries
self._ray_method_max_task_retries = method_max_task_retries
self._ray_method_retry_exceptions = method_retry_exceptions
self._ray_method_generator_backpressure_num_objects = (
method_generator_backpressure_num_objects
Expand Down Expand Up @@ -1281,7 +1282,7 @@ def __init__(
self,
method_name,
self._ray_method_num_returns[method_name],
self._ray_method_max_retries.get(
self._ray_method_max_task_retries.get(
method_name, self._ray_max_task_retries
)
or 0, # never None
Expand Down Expand Up @@ -1315,7 +1316,7 @@ def _actor_method_call(
kwargs: Dict[str, Any] = None,
name: str = "",
num_returns: Optional[int] = None,
max_retries: int = None,
max_task_retries: int = None,
retry_exceptions: Union[bool, list, tuple] = None,
concurrency_group_name: Optional[str] = None,
generator_backpressure_num_objects: Optional[int] = None,
Expand All @@ -1333,7 +1334,7 @@ def _actor_method_call(
kwargs: A dictionary of keyword arguments for the actor method.
name: The name to give the actor method call task.
num_returns: The number of return values for the method.
max_retries: Number of retries when method fails.
max_task_retries: Number of retries when method fails.
retry_exceptions: Boolean of whether you want to retry all user-raised
exceptions, or a list of allowlist exceptions to retry.
Expand Down Expand Up @@ -1397,7 +1398,7 @@ def _actor_method_call(
list_args,
name,
num_returns,
max_retries,
max_task_retries,
retry_exceptions,
retry_exception_allowlist,
self._ray_actor_method_cpus,
Expand Down Expand Up @@ -1445,7 +1446,7 @@ def remote(self, *args, **kwargs):
self, # actor
item, # method_name
ray_constants.DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS,
0, # max_retries
0, # max_task_retries
False, # retry_exceptions
False, # is_generator
self._ray_method_generator_backpressure_num_objects.get(item, -1),
Expand Down Expand Up @@ -1497,7 +1498,7 @@ def _serialization_helper(self):
"method_decorators": self._ray_method_decorators,
"method_signatures": self._ray_method_signatures,
"method_num_returns": self._ray_method_num_returns,
"method_max_retries": self._ray_method_max_retries,
"method_max_task_retries": self._ray_method_max_task_retries,
"method_retry_exceptions": self._ray_method_retry_exceptions,
"method_generator_backpressure_num_objects": (
self._ray_method_generator_backpressure_num_objects
Expand Down Expand Up @@ -1541,7 +1542,7 @@ def _deserialization_helper(cls, state, outer_object_ref=None):
state["method_decorators"],
state["method_signatures"],
state["method_num_returns"],
state["method_max_retries"],
state["method_max_task_retries"],
state["method_retry_exceptions"],
state["method_generator_backpressure_num_objects"],
state["actor_method_cpus"],
Expand Down
Loading

0 comments on commit a217f2e

Please sign in to comment.