Skip to content

Commit

Permalink
[core] Remove some legacy code for actor execution (ray-project#31614)
Browse files Browse the repository at this point in the history
Removes some legacy code related to the actor "dummy object", which was used in early versions of Ray for task scheduling.

Signed-off-by: Stephanie Wang <[email protected]>
  • Loading branch information
stephanie-wang authored Jan 17, 2023
1 parent 5e66cbf commit 22a8ab6
Show file tree
Hide file tree
Showing 12 changed files with 16 additions and 84 deletions.
6 changes: 2 additions & 4 deletions cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,8 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
TaskID::ForActorCreationTask(invocation.actor_id);
const ObjectID actor_creation_dummy_object_id =
ObjectID::FromIndex(actor_creation_task_id, 1);
builder.SetActorTaskSpec(invocation.actor_id,
actor_creation_dummy_object_id,
ObjectID(),
invocation.actor_counter);
builder.SetActorTaskSpec(
invocation.actor_id, actor_creation_dummy_object_id, invocation.actor_counter);
} else {
throw RayException("unknown task type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,7 @@ private Set<ObjectId> getUnreadyObjects(TaskSpec taskSpec) {
}
}
}
if (taskSpec.getType() == TaskType.ACTOR_TASK && !isConcurrentActor(taskSpec)) {
ObjectId dummyObjectId =
new ObjectId(
taskSpec.getActorTaskSpec().getPreviousActorTaskDummyObjectId().toByteArray());
if (!objectStore.isObjectReady(dummyObjectId)) {
unreadyObjects.add(dummyObjectId);
}
} else if (taskSpec.getType() == TaskType.ACTOR_TASK) {
if (taskSpec.getType() == TaskType.ACTOR_TASK) {
// Code path of concurrent actors.
// For concurrent actors, we should make sure the actor created
// before we submit the following actor tasks.
Expand Down Expand Up @@ -332,19 +325,13 @@ public List<ObjectId> submitActorTask(
Preconditions.checkState(numReturns <= 1);
TaskSpec.Builder builder = getTaskSpecBuilder(TaskType.ACTOR_TASK, functionDescriptor, args);
List<ObjectId> returnIds =
getReturnIds(TaskId.fromBytes(builder.getTaskId().toByteArray()), numReturns + 1);
getReturnIds(TaskId.fromBytes(builder.getTaskId().toByteArray()), numReturns);
TaskSpec taskSpec =
builder
.setNumReturns(numReturns + 1)
.setNumReturns(numReturns)
.setActorTaskSpec(
ActorTaskSpec.newBuilder()
.setActorId(ByteString.copyFrom(actor.getId().getBytes()))
.setPreviousActorTaskDummyObjectId(
ByteString.copyFrom(
((LocalModeActorHandle) actor)
.exchangePreviousActorTaskDummyObjectId(
returnIds.get(returnIds.size() - 1))
.getBytes()))
.build())
.setConcurrencyGroupName(options.concurrencyGroupName)
.build();
Expand Down
17 changes: 1 addition & 16 deletions src/ray/common/task/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,25 +296,16 @@ std::vector<ObjectID> TaskSpecification::GetDependencyIds() const {
dependencies.push_back(ArgId(i));
}
}
if (IsActorTask()) {
dependencies.push_back(PreviousActorTaskDummyObjectId());
}
return dependencies;
}

std::vector<rpc::ObjectReference> TaskSpecification::GetDependencies(
bool add_dummy_dependency) const {
std::vector<rpc::ObjectReference> TaskSpecification::GetDependencies() const {
std::vector<rpc::ObjectReference> dependencies;
for (size_t i = 0; i < NumArgs(); ++i) {
if (ArgByRef(i)) {
dependencies.push_back(message_->args(i).object_ref());
}
}
if (add_dummy_dependency && IsActorTask()) {
const auto &dummy_ref =
GetReferenceForActorDummyObject(PreviousActorTaskDummyObjectId());
dependencies.push_back(dummy_ref);
}
return dependencies;
}

Expand Down Expand Up @@ -405,12 +396,6 @@ ObjectID TaskSpecification::ActorCreationDummyObjectId() const {
message_->actor_task_spec().actor_creation_dummy_object_id());
}

ObjectID TaskSpecification::PreviousActorTaskDummyObjectId() const {
RAY_CHECK(IsActorTask());
return ObjectID::FromBinary(
message_->actor_task_spec().previous_actor_task_dummy_object_id());
}

ObjectID TaskSpecification::ActorDummyObject() const {
RAY_CHECK(IsActorTask() || IsActorCreationTask());
return ReturnId(NumReturns() - 1);
Expand Down
6 changes: 1 addition & 5 deletions src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

/// Return the dependencies of this task. This is recomputed each time, so it can
/// be used if the task spec is mutated.
/// \param add_dummy_dependency whether to add a dummy object in the returned objects.
/// \return The recomputed dependencies for the task.
std::vector<rpc::ObjectReference> GetDependencies(
bool add_dummy_dependency = true) const;
std::vector<rpc::ObjectReference> GetDependencies() const;

std::string GetDebuggerBreakpoint() const;

Expand Down Expand Up @@ -368,8 +366,6 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

ObjectID ActorCreationDummyObjectId() const;

ObjectID PreviousActorTaskDummyObjectId() const;

int MaxActorConcurrency() const;

bool IsAsyncioActor() const;
Expand Down
3 changes: 0 additions & 3 deletions src/ray/common/task/task_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,12 @@ class TaskSpecBuilder {
/// \return Reference to the builder object itself.
TaskSpecBuilder &SetActorTaskSpec(const ActorID &actor_id,
const ObjectID &actor_creation_dummy_object_id,
const ObjectID &previous_actor_task_dummy_object_id,
uint64_t actor_counter) {
message_->set_type(TaskType::ACTOR_TASK);
auto actor_spec = message_->mutable_actor_task_spec();
actor_spec->set_actor_id(actor_id.Binary());
actor_spec->set_actor_creation_dummy_object_id(
actor_creation_dummy_object_id.Binary());
actor_spec->set_previous_actor_task_dummy_object_id(
previous_actor_task_dummy_object_id.Binary());
actor_spec->set_actor_counter(actor_counter);
return *this;
}
Expand Down
8 changes: 1 addition & 7 deletions src/ray/core_worker/actor_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,14 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_
const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(GetActorID());
const ObjectID actor_creation_dummy_object_id =
ObjectID::FromIndex(actor_creation_task_id, /*index=*/1);
builder.SetActorTaskSpec(GetActorID(),
actor_creation_dummy_object_id,
/*previous_actor_task_dummy_object_id=*/actor_cursor_,
task_counter_++);
actor_cursor_ = new_cursor;
builder.SetActorTaskSpec(GetActorID(), actor_creation_dummy_object_id, task_counter_++);
}

void ActorHandle::SetResubmittedActorTaskSpec(TaskSpecification &spec,
const ObjectID new_cursor) {
absl::MutexLock guard(&mutex_);
auto mutable_spec = spec.GetMutableMessage().mutable_actor_task_spec();
mutable_spec->set_previous_actor_task_dummy_object_id(actor_cursor_.Binary());
mutable_spec->set_actor_counter(task_counter_++);
actor_cursor_ = new_cursor;
}

void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); }
Expand Down
9 changes: 1 addition & 8 deletions src/ray/core_worker/actor_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ namespace core {

class ActorHandle {
public:
ActorHandle(rpc::ActorHandle inner)
: inner_(inner), actor_cursor_(ObjectID::FromBinary(inner_.actor_cursor())) {}
ActorHandle(rpc::ActorHandle inner) : inner_(inner) {}

// Constructs a new ActorHandle as part of the actor creation process.
ActorHandle(const ActorID &actor_id,
Expand Down Expand Up @@ -103,12 +102,6 @@ class ActorHandle {
private:
// Protobuf-defined persistent state of the actor handle.
const rpc::ActorHandle inner_;

/// The unique id of the dummy object returned by the previous task.
/// TODO: This can be removed once we schedule actor tasks by task counter
/// only.
// TODO: Save this state in the core worker.
ObjectID actor_cursor_ GUARDED_BY(mutex_);
// Number of tasks that have been submitted on this handle.
uint64_t task_counter_ GUARDED_BY(mutex_) = 0;

Expand Down
15 changes: 2 additions & 13 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2071,9 +2071,6 @@ std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(
// number of connections. The method is idempotent.
actor_manager_->SubscribeActorState(actor_id);

// Add one for actor cursor object id for tasks.
const int num_returns = task_options.num_returns + 1;

// Build common task spec.
TaskSpecBuilder builder;
const auto next_task_index = worker_context_.GetNextTaskIndex();
Expand All @@ -2100,7 +2097,7 @@ std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(
rpc_address_,
function,
args,
num_returns,
task_options.num_returns,
task_options.resources,
required_resources,
"", /* debugger_breakpoint */
Expand All @@ -2111,10 +2108,7 @@ std::optional<std::vector<rpc::ObjectReference>> CoreWorker::SubmitActorTask(
// NOTE: placement_group_capture_child_tasks and runtime_env will
// be ignored in the actor because we should always follow the actor's option.

// TODO(swang): Do we actually need to set this ObjectID?
const ObjectID new_cursor = ObjectID::FromIndex(actor_task_id, num_returns);
actor_handle->SetActorTaskSpec(builder, new_cursor);

actor_handle->SetActorTaskSpec(builder, ObjectID::Nil());
// Submit task.
TaskSpecification task_spec = builder.Build();
RAY_LOG(DEBUG) << "Submitting actor task " << task_spec.DebugString();
Expand Down Expand Up @@ -2457,8 +2451,6 @@ Status CoreWorker::ExecuteTask(
}
RAY_LOG(INFO) << "Creating actor: " << task_spec.ActorCreationId();
} else if (task_spec.IsActorTask()) {
RAY_CHECK(return_objects->size() > 0);
return_objects->pop_back();
task_type = TaskType::ACTOR_TASK;
}

Expand Down Expand Up @@ -2653,9 +2645,6 @@ std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(

std::vector<rpc::ObjectReference> returned_refs;
size_t num_returns = task_spec.NumReturns();
if (task_spec.IsActorTask()) {
num_returns--;
}
for (size_t i = 0; i < num_returns; i++) {
if (!task_spec.IsActorCreationTask()) {
reference_counter_->AddOwnedObject(task_spec.ReturnId(i),
Expand Down
6 changes: 0 additions & 6 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(

// Add new owned objects for the return values of the task.
size_t num_returns = spec.NumReturns();
if (spec.IsActorTask()) {
num_returns--;
}
std::vector<rpc::ObjectReference> returned_refs;
std::vector<ObjectID> return_ids;
for (size_t i = 0; i < num_returns; i++) {
Expand Down Expand Up @@ -618,9 +615,6 @@ void TaskManager::RemoveFinishedTaskReferences(

std::vector<ObjectID> return_ids;
size_t num_returns = spec.NumReturns();
if (spec.IsActorTask()) {
num_returns--;
}
for (size_t i = 0; i < num_returns; i++) {
return_ids.push_back(spec.ReturnId(i));
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/direct_actor_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TaskSpecification CreateActorTaskHelper(ActorID actor_id,
caller_worker_id.Binary());
task.GetMutableMessage().mutable_actor_task_spec()->set_actor_id(actor_id.Binary());
task.GetMutableMessage().mutable_actor_task_spec()->set_actor_counter(counter);
task.GetMutableMessage().set_num_returns(1);
task.GetMutableMessage().set_num_returns(0);
return task;
}

Expand Down
7 changes: 4 additions & 3 deletions src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
}

auto num_returns = task_spec.NumReturns();
if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) {
// Decrease to account for the dummy object id.
if (task_spec.IsActorCreationTask()) {
// Decrease to account for the dummy object id returned by the actor
// creation task.
num_returns--;
}
RAY_CHECK(num_returns >= 0);
Expand Down Expand Up @@ -213,7 +214,7 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
}
};

auto dependencies = task_spec.GetDependencies(false);
auto dependencies = task_spec.GetDependencies();

if (task_spec.IsActorTask()) {
auto it = actor_scheduling_queues_.find(task_spec.CallerWorkerId());
Expand Down
2 changes: 0 additions & 2 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,6 @@ message ActorTaskSpec {
bytes actor_creation_dummy_object_id = 4;
// Number of tasks that have been submitted to this actor so far.
uint64 actor_counter = 5;
// The dummy object ID of the previous actor task.
bytes previous_actor_task_dummy_object_id = 7;
}

// Represents a task, including task spec.
Expand Down

0 comments on commit 22a8ab6

Please sign in to comment.