[core] Enable object reconstruction for retryable actor tasks (ray-project#9557)

* Test actor plasma reconstruction

* Allow resubmission of actor tasks

* doc

* Test for actor constructor

* Kill PID before removing node

* Kill pid before node
stephanie-wang authored Jul 24, 2020
1 parent 239196f commit f2705e2
Showing 8 changed files with 224 additions and 24 deletions.
40 changes: 24 additions & 16 deletions doc/source/fault-tolerance.rst
@@ -41,20 +41,6 @@ You can experiment with this behavior by running the following code.
except ray.exceptions.RayWorkerError:
print('FAILURE')
Task outputs over a configurable threshold (default 100KB) may be stored in
Ray's distributed object store. Thus, a node failure can cause the loss of a
task output. If this occurs, Ray will automatically attempt to recover the
value by looking for copies of the same object on other nodes. If there are no
other copies left, an ``UnreconstructableError`` will be raised.

When there are no copies of an object left, Ray also provides an option to
automatically recover the value by re-executing the task that created the
value. Arguments to the task are recursively reconstructed with the same
method. This option can be enabled with
``ray.init(enable_object_reconstruction=True)`` in standalone mode or ``ray
start --enable-object-reconstruction`` in cluster mode.


Actors
------

@@ -164,8 +150,8 @@ You can experiment with this behavior by running the following code.
For at-least-once actors, the system will still guarantee execution ordering
according to the initial submission order. For example, any tasks submitted
after a failed actor task will not execute on the actor until the failed actor
task has been successfully retried. The system also will not attempt to
re-execute any tasks that executed successfully before the failure.
task has been successfully retried. The system will not attempt to re-execute
any tasks that executed successfully before the failure (unless :ref:`object reconstruction <object-reconstruction>` is enabled).

At-least-once execution is best suited for read-only actors or actors with
ephemeral state that does not need to be rebuilt after a failure. For actors
@@ -174,3 +160,25 @@ manually restart the actor or automatically restart the actor with at-most-once
semantics. If the actor’s exact state at the time of failure is needed, the
application is responsible for resubmitting all tasks since the last
checkpoint.
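
A minimal sketch of the two retry modes described above, using the ``max_restarts`` and ``max_task_retries`` options shown in the tests in this change (the actor bodies are placeholders):

import ray

ray.init()

# At-most-once: the actor is restarted after a failure, but tasks that were
# in flight when it died are not retried.
@ray.remote(max_restarts=-1, max_task_retries=0)
class AtMostOnceActor:
    def read(self):
        return 1

# At-least-once: failed tasks are retried on the restarted actor, preserving
# the original submission order.
@ray.remote(max_restarts=-1, max_task_retries=-1)
class AtLeastOnceActor:
    def read(self):
        return 1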

.. _object-reconstruction:

Objects
-------

Task outputs over a configurable threshold (default 100KB) may be stored in
Ray's distributed object store. Thus, a node failure can cause the loss of a
task output. If this occurs, Ray will automatically attempt to recover the
value by looking for copies of the same object on other nodes. If there are no
other copies left, an ``UnreconstructableError`` will be raised.

When there are no copies of an object left, Ray also provides an option to
automatically recover the value by re-executing the task that created the
value. Arguments to the task are recursively reconstructed with the same
method. This option can be enabled with
``ray.init(enable_object_reconstruction=True)`` in standalone mode or ``ray
start --enable-object-reconstruction`` in cluster mode.
During reconstruction, each task is re-executed at most the number of times
specified by ``max_retries`` for normal tasks and ``max_task_retries`` for
actor tasks. Both limits can be set to -1 for unlimited retries.
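
A minimal end-to-end sketch of how these options combine, assuming a single-node ``ray.init`` for illustration; the object sizes and retry counts below are placeholders:

import numpy as np
import ray

# Opt in to object reconstruction (standalone mode).
ray.init(enable_object_reconstruction=True)

# A normal task whose large output lives in the distributed object store.
# If that output is lost, the task may be re-executed up to 3 times.
@ray.remote(max_retries=3)
def large_object():
    return np.zeros(10**7, dtype=np.uint8)

# Outputs of actor tasks can also be reconstructed, provided the actor can be
# restarted and its tasks are retryable (-1 means unlimited retries).
@ray.remote(max_restarts=-1, max_task_retries=-1)
class Generator:
    def large_object(self):
        return np.zeros(10**7, dtype=np.uint8)

obj = large_object.remote()
gen = Generator.remote()
actor_obj = gen.large_object.remote()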
163 changes: 162 additions & 1 deletion python/ray/tests/test_reconstruction.py
@@ -1,12 +1,18 @@
import json
import os
import signal
import sys

import numpy as np
import pytest

import ray
from ray.test_utils import (
wait_for_condition, )
wait_for_condition,
wait_for_pid_to_exit,
)

SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM


def test_cached_object(ray_start_cluster):
@@ -217,6 +223,161 @@ def dependent_task(x):
pass


@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction_actor_task(ray_start_cluster,
reconstruction_enabled):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"initial_reconstruction_timeout_milliseconds": 200,
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = 0
config = json.dumps(config)

cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_internal_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 2}, object_store_memory=10**8)
cluster.add_node(
num_cpus=1, resources={"node2": 1}, object_store_memory=10**8)
cluster.wait_for_nodes()

@ray.remote(
max_restarts=-1,
max_task_retries=-1 if reconstruction_enabled else 0,
resources={"node1": 1},
num_cpus=0)
class Actor:
def __init__(self):
pass

def large_object(self):
return np.zeros(10**7, dtype=np.uint8)

def pid(self):
return os.getpid()

@ray.remote
def dependent_task(x):
return

a = Actor.remote()
pid = ray.get(a.pid.remote())
obj = a.large_object.remote()
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))

# Workaround to kill the actor process too since there is a bug where the
# actor's plasma client hangs after the plasma store has exited.
os.kill(pid, SIGKILL)

cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1, resources={"node1": 2}, object_store_memory=10**8)

wait_for_pid_to_exit(pid)

if reconstruction_enabled:
ray.get(dependent_task.remote(obj))
else:
with pytest.raises(ray.exceptions.RayTaskError) as e:
ray.get(dependent_task.remote(obj))
with pytest.raises(ray.exceptions.UnreconstructableError):
raise e.as_instanceof_cause()

# Make sure the actor handle is still usable.
pid = ray.get(a.pid.remote())


@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction_actor_constructor(ray_start_cluster,
reconstruction_enabled):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"initial_reconstruction_timeout_milliseconds": 200,
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = 0
config = json.dumps(config)

cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_internal_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
cluster.add_node(
num_cpus=1, resources={"node2": 1}, object_store_memory=10**8)
cluster.wait_for_nodes()

@ray.remote(max_retries=1 if reconstruction_enabled else 0)
def large_object():
return np.zeros(10**7, dtype=np.uint8)

# Both the constructor and a method depend on the large object.
@ray.remote(max_restarts=-1)
class Actor:
def __init__(self, x):
pass

def dependent_task(self, x):
return

def pid(self):
return os.getpid()

obj = large_object.options(resources={"node1": 1}).remote()
a = Actor.options(resources={"node1": 1}).remote(obj)
ray.get(a.dependent_task.remote(obj))
pid = ray.get(a.pid.remote())

# Workaround to kill the actor process too since there is a bug where the
# actor's plasma client hangs after the plasma store has exited.
os.kill(pid, SIGKILL)

cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)

wait_for_pid_to_exit(pid)

# Wait for the actor to restart.
def probe():
try:
ray.get(a.dependent_task.remote(obj))
return True
except ray.exceptions.RayActorError:
return False
except (ray.exceptions.RayTaskError,
ray.exceptions.UnreconstructableError):
return True

wait_for_condition(probe)

if reconstruction_enabled:
ray.get(a.dependent_task.remote(obj))
else:
with pytest.raises(ray.exceptions.RayTaskError) as e:
x = a.dependent_task.remote(obj)
print(x)
ray.get(x)
with pytest.raises(ray.exceptions.UnreconstructableError):
raise e.as_instanceof_cause()


@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
config = {
9 changes: 9 additions & 0 deletions src/ray/core_worker/actor_handle.cc
@@ -93,6 +93,15 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_
actor_cursor_ = new_cursor;
}

void ActorHandle::SetResubmittedActorTaskSpec(TaskSpecification &spec,
const ObjectID new_cursor) {
absl::MutexLock guard(&mutex_);
auto mutable_spec = spec.GetMutableMessage().mutable_actor_task_spec();
mutable_spec->set_previous_actor_task_dummy_object_id(actor_cursor_.Binary());
mutable_spec->set_actor_counter(task_counter_++);
actor_cursor_ = new_cursor;
}

void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); }

} // namespace ray
14 changes: 14 additions & 0 deletions src/ray/core_worker/actor_handle.h
@@ -63,8 +63,22 @@ class ActorHandle {

std::string ExtensionData() const { return inner_.extension_data(); }

/// Set the actor task spec fields.
///
/// \param[in] builder Task spec builder.
/// \param[in] new_cursor Actor dummy object. This is legacy code needed for
/// raylet-based actor restart.
void SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor);

/// Reset the actor task spec fields of an existing task so that the task can
/// be re-executed.
///
/// \param[in] spec An existing task spec that has executed on the actor
/// before.
/// \param[in] new_cursor Actor dummy object. This is legacy code needed for
/// raylet-based actor restart.
void SetResubmittedActorTaskSpec(TaskSpecification &spec, const ObjectID new_cursor);

void Serialize(std::string *output);

int64_t MaxTaskRetries() const { return inner_.max_task_retries(); }
10 changes: 8 additions & 2 deletions src/ray/core_worker/core_worker.cc
@@ -380,7 +380,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
};
task_manager_.reset(new TaskManager(
memory_store_, reference_counter_, actor_reporter_,
[this](const TaskSpecification &spec, bool delay) {
[this](TaskSpecification &spec, bool delay) {
if (delay) {
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
@@ -392,7 +392,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
} else {
RAY_LOG(ERROR) << "Resubmitting task that produced lost plasma object: "
<< spec.DebugString();
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec));
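          // Resubmitted actor tasks must go back through the actor task
          // submitter, after the actor handle refreshes the spec's cursor and
          // task counter so it can be executed again.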
if (spec.IsActorTask()) {
const auto &actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject());
RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec));
} else {
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec));
}
}
},
check_node_alive_fn, reconstruct_object_callback));
8 changes: 5 additions & 3 deletions src/ray/core_worker/task_manager.cc
@@ -87,9 +87,6 @@ Status TaskManager::ResubmitTask(const TaskID &task_id,
if (it == submissible_tasks_.end()) {
return Status::Invalid("Task spec missing");
}
if (it->second.spec.IsActorTask()) {
return Status::Invalid("Cannot reconstruct objects returned by actors");
}

if (!it->second.pending) {
resubmit = true;
@@ -118,6 +115,11 @@
reference_counter_->UpdateResubmittedTaskReferences(*task_deps);
}

if (spec.IsActorTask()) {
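    // A resubmitted actor task also implicitly depends on the actor creation
    // dummy object, so take a reference on it like the other resubmitted
    // dependencies.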
const auto actor_creation_return_id = spec.ActorCreationDummyObjectId();
reference_counter_->UpdateResubmittedTaskReferences({actor_creation_return_id});
}

if (resubmit) {
retry_task_callback_(spec, /*delay=*/false);
}
2 changes: 1 addition & 1 deletion src/ray/core_worker/task_manager.h
@@ -51,7 +51,7 @@ class TaskResubmissionInterface {
virtual ~TaskResubmissionInterface() {}
};

using RetryTaskCallback = std::function<void(const TaskSpecification &spec, bool delay)>;
using RetryTaskCallback = std::function<void(TaskSpecification &spec, bool delay)>;
using ReconstructObjectCallback = std::function<void(const ObjectID &object_id)>;

class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/task_manager_test.cc
@@ -52,7 +52,7 @@ class TaskManagerTest : public ::testing::Test {
/*distributed_ref_counting_enabled=*/true, lineage_pinning_enabled))),
actor_reporter_(std::shared_ptr<ActorReporterInterface>(new MockActorManager())),
manager_(store_, reference_counter_, actor_reporter_,
[this](const TaskSpecification &spec, bool delay) {
[this](TaskSpecification &spec, bool delay) {
num_retries_++;
return Status::OK();
},
