Skip to content

Commit

Permalink
[Core] Add get_worker_id() to runtime context (ray-project#35967)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao authored Jun 1, 2023
1 parent a3cfd8b commit f620ad2
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 10 deletions.
6 changes: 1 addition & 5 deletions python/ray/_private/ray_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
RAY_DEDUP_LOGS_ALLOW_REGEX,
RAY_DEDUP_LOGS_SKIP_REGEX,
)
from ray._private.utils import binary_to_hex
from ray.util.debug import log_once


Expand Down Expand Up @@ -212,10 +211,7 @@ def get_worker_log_file_name(worker_type, job_id=None):
# Make sure these values are set already.
assert ray._private.worker._global_node is not None
assert ray._private.worker.global_worker is not None
filename = (
f"{worker_name}-"
f"{binary_to_hex(ray._private.worker.global_worker.worker_id)}-"
)
filename = f"{worker_name}-{ray.get_runtime_context().get_worker_id()}-"
if job_id:
filename += f"{job_id}-"
filename += f"{os.getpid()}"
Expand Down
11 changes: 11 additions & 0 deletions python/ray/runtime_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ def get_node_id(self) -> str:
node_id = self.worker.current_node_id
return node_id.hex()

def get_worker_id(self) -> str:
"""Get current worker ID for this worker or driver process.
Returns:
A worker id in hex format for this worker or driver process.
"""
assert (
ray.is_initialized()
), "Worker ID is not available because Ray has not been initialized."
return self.worker.worker_id.hex()

@property
@Deprecated(message="Use get_task_id() instead", warning=True)
def task_id(self):
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ async def ready(self):
# of cross-language scenarios. Java can't deserialize a Python tuple.
return json.dumps(
[
ray._private.worker.global_worker.worker_id.hex(),
ray.get_runtime_context().get_worker_id(),
get_component_logger_file_path(),
]
)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ async def is_allocated(self) -> str:
return (
os.getpid(),
ray.get_runtime_context().get_actor_id(),
ray._private.worker.global_worker.worker_id.hex(),
ray.get_runtime_context().get_worker_id(),
ray.get_runtime_context().get_node_id(),
ray.util.get_node_ip_address(),
get_component_logger_file_path(),
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def __init__(
node_ip=ray.util.get_node_ip_address(),
actor_id=ray.get_runtime_context().get_actor_id(),
actor_name=self.controller_name,
worker_id=ray._private.worker.global_worker.worker_id.hex(),
worker_id=ray.get_runtime_context().get_worker_id(),
log_file_path=get_component_logger_file_path(),
)

Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_actor_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ def __init__(self):
pass

def get_id(self):
return ray._private.worker.global_worker.worker_id
return ray.get_runtime_context().get_worker_id()

a = Actor.remote()
actor_id = ray.get(a.get_id.remote())

@ray.remote
def f():
return ray._private.worker.global_worker.worker_id
return ray.get_runtime_context().get_worker_id()

resulting_ids = ray.get([f.remote() for _ in range(100)])
assert actor_id not in resulting_ids
Expand Down

0 comments on commit f620ad2

Please sign in to comment.