Skip to content

Commit

Permalink
[Core] Move submitting actor creation task to ActorTaskSubmitter (ray…
Browse files Browse the repository at this point in the history
…-project#47080)

Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao authored Aug 13, 2024
1 parent b4897b8 commit 9ea8630
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 173 deletions.
12 changes: 0 additions & 12 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -867,18 +867,6 @@ ray_cc_test(
],
)

ray_cc_test(
name = "direct_task_transport_mock_test",
size = "small",
srcs = ["src/ray/core_worker/test/direct_task_transport_mock_test.cc"],
tags = ["team:core"],
deps = [
":core_worker_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_test(
name = "reference_count_test",
size = "small",
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2385,7 +2385,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
<< "Failed to register actor. Error message: "
<< status.ToString();
} else {
RAY_UNUSED(normal_task_submitter_->SubmitTask(task_spec));
RAY_UNUSED(actor_task_submitter_->SubmitActorCreationTask(task_spec));
}
}));
},
Expand All @@ -2400,7 +2400,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
}
io_service_.post(
[this, task_spec = std::move(task_spec)]() {
RAY_UNUSED(normal_task_submitter_->SubmitTask(task_spec));
RAY_UNUSED(actor_task_submitter_->SubmitActorCreationTask(task_spec));
},
"CoreWorker.SubmitTask");
}
Expand Down
39 changes: 36 additions & 3 deletions src/ray/core_worker/test/direct_actor_transport_mock_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class DirectTaskTransportTest : public ::testing::Test {
return TaskSpecification(task_spec);
}

