Skip to content

Commit

Permalink
Refactor actor task queues (ray-project#1118)
Browse files Browse the repository at this point in the history
* Refactor add_task_to_actor_queue into queue_actor_task and insert_actor_task_queue

* Refactor actor task queue to share the waiting task queue

* Fix
  • Loading branch information
stephanie-wang authored and robertnishihara committed Oct 14, 2017
1 parent 79ea205 commit 15486a1
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 95 deletions.
15 changes: 10 additions & 5 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ def actor_method_executor(dummy_return_id, task_counter, actor,
if not actor_checkpoint_failed:
put_dummy_object(worker, dummy_return_id)
worker.actor_task_counter = task_counter + 1
# Once the actor has resumed from a checkpoint, it counts as
# loaded.
worker.actor_loaded = True
# Report to the local scheduler whether this task succeeded in
# loading the checkpoint.
worker.actor_checkpoint_failed = actor_checkpoint_failed
Expand All @@ -168,6 +171,8 @@ def actor_method_executor(dummy_return_id, task_counter, actor,
# case the method throws an exception.
put_dummy_object(worker, dummy_return_id)
worker.actor_task_counter = task_counter + 1
# Once the actor executes a task, it counts as loaded.
worker.actor_loaded = True
# Execute the actor method.
return method(actor, *args)
return actor_method_executor
Expand Down Expand Up @@ -408,9 +413,9 @@ def __ray_checkpoint__(self, task_counter, previous_object_id):
error_to_return = None

# Save or resume the checkpoint.
if previous_object_id in worker.actor_pinned_objects:
# The preceding task executed on this actor instance. Save the
# checkpoint.
if worker.actor_loaded:
# The actor has loaded, so we are running the normal execution.
# Save the checkpoint.
print("Saving actor checkpoint. actor_counter = {}."
.format(task_counter))
actor_key = b"Actor:" + worker.actor_id
Expand All @@ -437,8 +442,8 @@ def __ray_checkpoint__(self, task_counter, previous_object_id):
# so we still consider the task successful.
error_to_return = error
else:
# The preceding task has not yet executed on this actor
# instance. Try to resume from the most recent checkpoint.
# The actor has not yet loaded. Try loading it from the most
# recent checkpoint.
checkpoint_index, checkpoint = get_actor_checkpoint(
worker, worker.actor_id)
if checkpoint_index == task_counter:
Expand Down
4 changes: 4 additions & 0 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ def __init__(self):
self.make_actor = None
self.actors = {}
self.actor_task_counter = 0
# Whether an actor instance has been loaded yet. The actor counts as
# loaded once it has either executed its first task or successfully
# resumed from a checkpoint.
self.actor_loaded = False
# This field is used to report actor checkpoint failure for the last
# task assigned. Workers are not assigned a task on startup, so we
# initialize to False.
Expand Down
17 changes: 17 additions & 0 deletions src/common/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) {
return from_flatbuf(message->actor_id());
}

/* A task belongs to an actor exactly when its actor ID is not the nil actor
 * ID. */
bool TaskSpec_is_actor_task(TaskSpec *spec) {
  const bool has_nil_actor = ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID);
  if (has_nil_actor) {
    return false;
  }
  return true;
}

int64_t TaskSpec_actor_counter(TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
Expand All @@ -227,6 +231,19 @@ bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec) {
return actor_counter < 0;
}

/* Decide whether the argument at arg_index is the actor dummy object. This
 * function does not itself verify that the task is an actor task. */
bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index) {
  /* The very first task (counter 0) carries no dependencies. */
  if (TaskSpec_actor_counter(spec) == 0) {
    return false;
  }
  /* Checkpoint tasks carry no dependencies either. */
  if (TaskSpec_actor_is_checkpoint_method(spec)) {
    return false;
  }
  /* Every remaining task passes the dummy object as its final argument. */
  const auto last_arg_index = TaskSpec_num_args(spec) - 1;
  return arg_index == last_arg_index;
}

UniqueID TaskSpec_driver_id(TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
Expand Down
24 changes: 24 additions & 0 deletions src/common/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ FunctionID TaskSpec_function(TaskSpec *spec);
*/
UniqueID TaskSpec_actor_id(TaskSpec *spec);

/**
 * Return whether this task is for an actor. A task counts as an actor task
 * when its actor ID (see TaskSpec_actor_id) differs from NIL_ACTOR_ID.
 *
 * @param spec The task_spec in question.
 * @return True if the task's actor ID is not NIL_ACTOR_ID, false otherwise.
 */
bool TaskSpec_is_actor_task(TaskSpec *spec);

/**
* Return the actor counter of the task. This starts at 0 and increments by 1
* every time a new task is submitted to run on the actor.
Expand All @@ -135,8 +143,24 @@ UniqueID TaskSpec_actor_id(TaskSpec *spec);
*/
int64_t TaskSpec_actor_counter(TaskSpec *spec);

/**
 * Return whether the task is a checkpoint method execution.
 *
 * NOTE(review): the definition appears to treat a negative actor counter as
 * the marker for a checkpoint task -- confirm against the implementation.
 *
 * @param spec The task_spec in question.
 * @return Whether the task is a checkpoint method.
 */
bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec);

/**
 * Return whether the task's argument is a dummy object. Dummy objects are used
 * to encode an actor's state dependencies in the task graph.
 *
 * The first task (actor counter 0) and checkpoint tasks have no dummy object,
 * so the result is false for every index on those tasks. For all other tasks
 * only the last argument, at index TaskSpec_num_args(spec) - 1, is the dummy
 * object. The function does not itself check that the task is an actor task.
 *
 * @param spec The task_spec in question.
 * @param arg_index The index of the argument in question.
 * @return Whether the argument at arg_index is a dummy object.
 */
bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index);

/**
* Return the driver ID of the task.
*
Expand Down
Loading

0 comments on commit 15486a1

Please sign in to comment.