Nondeterministic reconstruction for actors (ray-project#1344)
* Add failing unit test for nondeterministic reconstruction

* Retry scheduling actor tasks if reassigned to local scheduler

* Update execution edges asynchronously upon dispatch for nondeterministic reconstruction

* Fix bug for updating checkpoint task execution dependencies

* Update comments for deterministic reconstruction

* Cleanup

* Add (and skip) failing test case for nondeterministic reconstruction

* Suppress test output
stephanie-wang authored Jan 21, 2018
1 parent 83949a5 commit 74718ef
Showing 4 changed files with 144 additions and 19 deletions.
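
How the pieces fit together: every actor task carries a dummy object as its last return value, and at dispatch time the local scheduler rewrites the next task's execution dependencies to point at the dummy object of the task that actually ran most recently. A minimal Python sketch of that bookkeeping follows (illustrative only; none of these names appear in the Ray sources):

    class DummyChain(object):
        """Tracks the dummy object of the most recently dispatched task."""

        def __init__(self):
            # No task has executed yet, so there is no execution dependency.
            self.last_dummy = None

        def dependencies_for(self, task_dummy, is_checkpoint=False):
            # A checkpoint resumption must be able to run at any time, so it
            # carries no execution dependency; every other task depends on
            # the dummy object of the task that executed most recently.
            deps = []
            if self.last_dummy is not None and not is_checkpoint:
                deps.append(self.last_dummy)
            # Whatever runs next will depend on this task's dummy object.
            self.last_dummy = task_dummy
            return deps

Because the chain is updated at dispatch time rather than submission time, the recorded order reflects how tasks from different handles actually interleaved, which is what makes reconstruction deterministic.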
22 changes: 11 additions & 11 deletions src/common/task.cc
@@ -245,17 +245,12 @@ bool TaskSpec_is_actor_checkpoint_method(TaskSpec *spec) {
   return message->is_actor_checkpoint_method();
 }
 
-bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index) {
-  if (TaskSpec_actor_counter(spec) == 0) {
-    /* The first task does not have any dependencies. */
-    return false;
-  } else if (TaskSpec_is_actor_checkpoint_method(spec)) {
-    /* Checkpoint tasks do not have any dependencies. */
-    return false;
-  } else {
-    /* For all other tasks, the last argument is the dummy object. */
-    return arg_index == (TaskSpec_num_args(spec) - 1);
-  }
+ObjectID TaskSpec_actor_dummy_object(TaskSpec *spec) {
+  CHECK(TaskSpec_is_actor_task(spec));
+  /* The last return value for actor tasks is the dummy object that
+   * represents that this task has completed execution. */
+  int64_t num_returns = TaskSpec_num_returns(spec);
+  return TaskSpec_return(spec, num_returns - 1);
 }
 
 UniqueID TaskSpec_driver_id(const TaskSpec *spec) {
@@ -392,6 +387,11 @@ std::vector<ObjectID> TaskExecutionSpec::ExecutionDependencies() {
  return execution_dependencies_;
}

void TaskExecutionSpec::SetExecutionDependencies(
    const std::vector<ObjectID> &dependencies) {
  execution_dependencies_ = dependencies;
}

int64_t TaskExecutionSpec::SpecSize() {
  return task_spec_size_;
}
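The deleted helper identified the dummy object positionally among a task's arguments; the replacement reads it off the task's return values instead. A rough Python analogue of the new convention (hypothetical names, not Ray's API):

    def actor_dummy_object(return_ids):
        # By convention, the last return slot of an actor task is the dummy
        # object, which becomes local once the task finishes executing.
        assert len(return_ids) >= 1, "actor tasks always have a dummy return"
        return return_ids[-1]

    # A task declared with two user-visible returns really has three slots:
    # actor_dummy_object(["id0", "id1", "dummy"]) returns "dummy".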
17 changes: 12 additions & 5 deletions src/common/task.h
@@ -28,6 +28,12 @@ class TaskExecutionSpec {
  /// dependencies.
  std::vector<ObjectID> ExecutionDependencies();

  /// Set the task's execution dependencies.
  ///
  /// @param dependencies The value to set the execution dependencies to.
  /// @return Void.
  void SetExecutionDependencies(const std::vector<ObjectID> &dependencies);

  /// Get the task spec size.
  ///
  /// @return The size of the immutable task spec.
@@ -239,14 +245,15 @@ int64_t TaskSpec_actor_counter(TaskSpec *spec);
 bool TaskSpec_is_actor_checkpoint_method(TaskSpec *spec);
 
 /**
- * Return whether the task's argument is a dummy object. Dummy objects are used
- * to encode an actor's state dependencies in the task graph.
+ * Return an actor task's dummy return value. Dummy objects are used to
+ * encode an actor's state dependencies in the task graph. The dummy object
+ * is local if and only if the task that returned it has completed
+ * execution.
  *
  * @param spec The task_spec in question.
- * @param arg_index The index of the argument in question.
- * @return Whether the argument at arg_index is a dummy object.
+ * @return The dummy object ID that the actor task will return.
  */
-bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index);
+ObjectID TaskSpec_actor_dummy_object(TaskSpec *spec);
 
 /**
  * Return the driver ID of the task.
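Since the dummy object is local exactly when its task has finished, recorded execution dependencies force replay to proceed in the original order: task k cannot be dispatched until task k-1's dummy object exists again. A small sketch of that invariant (illustrative names):

    def replay(task_table):
        # task_table: (dummy_id, execution_dependencies) rows, in the order
        # the tasks were originally dispatched.
        local = set()  # dummy objects that have become local again
        for dummy_id, deps in task_table:
            # A task is dispatched only once all of its execution
            # dependencies are local, i.e., once its predecessor reran.
            assert all(dep in local for dep in deps)
            local.add(dummy_id)  # this task's dummy object is now local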
44 changes: 41 additions & 3 deletions src/local_scheduler/local_scheduler_algorithm.cc
@@ -49,6 +49,11 @@ typedef struct {
   * order that the tasks were submitted, per handle. Tasks from different
   * handles to the same actor may be interleaved. */
  std::unordered_map<ActorID, int64_t, UniqueIDHasher> task_counters;
  /** The return value of the most recently executed task. The next task to
   * execute should take this as an execution dependency at dispatch time. Set
   * to nil if there are no execution dependencies (e.g., this is the first
   * task to execute). */
  ObjectID execution_dependency;
  /** The index of the task assigned to this actor. Set to -1 if no task is
   * currently assigned. If the actor process reports back success for the
   * assigned task execution, then the corresponding task_counter should be
@@ -219,6 +224,9 @@ void create_actor(SchedulingAlgorithmState *algorithm_state,
                  LocalSchedulerClient *worker) {
  LocalActorInfo entry;
  entry.task_counters[ActorID::nil()] = 0;
  /* The actor has not yet executed any tasks, so there are no execution
   * dependencies for the next task to be scheduled. */
  entry.execution_dependency = ObjectID::nil();
  entry.assigned_task_counter = -1;
  entry.assigned_task_handle_id = ActorID::nil();
  entry.task_queue = new std::list<TaskExecutionSpec>();
@@ -320,9 +328,31 @@ bool dispatch_actor_task(LocalSchedulerState *state,
    return false;
  }

  /* Update the task's execution dependencies to reflect the actual execution
   * order to support deterministic reconstruction. */
  /* NOTE(swang): The update of an actor task's execution dependencies is
   * performed asynchronously. This means that if this local scheduler dies, we
   * may lose updates that are in flight to the task table. We only guarantee
   * deterministic reconstruction ordering for tasks whose updates are
   * reflected in the task table. */
  std::vector<ObjectID> ordered_execution_dependencies;
  /* Only overwrite execution dependencies for tasks that have a
   * submission-time dependency (meaning it is not the initial task). */
  if (!entry.execution_dependency.is_nil()) {
    /* A checkpoint resumption should be able to run at any time, so only add
     * execution dependencies for non-checkpoint tasks. */
    if (!TaskSpec_is_actor_checkpoint_method(spec)) {
      /* All other tasks have a dependency on the task that executed most
       * recently on the actor. */
      ordered_execution_dependencies.push_back(entry.execution_dependency);
    }
  }
  task->SetExecutionDependencies(ordered_execution_dependencies);

  /* Assign the first task in the task queue to the worker and mark the worker
   * as unavailable. */
  assign_task_to_worker(state, *task, entry.worker);
  entry.execution_dependency = TaskSpec_actor_dummy_object(spec);
  entry.assigned_task_counter = next_task_counter;
  entry.assigned_task_handle_id = next_task_handle_id;
  entry.worker_available = false;
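
To restate the NOTE above in code shape: the rewritten dependencies are written back to the task table without blocking dispatch, so a scheduler crash can lose in-flight updates, and only updates that reached the table constrain replay order. A sketch of that fire-and-forget pattern, reusing DummyChain from the earlier sketch (helper names are hypothetical):

    def dispatch(task, chain, task_table, assign_to_worker):
        # Rewrite the execution dependencies to the actual execution order.
        task.execution_dependencies = chain.dependencies_for(
            task.dummy_object, task.is_checkpoint)
        # Asynchronous write-back: dispatch does not wait for it, so the
        # update may be lost if this local scheduler dies before it lands.
        task_table.update_async(task)
        assign_to_worker(task)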
@@ -962,9 +992,17 @@ void give_task_to_local_scheduler_retry(UniqueID id,
   ActorID actor_id = TaskSpec_actor_id(spec);
   CHECK(state->actor_mapping.count(actor_id) == 1);
 
-  give_task_to_local_scheduler(
-      state, state->algorithm_state, *execution_spec,
-      state->actor_mapping[actor_id].local_scheduler_id);
+  if (state->actor_mapping[actor_id].local_scheduler_id ==
+      get_db_client_id(state->db)) {
+    /* The task is now scheduled to us. Call the callback directly. */
+    handle_task_scheduled(state, state->algorithm_state, *execution_spec);
+  } else {
+    /* The task is scheduled to a remote local scheduler. Try to hand it to
+     * them again. */
+    give_task_to_local_scheduler(
+        state, state->algorithm_state, *execution_spec,
+        state->actor_mapping[actor_id].local_scheduler_id);
+  }
 }
 
 /**
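The retry path now has to handle the case where reconstruction reassigned the actor to this very scheduler; handing the task back to ourselves would loop, so the scheduled-task callback is invoked directly instead. A condensed Python rendering of that branch (names are illustrative):

    def give_task_retry(task, my_scheduler_id, assigned_scheduler_id,
                        schedule_locally, hand_off):
        if assigned_scheduler_id == my_scheduler_id:
            # The actor now lives here; run the scheduled-task callback
            # directly instead of handing the task off to ourselves.
            schedule_locally(task)
        else:
            # Still remote: attempt the handoff again.
            hand_off(task, assigned_scheduler_id)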
80 changes: 80 additions & 0 deletions test/actor_test.py
@@ -1625,6 +1625,86 @@ def g():
        # self.assertRaises(Exception):
        # ray.get(g.remote())

    def _testNondeterministicReconstruction(self, num_forks,
                                            num_items_per_fork,
                                            num_forks_to_wait):
        ray.worker._init(start_ray_local=True, num_local_schedulers=2,
                         num_workers=0, redirect_output=True)

        # Make a shared queue.
        @ray.remote
        class Queue(object):
            def __init__(self):
                self.queue = []

            def local_plasma(self):
                return ray.worker.global_worker.plasma_client.store_socket_name

            def push(self, item):
                self.queue.append(item)

            def read(self):
                return self.queue

        # Schedule the shared queue onto the remote local scheduler.
        local_plasma = ray.worker.global_worker.plasma_client.store_socket_name
        actor = Queue.remote()
        while ray.get(actor.local_plasma.remote()) == local_plasma:
            actor = Queue.remote()

        # A task that takes in the shared queue and a list of items to
        # enqueue, one by one.
        @ray.remote
        def enqueue(queue, items):
            done = None
            for item in items:
                done = queue.push.remote(item)
            # TODO(swang): Return the object ID returned by the last method
            # called on the shared queue, so that the caller of enqueue can
            # wait for all of the queue methods to complete. This can be
            # removed once join consistency is implemented.
            return [done]

        # Call the enqueue task num_forks times, each with
        # num_items_per_fork unique objects to push onto the shared queue.
        enqueue_tasks = []
        for fork in range(num_forks):
            enqueue_tasks.append(enqueue.remote(
                actor, [(fork, i) for i in range(num_items_per_fork)]))
        # Wait for the forks to complete their tasks.
        enqueue_tasks = ray.get(enqueue_tasks)
        enqueue_tasks = [fork_ids[0] for fork_ids in enqueue_tasks]
        ray.wait(enqueue_tasks, num_returns=num_forks_to_wait)

        # Read the queue to get the initial order of execution.
        queue = ray.get(actor.read.remote())

        # Kill the second plasma store to get rid of the cached objects and
        # trigger the corresponding local scheduler to exit.
        process = ray.services.all_processes[
            ray.services.PROCESS_TYPE_PLASMA_STORE][1]
        process.kill()
        process.wait()

        # Read the queue again and check for deterministic reconstruction.
        ray.get(enqueue_tasks)
        reconstructed_queue = ray.get(actor.read.remote())
        # Make sure the final queue has all items from all forks.
        self.assertEqual(len(reconstructed_queue),
                         num_forks * num_items_per_fork)
        # Make sure that the prefix of the final queue matches the queue
        # from the initial execution.
        self.assertEqual(queue, reconstructed_queue[:len(queue)])

    def testNondeterministicReconstruction(self):
        self._testNondeterministicReconstruction(10, 100, 10)

    @unittest.skip("Nondeterministic reconstruction currently not supported "
                   "when there are concurrent forks that didn't finish "
                   "initial execution.")
    def testNondeterministicReconstructionConcurrentForks(self):
        self._testNondeterministicReconstruction(10, 100, 1)


@unittest.skip("Actor placement currently does not use custom resources.")
class ActorPlacement(unittest.TestCase):