TaskSpecification GetCreatingTaskSpec(const ActorID &actor_id) {
TaskSpecification GetActorCreationTaskSpec(const ActorID &actor_id) {
rpc::TaskSpec task_spec;
task_spec.set_task_id(TaskID::ForActorCreationTask(actor_id).Binary());
task_spec.set_type(rpc::TaskType::ACTOR_CREATION_TASK);
Expand All @@ -76,11 +76,44 @@ class DirectTaskTransportTest : public ::testing::Test {
std::shared_ptr<ray::gcs::MockGcsClient> gcs_client;
};

TEST_F(DirectTaskTransportTest, ActorCreationOk) {
auto actor_id = ActorID::FromHex("f4ce02420592ca68c1738a0d01000000");
auto creation_task_spec = GetActorCreationTaskSpec(actor_id);
EXPECT_CALL(*task_finisher, CompletePendingTask(creation_task_spec.TaskId(), _, _, _));
rpc::ClientCallback<rpc::CreateActorReply> create_cb;
EXPECT_CALL(*gcs_client->mock_actor_accessor,
AsyncCreateActor(creation_task_spec, ::testing::_))
.WillOnce(::testing::DoAll(::testing::SaveArg<1>(&create_cb),
::testing::Return(Status::OK())));
ASSERT_TRUE(actor_task_submitter->SubmitActorCreationTask(creation_task_spec).ok());
create_cb(Status::OK(), rpc::CreateActorReply());
}

TEST_F(DirectTaskTransportTest, ActorCreationFail) {
auto actor_id = ActorID::FromHex("f4ce02420592ca68c1738a0d01000000");
auto creation_task_spec = GetActorCreationTaskSpec(actor_id);
EXPECT_CALL(*task_finisher, CompletePendingTask(_, _, _, _)).Times(0);
EXPECT_CALL(*task_finisher,
FailOrRetryPendingTask(creation_task_spec.TaskId(),
rpc::ErrorType::ACTOR_CREATION_FAILED,
_,
_,
true,
false));
rpc::ClientCallback<rpc::CreateActorReply> create_cb;
EXPECT_CALL(*gcs_client->mock_actor_accessor,
AsyncCreateActor(creation_task_spec, ::testing::_))
.WillOnce(::testing::DoAll(::testing::SaveArg<1>(&create_cb),
::testing::Return(Status::OK())));
ASSERT_TRUE(actor_task_submitter->SubmitActorCreationTask(creation_task_spec).ok());
create_cb(Status::IOError(""), rpc::CreateActorReply());
}

TEST_F(DirectTaskTransportTest, ActorRegisterFailure) {
auto actor_id = ActorID::FromHex("f4ce02420592ca68c1738a0d01000000");
ASSERT_TRUE(ObjectID::IsActorID(ObjectID::ForActorHandle(actor_id)));
ASSERT_EQ(actor_id, ObjectID::ToActorID(ObjectID::ForActorHandle(actor_id)));
auto creation_task_spec = GetCreatingTaskSpec(actor_id);
auto creation_task_spec = GetActorCreationTaskSpec(actor_id);
auto task_spec = GetActorTaskSpec(actor_id);
auto task_arg = task_spec.GetMutableMessage().add_args();
auto inline_obj_ref = task_arg->add_nested_inlined_refs();
Expand All @@ -105,7 +138,7 @@ TEST_F(DirectTaskTransportTest, ActorRegisterOk) {
auto actor_id = ActorID::FromHex("f4ce02420592ca68c1738a0d01000000");
ASSERT_TRUE(ObjectID::IsActorID(ObjectID::ForActorHandle(actor_id)));
ASSERT_EQ(actor_id, ObjectID::ToActorID(ObjectID::ForActorHandle(actor_id)));
auto creation_task_spec = GetCreatingTaskSpec(actor_id);
auto creation_task_spec = GetActorCreationTaskSpec(actor_id);
auto task_spec = GetActorTaskSpec(actor_id);
auto task_arg = task_spec.GetMutableMessage().add_args();
auto inline_obj_ref = task_arg->add_nested_inlined_refs();
Expand Down
99 changes: 0 additions & 99 deletions src/ray/core_worker/test/direct_task_transport_mock_test.cc

This file was deleted.

69 changes: 69 additions & 0 deletions src/ray/core_worker/transport/actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,75 @@ void ActorTaskSubmitter::AddActorQueueIfNotExists(const ActorID &actor_id,
actor_id, execute_out_of_order, max_pending_calls, fail_if_actor_unreachable));
}

Status ActorTaskSubmitter::SubmitActorCreationTask(TaskSpecification task_spec) {
RAY_CHECK(task_spec.IsActorCreationTask());
RAY_LOG(DEBUG).WithField(task_spec.TaskId()) << "Submitting actor creation task";
resolver_.ResolveDependencies(task_spec, [this, task_spec](Status status) mutable {
// NOTE: task_spec here is capture copied (from a stack variable) and also
// mutable. (Mutations to the variable are expected to be shared inside and
// outside of this closure).
task_finisher_.MarkDependenciesResolved(task_spec.TaskId());
if (!status.ok()) {
RAY_LOG(WARNING) << "Resolving task dependencies failed " << status.ToString();
RAY_UNUSED(task_finisher_.FailOrRetryPendingTask(
task_spec.TaskId(), rpc::ErrorType::DEPENDENCY_RESOLUTION_FAILED, &status));
return;
}
RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId();
// The actor creation task will be sent to
// gcs server directly after the in-memory dependent objects are resolved. For
// more details please see the protocol of actor management based on gcs.
// https://docs.google.com/document/d/1EAWide-jy05akJp6OMtDn58XOK7bUyruWMia4E-fV28/edit?usp=sharing
auto actor_id = task_spec.ActorCreationId();
auto task_id = task_spec.TaskId();
RAY_LOG(DEBUG).WithField(actor_id) << "Creating actor via GCS";
RAY_CHECK_OK(actor_creator_.AsyncCreateActor(
task_spec,
[this, actor_id, task_id](Status status, const rpc::CreateActorReply &reply) {
if (status.ok() || status.IsCreationTaskError()) {
rpc::PushTaskReply push_task_reply;
push_task_reply.mutable_borrowed_refs()->CopyFrom(reply.borrowed_refs());
if (status.IsCreationTaskError()) {
RAY_LOG(INFO).WithField(actor_id).WithField(task_id)
<< "Actor creation failed and we will not be retrying the "
"creation task";
// Update the task execution error to be CreationTaskError.
push_task_reply.set_task_execution_error(status.ToString());
} else {
RAY_LOG(DEBUG).WithField(actor_id) << "Created actor";
}
// NOTE: When actor creation task failed we will not retry the creation
// task so just marking the task fails.
task_finisher_.CompletePendingTask(
task_id,
push_task_reply,
reply.actor_address(),
/*is_application_error=*/status.IsCreationTaskError());
} else {
// Either fails the rpc call or actor scheduling cancelled.
rpc::RayErrorInfo ray_error_info;
if (status.IsSchedulingCancelled()) {
RAY_LOG(DEBUG).WithField(actor_id) << "Actor creation cancelled";
task_finisher_.MarkTaskCanceled(task_id);
if (reply.has_death_cause()) {
ray_error_info.mutable_actor_died_error()->CopyFrom(reply.death_cause());
}
} else {
RAY_LOG(INFO).WithField(actor_id)
<< "Failed to create actor with status: " << status.ToString();
}
RAY_UNUSED(task_finisher_.FailOrRetryPendingTask(
task_id,
rpc::ErrorType::ACTOR_CREATION_FAILED,
&status,
ray_error_info.has_actor_died_error() ? &ray_error_info : nullptr));
}
}));
});

return Status::OK();
}

Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
auto task_id = task_spec.TaskId();
auto actor_id = task_spec.ActorId();
Expand Down
6 changes: 6 additions & 0 deletions src/ray/core_worker/transport/actor_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
std::function<void(const ActorID &, int64_t)> warn_excess_queueing,
instrumented_io_context &io_service)
: core_worker_client_pool_(core_worker_client_pool),
actor_creator_(actor_creator),
resolver_(store, task_finisher, actor_creator),
task_finisher_(task_finisher),
warn_excess_queueing_(warn_excess_queueing),
Expand Down Expand Up @@ -116,6 +117,9 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
/// \return Status::Invalid if the task is not yet supported.
Status SubmitTask(TaskSpecification task_spec);

/// Submit an actor creation task to an actor via GCS.
Status SubmitActorCreationTask(TaskSpecification task_spec);

/// Create connection to actor and send all pending tasks.
///
/// \param[in] actor_id Actor ID.
Expand Down Expand Up @@ -384,6 +388,8 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
/// Pool for producing new core worker clients.
rpc::CoreWorkerClientPool &core_worker_client_pool_;

ActorCreatorInterface &actor_creator_;

/// Mutex to protect the various maps below.
mutable absl::Mutex mu_;

Expand Down
54 changes: 1 addition & 53 deletions src/ray/core_worker/transport/normal_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace ray {
namespace core {

Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_CHECK(task_spec.IsNormalTask());
RAY_LOG(DEBUG) << "Submit task " << task_spec.TaskId();
num_tasks_submitted_++;

Expand All @@ -37,59 +38,6 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
return;
}
RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId();
if (task_spec.IsActorCreationTask()) {
// If gcs actor management is enabled, the actor creation task will be sent to
// gcs server directly after the in-memory dependent objects are resolved. For
// more details please see the protocol of actor management based on gcs.
// https://docs.google.com/document/d/1EAWide-jy05akJp6OMtDn58XOK7bUyruWMia4E-fV28/edit?usp=sharing
auto actor_id = task_spec.ActorCreationId();
auto task_id = task_spec.TaskId();
RAY_LOG(DEBUG) << "Creating actor via GCS actor id = : " << actor_id;
RAY_CHECK_OK(actor_creator_->AsyncCreateActor(
task_spec,
[this, actor_id, task_id](Status status, const rpc::CreateActorReply &reply) {
if (status.ok() || status.IsCreationTaskError()) {
rpc::PushTaskReply push_task_reply;
push_task_reply.mutable_borrowed_refs()->CopyFrom(reply.borrowed_refs());
if (status.IsCreationTaskError()) {
RAY_LOG(INFO) << "Actor creation failed and we will not be retrying the "
"creation task, actor id = "
<< actor_id << ", task id = " << task_id;
// Update the task execution error to be CreationTaskError.
push_task_reply.set_task_execution_error(status.ToString());
} else {
RAY_LOG(DEBUG) << "Created actor, actor id = " << actor_id;
}
// NOTE: When actor creation task failed we will not retry the creation
// task so just marking the task fails.
task_finisher_->CompletePendingTask(
task_id,
push_task_reply,
reply.actor_address(),
/*is_application_error=*/status.IsCreationTaskError());
} else {
// Either fails the rpc call or actor scheduling cancelled.
rpc::RayErrorInfo ray_error_info;
if (status.IsSchedulingCancelled()) {
RAY_LOG(DEBUG) << "Actor creation cancelled, actor id = " << actor_id;
task_finisher_->MarkTaskCanceled(task_id);
if (reply.has_death_cause()) {
ray_error_info.mutable_actor_died_error()->CopyFrom(
reply.death_cause());
}
} else {
RAY_LOG(INFO) << "Failed to create actor " << actor_id
<< " with status: " << status.ToString();
}
RAY_UNUSED(task_finisher_->FailOrRetryPendingTask(
task_id,
rpc::ErrorType::ACTOR_CREATION_FAILED,
&status,
ray_error_info.has_actor_died_error() ? &ray_error_info : nullptr));
}
}));
return;
}

bool keep_executing = true;
{
Expand Down
4 changes: 0 additions & 4 deletions src/ray/core_worker/transport/normal_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class NormalTaskSubmitter {
lease_timeout_ms_(lease_timeout_ms),
local_raylet_id_(local_raylet_id),
worker_type_(worker_type),
actor_creator_(actor_creator),
client_cache_(core_worker_client_pool),
job_id_(job_id),
lease_request_rate_limiter_(lease_request_rate_limiter),
Expand Down Expand Up @@ -269,9 +268,6 @@ class NormalTaskSubmitter {
/// The type of this core worker process.
const WorkerType worker_type_;

/// Interface for actor creation.
std::shared_ptr<ActorCreatorInterface> actor_creator_;

// Protects task submission state below.
absl::Mutex mu_;

Expand Down

0 comments on commit 9ea8630

Please sign in to comment.