From 334ee6693d1315a988b2a0f300b6c2fa05791b06 Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Thu, 23 Mar 2023 05:25:36 -0700 Subject: [PATCH] [core][dashboard] Add repr_name as part of actor state (#33555) This PR adds the actor's repr name as part of the PushTaskReply when executing actor creation task on the executor, and stores it as part of GCS actor table. A couple of places where the repr name will now show up: Task summary (we will likely use "group_name" in the future) Repr name on actor table as another column Repr name on actor detail. --- .../client/src/components/ActorTable.tsx | 23 +++++ .../client/src/pages/actor/ActorDetail.tsx | 8 ++ dashboard/client/src/type/actor.ts | 1 + dashboard/modules/actor/actor_head.py | 2 + dashboard/modules/actor/tests/test_actor.py | 4 + dashboard/state_aggregator.py | 13 ++- python/ray/_raylet.pyx | 10 ++- python/ray/experimental/state/common.py | 26 +++++- python/ray/includes/libcoreworker.pxd | 1 + python/ray/tests/test_state_api_2.py | 85 ++++++++++++++++++- python/ray/tests/test_state_api_summary.py | 2 +- src/ray/core_worker/core_worker.cc | 5 ++ src/ray/core_worker/core_worker.h | 10 +++ .../transport/direct_actor_transport.cc | 11 ++- .../transport/direct_actor_transport.h | 9 ++ src/ray/gcs/gcs_server/gcs_actor_manager.cc | 1 + src/ray/gcs/gcs_server/gcs_actor_manager.h | 1 + src/ray/protobuf/core_worker.proto | 4 + src/ray/protobuf/gcs.proto | 5 ++ 19 files changed, 213 insertions(+), 8 deletions(-) diff --git a/dashboard/client/src/components/ActorTable.tsx b/dashboard/client/src/components/ActorTable.tsx index f2472ae27897..2ef4d150bb60 100644 --- a/dashboard/client/src/components/ActorTable.tsx +++ b/dashboard/client/src/components/ActorTable.tsx @@ -140,6 +140,25 @@ const ActorTable = ({ ), }, + { + label: "Repr", + helpInfo: ( + + The repr name of the actor instance defined by __repr__. For example, + this actor will have repr "Actor1" +
+
+ @ray.remote +
+ class Actor: +
+  def __repr__(self): +
+   return "Actor1" +
+
+ ), + }, { label: "State", helpInfo: ( @@ -353,6 +372,7 @@ const ActorTable = ({ ({ actorId, actorClass, + reprName, jobId, placementGroupId, pid, @@ -407,6 +427,9 @@ const ActorTable = ({ {actorClass} {name ? name : "-"} + + {reprName ? reprName : "-"} + diff --git a/dashboard/client/src/pages/actor/ActorDetail.tsx b/dashboard/client/src/pages/actor/ActorDetail.tsx index f8158e62ed88..80783d6b1baa 100644 --- a/dashboard/client/src/pages/actor/ActorDetail.tsx +++ b/dashboard/client/src/pages/actor/ActorDetail.tsx @@ -93,6 +93,14 @@ const ActorDetailPage = () => { } : { value: "-" }, }, + { + label: "Repr", + content: actorDetail.reprName + ? { + value: actorDetail.reprName, + } + : { value: "-" }, + }, { label: "Job ID", content: actorDetail.jobId diff --git a/dashboard/client/src/type/actor.ts b/dashboard/client/src/type/actor.ts index bf0bff94c219..7ac5274307ee 100644 --- a/dashboard/client/src/type/actor.ts +++ b/dashboard/client/src/type/actor.ts @@ -30,6 +30,7 @@ export type Actor = { [key: string]: number; }; exitDetail: string; + reprName: string; }; export type ActorDetail = { diff --git a/dashboard/modules/actor/actor_head.py b/dashboard/modules/actor/actor_head.py index 1a5e5a260e25..45b764217bed 100644 --- a/dashboard/modules/actor/actor_head.py +++ b/dashboard/modules/actor/actor_head.py @@ -57,6 +57,7 @@ def actor_table_data_to_dict(message): "className", "startTime", "endTime", + "reprName", } light_message = {k: v for (k, v) in orig_message.items() if k in fields} light_message["actorClass"] = orig_message["className"] @@ -141,6 +142,7 @@ async def _update_actors(self): "exitDetail", "startTime", "endTime", + "reprName", ) def process_actor_data_from_pubsub(actor_id, actor_table_data): diff --git a/dashboard/modules/actor/tests/test_actor.py b/dashboard/modules/actor/tests/test_actor.py index 92d09a976c8a..2d85790a6f8d 100644 --- a/dashboard/modules/actor/tests/test_actor.py +++ b/dashboard/modules/actor/tests/test_actor.py @@ -33,6 +33,9 @@ def 
get_pid(self): return os.getpid() + def __repr__(self) -> str: + return "Foo1" + @ray.remote(num_cpus=0, resources={"infeasible_actor": 1}) class InfeasibleActor: pass @@ -76,6 +79,7 @@ class InfeasibleActor: assert actor_response["requiredResources"] == {} assert actor_response["endTime"] == 0 assert actor_response["exitDetail"] == "-" + assert actor_response["reprName"] == "Foo1" for a in actors.values(): # "exitDetail always exits from the response" assert "exitDetail" in a diff --git a/dashboard/state_aggregator.py b/dashboard/state_aggregator.py index dd26d9bde5fd..96e0593a7499 100644 --- a/dashboard/state_aggregator.py +++ b/dashboard/state_aggregator.py @@ -622,10 +622,21 @@ async def summarize_tasks(self, option: SummaryApiOptions) -> SummaryApiResponse detail=summary_by == "lineage", ) ) + if summary_by == "func_name": summary_results = TaskSummaries.to_summary_by_func_name(tasks=result.result) else: - summary_results = TaskSummaries.to_summary_by_lineage(tasks=result.result) + # We will need the actors info for actor tasks. + actors = await self.list_actors( + option=ListApiOptions( + timeout=option.timeout, + limit=RAY_MAX_LIMIT_FROM_API_SERVER, + detail=True, + ) + ) + summary_results = TaskSummaries.to_summary_by_lineage( + tasks=result.result, actors=actors.result + ) summary = StateSummary(node_id_to_summary={"cluster": summary_results}) warnings = result.warnings if ( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1cda8ecdec21..01b535b0f45d 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -929,12 +929,17 @@ cdef void execute_task( if (hasattr(actor_class, "__ray_actor_class__") and (actor_class.__ray_actor_class__.__repr__ != object.__repr__)): + actor_repr = repr(actor) actor_magic_token = "{}{}\n".format( - ray_constants.LOG_PREFIX_ACTOR_NAME, repr(actor)) + ray_constants.LOG_PREFIX_ACTOR_NAME, actor_repr) # Flush on both stdout and stderr. 
print(actor_magic_token, end="") print(actor_magic_token, file=sys.stderr, end="") + # Sets the actor repr name for the actor so other components + # like GCS have such info. + core_worker.set_actor_repr_name(actor_repr) + if (returns[0].size() > 0 and not inspect.isgenerator(outputs) and len(outputs) != int(returns[0].size())): @@ -1627,6 +1632,9 @@ cdef class CoreWorker: def set_actor_title(self, title): CCoreWorkerProcess.GetCoreWorker().SetActorTitle(title) + def set_actor_repr_name(self, repr_name): + CCoreWorkerProcess.GetCoreWorker().SetActorReprName(repr_name) + def get_plasma_event_handler(self): return self.plasma_event_handler diff --git a/python/ray/experimental/state/common.py b/python/ray/experimental/state/common.py index 48f5c48e0885..ce7b7ddcd2eb 100644 --- a/python/ray/experimental/state/common.py +++ b/python/ray/experimental/state/common.py @@ -383,6 +383,8 @@ class ActorState(StateSchema): is_detached: bool = state_column(filterable=False, detail=True) #: The placement group id that's associated with this actor. placement_group_id: str = state_column(detail=True, filterable=True) + #: Actor's repr name if a customized __repr__ method exists, else empty string. + repr_name: str = state_column(detail=True, filterable=True) @dataclass(init=True) @@ -841,7 +843,9 @@ def to_summary_by_func_name(cls, *, tasks: List[Dict]) -> "TaskSummaries": ) @classmethod - def to_summary_by_lineage(cls, *, tasks: List[Dict]) -> "TaskSummaries": + def to_summary_by_lineage( + cls, *, tasks: List[Dict], actors: List[Dict] + ) -> "TaskSummaries": """ This summarizes tasks by lineage. i.e. 
A task will be grouped with another task if they have the @@ -881,6 +885,8 @@ def to_summary_by_lineage(cls, *, tasks: List[Dict]) -> "TaskSummaries": if type_enum == TaskType.ACTOR_CREATION_TASK: actor_creation_task_id_for_actor_id[task["actor_id"]] = task["task_id"] + actor_dict = {actor["actor_id"]: actor for actor in actors} + def get_or_create_task_group(task_id: str) -> Optional[NestedTaskSummary]: """ Gets an already created task_group @@ -952,6 +958,7 @@ def get_or_create_actor_task_group( Returns None if there is missing data about the actor or one of its parents. """ key = f"actor:{actor_id}" + actor = actor_dict.get(actor_id) if key not in task_group_by_id: creation_task_id = actor_creation_task_id_for_actor_id.get(actor_id) creation_task = tasks_by_id.get(creation_task_id) @@ -962,8 +969,21 @@ def get_or_create_actor_task_group( # tree at that node. return None - # TODO(aguo): Get actor name from actors state-api. - [actor_name, *rest] = creation_task["func_or_class_name"].split(".") + # TODO(rickyx) + # We are using repr name for grouping actors if exists, + # else use class name. We should be using some group_name in the future. 
+ if actor is None: + logger.debug( + f"We are missing actor info for actor {actor_id}, " + f"even though creation task exists: {creation_task}" + ) + [actor_name, *rest] = creation_task["func_or_class_name"].split(".") + else: + actor_name = ( + actor["repr_name"] + if actor["repr_name"] + else actor["class_name"] + ) task_group_by_id[key] = NestedTaskSummary( name=actor_name, diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 9d83d098735c..07d95fdb49ad 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -158,6 +158,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_bool ShouldCaptureChildTasksInPlacementGroup() const CActorID &GetActorId() void SetActorTitle(const c_string &title) + void SetActorReprName(const c_string &repr_name) void SetWebuiDisplay(const c_string &key, const c_string &message) CTaskID GetCallerId() const ResourceMappingType &GetResourceIDs() const diff --git a/python/ray/tests/test_state_api_2.py b/python/ray/tests/test_state_api_2.py index 7ecdfb7d0102..9cf126870fae 100644 --- a/python/ray/tests/test_state_api_2.py +++ b/python/ray/tests/test_state_api_2.py @@ -9,7 +9,13 @@ import pytest from ray._private.profiling import chrome_tracing_dump -from ray.experimental.state.api import list_tasks, list_actors, list_workers, list_nodes +from ray.experimental.state.api import ( + get_actor, + list_tasks, + list_actors, + list_workers, + list_nodes, +) from ray._private.test_utils import wait_for_condition @@ -169,6 +175,83 @@ def verify(): wait_for_condition(verify, timeout=10) +def test_actor_repr_name(shutdown_only): + def _verify_repr_name(id, name): + actor = get_actor(id=id) + assert actor is not None + assert actor["repr_name"] == name + return True + + # Assert simple actor repr name + @ray.remote + class ReprActor: + def __init__(self, x) -> None: + self.x = x + + def __repr__(self) -> str: + return self.x + + def ready(self): + pass + + a 
= ReprActor.remote(x="repr-name-a") + b = ReprActor.remote(x="repr-name-b") + + wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name-a") + wait_for_condition(_verify_repr_name, id=b._actor_id.hex(), name="repr-name-b") + + # Assert when no __repr__ defined. repr_name should be empty + @ray.remote + class Actor: + pass + + a = Actor.remote() + wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="") + + # Assert special actors (async actor, threaded actor, detached actor, named actor) + @ray.remote + class AsyncActor: + def __init__(self, x) -> None: + self.x = x + + def __repr__(self) -> str: + return self.x + + async def ready(self): + pass + + a = AsyncActor.remote(x="async-x") + wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="async-x") + + a = ReprActor.options(max_concurrency=3).remote(x="x") + wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="x") + + a = ReprActor.options(name="named-actor").remote(x="repr-name") + wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name") + + a = ReprActor.options(name="detached-actor", lifetime="detached").remote( + x="repr-name" + ) + wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name") + ray.kill(a) + + # Assert nested actor class. 
+ class OutClass: + @ray.remote + class InnerActor: + def __init__(self, name) -> None: + self.name = name + + def __repr__(self) -> str: + return self.name + + def get_actor(self, name): + return OutClass.InnerActor.remote(name=name) + + a = OutClass().get_actor(name="inner") + wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="inner") + + if __name__ == "__main__": import sys diff --git a/python/ray/tests/test_state_api_summary.py b/python/ray/tests/test_state_api_summary.py index 5d209fe4e202..c815d5f18de8 100644 --- a/python/ray/tests/test_state_api_summary.py +++ b/python/ray/tests/test_state_api_summary.py @@ -668,7 +668,7 @@ def grab_tasks_from_task_group( random.shuffle(tasks) - summary = TaskSummaries.to_summary_by_lineage(tasks=tasks) + summary = TaskSummaries.to_summary_by_lineage(tasks=tasks, actors=[]) assert summary.total_tasks == 20 assert summary.total_actor_tasks == 110 diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 688d16c0b770..171bf3725a10 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -3745,6 +3745,11 @@ void CoreWorker::SetActorTitle(const std::string &title) { actor_title_ = title; } +void CoreWorker::SetActorReprName(const std::string &repr_name) { + RAY_CHECK(direct_task_receiver_ != nullptr); + direct_task_receiver_->SetActorReprName(repr_name); +} + rpc::JobConfig CoreWorker::GetJobConfig() const { return worker_context_.GetCurrentJobConfig(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index dea93c9b84b5..33aa89729e1b 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -374,6 +374,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void SetActorTitle(const std::string &title); + /// Sets the actor's repr name. 
+ /// + /// This is set explicitly rather than included as part of actor creation task spec + /// because it's only available after running the creation task as it might depend on + /// fields to be initialized during actor creation task. The repr name will be + /// included as part of actor creation task reply (PushTaskReply) to GCS. + /// + /// \param repr_name Actor repr name. + void SetActorReprName(const std::string &repr_name); + void SetCallerCreationTimestamp(); /// Increase the reference count for this object ID. diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 5a588a679f69..bf97267363b6 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -187,8 +187,13 @@ void CoreWorkerDirectTaskReceiver::HandleTask( << ", actor_id: " << task_spec.ActorCreationId() << ", status: " << status; } else { + // Set the actor repr name if it's customized by the actor. + if (!actor_repr_name_.empty()) { + reply->set_actor_repr_name(actor_repr_name_); + } RAY_LOG(INFO) << "Actor creation task finished, task_id: " << task_spec.TaskId() - << ", actor_id: " << task_spec.ActorCreationId(); + << ", actor_id: " << task_spec.ActorCreationId() + << ", actor_repr_name: " << actor_repr_name_; } } } @@ -311,5 +316,9 @@ void CoreWorkerDirectTaskReceiver::Stop() { } } +void CoreWorkerDirectTaskReceiver::SetActorReprName(const std::string &repr_name) { + actor_repr_name_ = repr_name; +} + } // namespace core } // namespace ray diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 89181af3251d..38f2ad16153a 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -94,6 +94,12 @@ class CoreWorkerDirectTaskReceiver { void Stop(); + /// Set the actor repr name for an actor. 
+ /// + /// The actor repr name is only available after actor creation task has been run since + /// the repr name could include data only initialized during the creation task. + void SetActorReprName(const std::string &repr_name); + private: /// Set up the configs for an actor. /// This should be called once for the actor creation task. @@ -135,6 +141,9 @@ class CoreWorkerDirectTaskReceiver { /// Whether this actor executes tasks out of order with respect to client submission /// order. bool execute_out_of_order_ = false; + /// The repr name of the actor instance for an anonymous actor. + /// This is only available after the actor creation task. + std::string actor_repr_name_ = ""; }; } // namespace core diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index c18ae602ec5f..1e72b4188af1 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -1224,6 +1224,7 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &ac auto worker_id = actor->GetWorkerID(); auto node_id = actor->GetNodeID(); mutable_actor_table_data->set_node_id(node_id.Binary()); + mutable_actor_table_data->set_repr_name(reply.actor_repr_name()); RAY_CHECK(!worker_id.IsNil()); RAY_CHECK(!node_id.IsNil()); RAY_CHECK(created_actors_[node_id].emplace(worker_id, actor_id).second); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 66ec7fa382ee..dc92cf815075 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -558,6 +558,7 @@ class GcsActorManager : public rpc::ActorInfoHandler { actor_delta->set_pid(actor.pid()); actor_delta->set_start_time(actor.start_time()); actor_delta->set_end_time(actor.end_time()); + actor_delta->set_repr_name(actor.repr_name()); // Acotr's namespace and name are used for removing cached name when it's dead. 
if (!actor.ray_namespace().empty()) { actor_delta->set_ray_namespace(actor.ray_namespace()); diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 2500f896b6a9..6b425b2a6c8e 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -137,6 +137,10 @@ message PushTaskReply { bool is_application_error = 6; // Whether the task was cancelled before it started running (i.e. while queued). bool was_cancelled_before_running = 7; + // If the task was an actor creation task, and the actor class has a customized + // repr defined for the anonymous actor (not a named actor), the repr name of the + // actor will be piggybacked to GCS to be included as part of ActorTableData. + optional string actor_repr_name = 8; } message DirectActorCallArgWaitCompleteRequest { diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 0937e1156c98..597aeea0f27c 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -155,6 +155,11 @@ message ActorTableData { optional bytes node_id = 29; // Placement group ID if the actor requires a placement group. optional bytes placement_group_id = 30; + // The repr name of the actor if specified with a customized repr method, e.g. __repr__ + // This field is only available after the actor creation task has been run since it + // might depend on actor fields to be initialized in __init__. + // Default to empty string if no customized repr is defined. + string repr_name = 31; } message ErrorTableData {