Skip to content

Commit

Permalink
[core][dashboard] Add repr_name as part of actor state (ray-project#3…
Browse files Browse the repository at this point in the history
…3555)

This PR adds the actor's repr name as part of the PushTaskReply when executing actor creation task on the executor, and stores it as part of GCS actor table.

A couple of places where the repr name will now show up:

Task summary (we will likely use "group_name" in the future)
Repr name on actor table as another column
Repr name on actor detail.
  • Loading branch information
rickyyx authored Mar 23, 2023
1 parent 24996a6 commit 334ee66
Show file tree
Hide file tree
Showing 19 changed files with 213 additions and 8 deletions.
23 changes: 23 additions & 0 deletions dashboard/client/src/components/ActorTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,25 @@ const ActorTable = ({
</Typography>
),
},
{
label: "Repr",
helpInfo: (
<Typography>
The repr name of the actor instance defined by __repr__. For example,
this actor will have repr "Actor1"
<br />
<br />
@ray.remote
<br />
class Actor:
<br />
&emsp;def __repr__(self):
<br />
&emsp;&emsp;return "Actor1"
<br />
</Typography>
),
},
{
label: "State",
helpInfo: (
Expand Down Expand Up @@ -353,6 +372,7 @@ const ActorTable = ({
({
actorId,
actorClass,
reprName,
jobId,
placementGroupId,
pid,
Expand Down Expand Up @@ -407,6 +427,9 @@ const ActorTable = ({
</TableCell>
<TableCell align="center">{actorClass}</TableCell>
<TableCell align="center">{name ? name : "-"}</TableCell>
<TableCell align="center">
{reprName ? reprName : "-"}
</TableCell>
<TableCell align="center">
<StatusChip type="actor" status={state} />
</TableCell>
Expand Down
8 changes: 8 additions & 0 deletions dashboard/client/src/pages/actor/ActorDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ const ActorDetailPage = () => {
}
: { value: "-" },
},
{
label: "Repr",
content: actorDetail.reprName
? {
value: actorDetail.reprName,
}
: { value: "-" },
},
{
label: "Job ID",
content: actorDetail.jobId
Expand Down
1 change: 1 addition & 0 deletions dashboard/client/src/type/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export type Actor = {
[key: string]: number;
};
exitDetail: string;
reprName: string;
};

export type ActorDetail = {
Expand Down
2 changes: 2 additions & 0 deletions dashboard/modules/actor/actor_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def actor_table_data_to_dict(message):
"className",
"startTime",
"endTime",
"reprName",
}
light_message = {k: v for (k, v) in orig_message.items() if k in fields}
light_message["actorClass"] = orig_message["className"]
Expand Down Expand Up @@ -141,6 +142,7 @@ async def _update_actors(self):
"exitDetail",
"startTime",
"endTime",
"reprName",
)

def process_actor_data_from_pubsub(actor_id, actor_table_data):
Expand Down
4 changes: 4 additions & 0 deletions dashboard/modules/actor/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def get_pid(self):

return os.getpid()

def __repr__(self) -> str:
return "Foo1"

@ray.remote(num_cpus=0, resources={"infeasible_actor": 1})
class InfeasibleActor:
pass
Expand Down Expand Up @@ -76,6 +79,7 @@ class InfeasibleActor:
assert actor_response["requiredResources"] == {}
assert actor_response["endTime"] == 0
assert actor_response["exitDetail"] == "-"
assert actor_response["reprName"] == "Foo1"
for a in actors.values():
# "exitDetail always exits from the response"
assert "exitDetail" in a
Expand Down
13 changes: 12 additions & 1 deletion dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,10 +622,21 @@ async def summarize_tasks(self, option: SummaryApiOptions) -> SummaryApiResponse
detail=summary_by == "lineage",
)
)

if summary_by == "func_name":
summary_results = TaskSummaries.to_summary_by_func_name(tasks=result.result)
else:
summary_results = TaskSummaries.to_summary_by_lineage(tasks=result.result)
# We will need the actors info for actor tasks.
actors = await self.list_actors(
option=ListApiOptions(
timeout=option.timeout,
limit=RAY_MAX_LIMIT_FROM_API_SERVER,
detail=True,
)
)
summary_results = TaskSummaries.to_summary_by_lineage(
tasks=result.result, actors=actors.result
)
summary = StateSummary(node_id_to_summary={"cluster": summary_results})
warnings = result.warnings
if (
Expand Down
10 changes: 9 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -929,12 +929,17 @@ cdef void execute_task(
if (hasattr(actor_class, "__ray_actor_class__") and
(actor_class.__ray_actor_class__.__repr__
!= object.__repr__)):
actor_repr = repr(actor)
actor_magic_token = "{}{}\n".format(
ray_constants.LOG_PREFIX_ACTOR_NAME, repr(actor))
ray_constants.LOG_PREFIX_ACTOR_NAME, actor_repr)
# Flush on both stdout and stderr.
print(actor_magic_token, end="")
print(actor_magic_token, file=sys.stderr, end="")

# Sets the actor repr name for the actor so other components
# like GCS has such info.
core_worker.set_actor_repr_name(actor_repr)

if (returns[0].size() > 0 and
not inspect.isgenerator(outputs) and
len(outputs) != int(returns[0].size())):
Expand Down Expand Up @@ -1627,6 +1632,9 @@ cdef class CoreWorker:
def set_actor_title(self, title):
CCoreWorkerProcess.GetCoreWorker().SetActorTitle(title)

def set_actor_repr_name(self, repr_name):
CCoreWorkerProcess.GetCoreWorker().SetActorReprName(repr_name)

def get_plasma_event_handler(self):
return self.plasma_event_handler

Expand Down
26 changes: 23 additions & 3 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ class ActorState(StateSchema):
is_detached: bool = state_column(filterable=False, detail=True)
#: The placement group id that's associated with this actor.
placement_group_id: str = state_column(detail=True, filterable=True)
#: Actor's repr name if a customized __repr__ method exists, else empty string.
repr_name: str = state_column(detail=True, filterable=True)


@dataclass(init=True)
Expand Down Expand Up @@ -841,7 +843,9 @@ def to_summary_by_func_name(cls, *, tasks: List[Dict]) -> "TaskSummaries":
)

@classmethod
def to_summary_by_lineage(cls, *, tasks: List[Dict]) -> "TaskSummaries":
def to_summary_by_lineage(
cls, *, tasks: List[Dict], actors: List[Dict]
) -> "TaskSummaries":
"""
This summarizes tasks by lineage.
i.e. A task will be grouped with another task if they have the
Expand Down Expand Up @@ -881,6 +885,8 @@ def to_summary_by_lineage(cls, *, tasks: List[Dict]) -> "TaskSummaries":
if type_enum == TaskType.ACTOR_CREATION_TASK:
actor_creation_task_id_for_actor_id[task["actor_id"]] = task["task_id"]

actor_dict = {actor["actor_id"]: actor for actor in actors}

def get_or_create_task_group(task_id: str) -> Optional[NestedTaskSummary]:
"""
Gets an already created task_group
Expand Down Expand Up @@ -952,6 +958,7 @@ def get_or_create_actor_task_group(
Returns None if there is missing data about the actor or one of its parents.
"""
key = f"actor:{actor_id}"
actor = actor_dict.get(actor_id)
if key not in task_group_by_id:
creation_task_id = actor_creation_task_id_for_actor_id.get(actor_id)
creation_task = tasks_by_id.get(creation_task_id)
Expand All @@ -962,8 +969,21 @@ def get_or_create_actor_task_group(
# tree at that node.
return None

# TODO(aguo): Get actor name from actors state-api.
[actor_name, *rest] = creation_task["func_or_class_name"].split(".")
# TODO(rickyx)
# We are using repr name for grouping actors if exists,
# else use class name. We should be using some group_name in the future.
if actor is None:
logger.debug(
f"We are missing actor info for actor {actor_id}, "
f"even though creation task exists: {creation_task}"
)
[actor_name, *rest] = creation_task["func_or_class_name"].split(".")
else:
actor_name = (
actor["repr_name"]
if actor["repr_name"]
else actor["class_name"]
)

task_group_by_id[key] = NestedTaskSummary(
name=actor_name,
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_bool ShouldCaptureChildTasksInPlacementGroup()
const CActorID &GetActorId()
void SetActorTitle(const c_string &title)
void SetActorReprName(const c_string &repr_name)
void SetWebuiDisplay(const c_string &key, const c_string &message)
CTaskID GetCallerId()
const ResourceMappingType &GetResourceIDs() const
Expand Down
85 changes: 84 additions & 1 deletion python/ray/tests/test_state_api_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
import pytest

from ray._private.profiling import chrome_tracing_dump
from ray.experimental.state.api import list_tasks, list_actors, list_workers, list_nodes
from ray.experimental.state.api import (
get_actor,
list_tasks,
list_actors,
list_workers,
list_nodes,
)
from ray._private.test_utils import wait_for_condition


Expand Down Expand Up @@ -169,6 +175,83 @@ def verify():
wait_for_condition(verify, timeout=10)


def test_actor_repr_name(shutdown_only):
def _verify_repr_name(id, name):
actor = get_actor(id=id)
assert actor is not None
assert actor["repr_name"] == name
return True

# Assert simple actor repr name
@ray.remote
class ReprActor:
def __init__(self, x) -> None:
self.x = x

def __repr__(self) -> str:
return self.x

def ready(self):
pass

a = ReprActor.remote(x="repr-name-a")
b = ReprActor.remote(x="repr-name-b")

wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name-a")
wait_for_condition(_verify_repr_name, id=b._actor_id.hex(), name="repr-name-b")

# Assert when no __repr__ defined. repr_name should be empty
@ray.remote
class Actor:
pass

a = Actor.remote()
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="")

# Assert special actors (async actor, threaded actor, detached actor, named actor)
@ray.remote
class AsyncActor:
def __init__(self, x) -> None:
self.x = x

def __repr__(self) -> str:
return self.x

async def ready(self):
pass

a = AsyncActor.remote(x="async-x")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="async-x")

a = ReprActor.options(max_concurrency=3).remote(x="x")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="x")

a = ReprActor.options(name="named-actor").remote(x="repr-name")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name")

a = ReprActor.options(name="detached-actor", lifetime="detached").remote(
x="repr-name"
)
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="repr-name")
ray.kill(a)

# Assert nested actor class.
class OutClass:
@ray.remote
class InnerActor:
def __init__(self, name) -> None:
self.name = name

def __repr__(self) -> str:
return self.name

def get_actor(self, name):
return OutClass.InnerActor.remote(name=name)

a = OutClass().get_actor(name="inner")
wait_for_condition(_verify_repr_name, id=a._actor_id.hex(), name="inner")


if __name__ == "__main__":
import sys

Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_state_api_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def grab_tasks_from_task_group(

random.shuffle(tasks)

summary = TaskSummaries.to_summary_by_lineage(tasks=tasks)
summary = TaskSummaries.to_summary_by_lineage(tasks=tasks, actors=[])

assert summary.total_tasks == 20
assert summary.total_actor_tasks == 110
Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3745,6 +3745,11 @@ void CoreWorker::SetActorTitle(const std::string &title) {
actor_title_ = title;
}

void CoreWorker::SetActorReprName(const std::string &repr_name) {
RAY_CHECK(direct_task_receiver_ != nullptr);
direct_task_receiver_->SetActorReprName(repr_name);
}

rpc::JobConfig CoreWorker::GetJobConfig() const {
return worker_context_.GetCurrentJobConfig();
}
Expand Down
10 changes: 10 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {

void SetActorTitle(const std::string &title);

/// Sets the actor's repr name.
///
/// This is set explicitly rather than included as part of actor creation task spec
/// because it's only available after running the creation task as it might depend on
/// fields to be be initialized during actor creation task. The repr name will be
/// included as part of actor creation task reply (PushTaskReply) to GCS.
///
/// \param repr_name Actor repr name.
void SetActorReprName(const std::string &repr_name);

void SetCallerCreationTimestamp();

/// Increase the reference count for this object ID.
Expand Down
11 changes: 10 additions & 1 deletion src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,13 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
<< ", actor_id: " << task_spec.ActorCreationId()
<< ", status: " << status;
} else {
// Set the actor repr name if it's customized by the actor.
if (!actor_repr_name_.empty()) {
reply->set_actor_repr_name(actor_repr_name_);
}
RAY_LOG(INFO) << "Actor creation task finished, task_id: " << task_spec.TaskId()
<< ", actor_id: " << task_spec.ActorCreationId();
<< ", actor_id: " << task_spec.ActorCreationId()
<< ", actor_repr_name: " << actor_repr_name_;
}
}
}
Expand Down Expand Up @@ -311,5 +316,9 @@ void CoreWorkerDirectTaskReceiver::Stop() {
}
}

void CoreWorkerDirectTaskReceiver::SetActorReprName(const std::string &repr_name) {
actor_repr_name_ = repr_name;
}

} // namespace core
} // namespace ray
Loading

0 comments on commit 334ee66

Please sign in to comment.