Skip to content

Commit

Permalink
[Core][runtime-context] replace deprecated API usage ray-project#31198
Browse files Browse the repository at this point in the history
get_runtime_context().(actor|job|node|...)_id is deprecated, using the new API instead.
  • Loading branch information
scv119 authored Jan 5, 2023
1 parent a96d5de commit 5a6b234
Show file tree
Hide file tree
Showing 19 changed files with 30 additions and 30 deletions.
2 changes: 1 addition & 1 deletion dashboard/modules/job/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ def _get_current_node_resource_key(self) -> str:
It can be used for actor placement.
"""
current_node_id = ray.get_runtime_context().node_id.hex()
current_node_id = ray.get_runtime_context().get_node_id()
for node in ray.nodes():
if node["NodeID"] == current_node_id:
# Found the node.
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class BlockExecStats:
def __init__(self):
self.wall_time_s: Optional[float] = None
self.cpu_time_s: Optional[float] = None
self.node_id = ray.runtime_context.get_runtime_context().node_id.hex()
self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
# Max memory usage. May be an overestimate since we do not
# differentiate from previous tasks on the same worker.
self.max_rss_bytes: int = 0
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/random_access_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def multiget(self, block_indices, keys):
return result

def ping(self):
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

def stats(self) -> dict:
return {
Expand Down
6 changes: 3 additions & 3 deletions python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5043,7 +5043,7 @@ def check_dataset_is_local(ds):
locations = []
for block in blocks:
locations.extend(location_data[block]["node_ids"])
assert set(locations) == {ray.get_runtime_context().node_id.hex()}
assert set(locations) == {ray.get_runtime_context().get_node_id()}

local_path = "local://" + data_path
# Plain read.
Expand Down Expand Up @@ -5095,7 +5095,7 @@ def test_random_shuffle_spread(ray_start_cluster, use_push_based_shuffle):

@ray.remote
def get_node_id():
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

node1_id = ray.get(get_node_id.options(resources={"bar:1": 1}).remote())
node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote())
Expand Down Expand Up @@ -5129,7 +5129,7 @@ def test_parquet_read_spread(ray_start_cluster, tmp_path):

@ray.remote
def get_node_id():
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

node1_id = ray.get(get_node_id.options(resources={"bar:1": 1}).remote())
node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote())
Expand Down
4 changes: 2 additions & 2 deletions python/ray/data/tests/test_dataset_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def do_write(

@ray.remote
def write(b):
node_id = ray.get_runtime_context().node_id.hex()
node_id = ray.get_runtime_context().get_node_id()
return ray.get(data_sink.write.remote(node_id, b))

tasks = []
Expand Down Expand Up @@ -293,7 +293,7 @@ def test_write_datasource_ray_remote_args(ray_start_cluster):

@ray.remote
def get_node_id():
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

bar_node_id = ray.get(get_node_id.options(resources={"bar": 1}).remote())

Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_dataset_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def test_read_text_remote_args(ray_start_cluster, tmp_path):

@ray.remote
def get_node_id():
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

bar_node_id = ray.get(get_node_id.options(resources={"bar": 1}).remote())

Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def serve_start(

# Used for scheduling things to the head node explicitly.
# Assumes that `serve.start` runs on the head node.
head_node_id = ray.get_runtime_context().node_id.hex()
head_node_id = ray.get_runtime_context().get_node_id()
controller_actor_options = {
"num_cpus": 1 if dedicated_cpu else 0,
"name": controller_name,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def get_current_node_resource_key() -> str:
It can be used for actor placement.
"""
current_node_id = ray.get_runtime_context().node_id.hex()
current_node_id = ray.get_runtime_context().get_node_id()
for node in ray.nodes():
if node["NodeID"] == current_node_id:
# Found the node.
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ def __init__(
self._controller = None
if self._controller is None:
# Used for scheduling things to the head node explicitly.
head_node_id = ray.get_runtime_context().node_id.hex()
head_node_id = ray.get_runtime_context().get_node_id()
http_config = HTTPOptions()
http_config.port = http_proxy_port
self._controller = ServeController.options(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def test_replica_spread(ray_cluster):

@serve.deployment(num_replicas=2)
def get_node_id():
return os.getpid(), ray.get_runtime_context().node_id.hex()
return os.getpid(), ray.get_runtime_context().get_node_id()

h = serve.run(get_node_id.bind())

Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_actor_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ def test_actor_timestamps(ray_start_regular):
@ray.remote
class Foo:
def get_id(self):
return ray.get_runtime_context().actor_id.hex()
return ray.get_runtime_context().get_actor_id()

def kill_self(self):
sys.exit(1)
Expand Down
4 changes: 2 additions & 2 deletions python/ray/tests/test_advanced_9.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def ready(self):
a = A.options(lifetime="detached", name="A").remote()
assert ray.get(a.ready.remote()) == {val}
assert ray.get_runtime_context().job_id.hex() == '01000000'
assert ray.get_runtime_context().get_job_id() == '01000000'
"""
run_string_as_driver(script.format(address=call_ray_start, val=1))
run_string_as_driver(script.format(address=call_ray_start_2, val=2))
Expand All @@ -250,7 +250,7 @@ def ready(self):
ray.init("{address}", namespace="a")
a = ray.get_actor(name="A")
assert ray.get(a.ready.remote()) == {val}
assert ray.get_runtime_context().job_id.hex() == '02000000'
assert ray.get_runtime_context().get_job_id() == '02000000'
"""
run_string_as_driver(script.format(address=call_ray_start, val=1))
run_string_as_driver(script.format(address=call_ray_start_2, val=2))
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def print_after(_obj):
p = init_log_pubsub()
print_after.remote(print_msg.remote())
msgs = get_log_message(
p, num=3, timeout=1, job_id=ray.get_runtime_context().job_id.hex()
p, num=3, timeout=1, job_id=ray.get_runtime_context().get_job_id()
)

assert len(msgs) == 1, msgs
Expand Down
8 changes: 4 additions & 4 deletions python/ray/tests/test_scheduling_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):

@ray.remote
def get_node_id():
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

worker_node_ids = {
ray.get(get_node_id.options(resources={f"foo:{i}": 1}).remote())
Expand All @@ -467,12 +467,12 @@ def task1():
internal_kv._internal_kv_put("test_task1", "task1")
while internal_kv._internal_kv_exists("test_task1"):
time.sleep(0.1)
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

@ray.remote
def task2():
internal_kv._internal_kv_put("test_task2", "task2")
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

locations = []
locations.append(task1.remote())
Expand All @@ -494,7 +494,7 @@ def task2():
@ray.remote(num_cpus=1)
class Actor:
def ping(self):
return ray.get_runtime_context().node_id.hex()
return ray.get_runtime_context().get_node_id()

actors = []
locations = []
Expand Down
2 changes: 1 addition & 1 deletion python/ray/train/_internal/worker_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def construct_metadata() -> WorkerMetadata:
This function is expected to be run on the actor.
"""
node_id = ray.get_runtime_context().node_id.hex()
node_id = ray.get_runtime_context().get_node_id()
node_ip = ray.util.get_node_ip_address()
hostname = socket.gethostname()
gpu_ids = [str(gpu_id) for gpu_id in ray.get_gpu_ids()]
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tune/utils/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def _get_current_node_resource_key() -> str:
Returns:
(str) A string of the format node:<CURRENT-NODE-IP-ADDRESS>
"""
current_node_id = ray.get_runtime_context().node_id.hex()
current_node_id = ray.get_runtime_context().get_node_id()
for node in ray.nodes():
if node["NodeID"] == current_node_id:
# Found the node.
Expand Down
4 changes: 2 additions & 2 deletions python/ray/util/rpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def do_remote(self, arg):
# Tell the debug loop to connect to the next task.
data = json.dumps(
{
"job_id": ray.get_runtime_context().job_id.hex(),
"job_id": ray.get_runtime_context().get_job_id(),
}
)
_internal_kv_put(
Expand Down Expand Up @@ -259,7 +259,7 @@ def _connect_ray_pdb(
"lineno": parentframeinfo.lineno,
"traceback": "\n".join(traceback.format_exception(*sys.exc_info())),
"timestamp": time.time(),
"job_id": ray.get_runtime_context().job_id.hex(),
"job_id": ray.get_runtime_context().get_job_id(),
}
_internal_kv_put(
"RAY_PDB_{}".format(breakpoint_uuid),
Expand Down
6 changes: 3 additions & 3 deletions python/ray/workflow/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def _try_checkpoint_workflow(workflow_state) -> bool:
workflow_id, state, ignore_existing=False
)
)
job_id = ray.get_runtime_context().job_id.hex()
job_id = ray.get_runtime_context().get_job_id()
return workflow_manager.execute_workflow.remote(job_id, context)


Expand Down Expand Up @@ -284,7 +284,7 @@ def resume_async(workflow_id: str) -> ray.ObjectRef:
# ensures caller of 'run()' holds the reference to the workflow
# result. Otherwise if the actor removes the reference of the
# workflow output, the caller may fail to resolve the result.
job_id = ray.get_runtime_context().job_id.hex()
job_id = ray.get_runtime_context().get_job_id()

context = workflow_context.WorkflowTaskContext(workflow_id=workflow_id)
ray.get(workflow_manager.reconstruct_workflow.remote(job_id, context))
Expand Down Expand Up @@ -502,7 +502,7 @@ def resume_all(include_failed: bool = False) -> List[Tuple[str, ray.ObjectRef]]:
except Exception as e:
raise RuntimeError("Failed to get management actor") from e

job_id = ray.get_runtime_context().job_id.hex()
job_id = ray.get_runtime_context().get_job_id()
reconstructed_workflows = []
for wid, _ in all_failed:
context = workflow_context.WorkflowTaskContext(workflow_id=wid)
Expand Down
4 changes: 2 additions & 2 deletions python/ray/workflow/debug_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def execute_workflow_local(dag: DAGNode, workflow_id: str, *args, **kwargs):
"""Execute the workflow locally."""
from ray.workflow.workflow_state_from_dag import workflow_state_from_dag

job_id = ray.get_runtime_context().job_id.hex()
job_id = ray.get_runtime_context().get_job_id()
context = WorkflowTaskContext(workflow_id=workflow_id)
with workflow_task_context(context):
wf_store = get_workflow_storage()
Expand All @@ -29,7 +29,7 @@ def resume_workflow_local(workflow_id: str):
"""Resume the workflow locally."""
from ray.workflow.workflow_state_from_storage import workflow_state_from_storage

job_id = ray.get_runtime_context().job_id.hex()
job_id = ray.get_runtime_context().get_job_id()
context = WorkflowTaskContext(workflow_id=workflow_id)
with workflow_task_context(context):
wf_store = get_workflow_storage()
Expand Down

0 comments on commit 5a6b234

Please sign in to comment.