Skip to content

Commit

Permalink
[Core] Write ray address even if ray node is started with --block (ra…
Browse files Browse the repository at this point in the history
…y-project#32961)

With ray-project#26678, when a Ray cluster is started, its address is written to /tmp/ray/ray_current_cluster so that ray.init() can find the existing cluster and connect to it by default. However, if a node is started with `ray start --block`, the file is not created, so ray.init() creates a new cluster instead of connecting to the existing one, which is unexpected.

Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao authored Mar 2, 2023
1 parent bfba96c commit 4d0ce8d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
6 changes: 3 additions & 3 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,9 @@ def start(
cli_logger.print(cf.bold(" ray stop"))
cli_logger.flush()

assert ray_params.gcs_address is not None
ray._private.utils.write_ray_address(ray_params.gcs_address, temp_dir)

if block:
cli_logger.newline()
with cli_logger.group(cf.bold("--block")):
Expand Down Expand Up @@ -987,9 +990,6 @@ def start(
os._exit(1)
# not-reachable

assert ray_params.gcs_address is not None
ray._private.utils.write_ray_address(ray_params.gcs_address, temp_dir)


@cli.command()
@click.option(
Expand Down
34 changes: 34 additions & 0 deletions python/ray/tests/test_ray_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ray.util.client.common import ClientObjectRef
from ray.util.client.ray_client_helpers import ray_start_client_server
from ray.util.client.worker import Worker
from ray._private.test_utils import wait_for_condition


@pytest.mark.skipif(
Expand Down Expand Up @@ -80,6 +81,39 @@ def test_ray_init_existing_instance(call_ray_start, address):
subprocess.check_output("ray stop --force", shell=True)


@pytest.mark.skipif(
    os.environ.get("CI") and sys.platform == "win32",
    reason="Flaky when run on windows CI",
)
def test_ray_init_existing_instance_via_blocked_ray_start():
    """Check that a bare ``ray.init()`` attaches to a ``ray start --block`` head.

    A head node is launched in a subprocess with ``--block`` and an unusual
    CPU count (1999). The test passes once an argument-less ``ray.init()``
    reports exactly that CPU count, i.e. it joined the already-running
    cluster rather than starting a fresh local one.
    """
    blocked = subprocess.Popen(
        ["ray", "start", "--head", "--block", "--num-cpus", "1999"]
    )

    def _connect_to_existing_instance():
        # One attempt per call; repetition is left to wait_for_condition
        # below (it is given a timeout and a retry interval).
        try:
            # Make sure ray.init can connect to the existing cluster.
            ray.init()
            return ray.cluster_resources().get("CPU", 0) == 1999
        except Exception:
            # Any failure here is reported as "condition not met yet"
            # instead of aborting the test outright.
            return False
        finally:
            # Always disconnect so every attempt starts from a clean state.
            ray.shutdown()

    try:
        wait_for_condition(
            _connect_to_existing_instance, timeout=30, retry_interval_ms=1000
        )
    finally:
        # Stop the blocking CLI process first, then force-stop whatever Ray
        # processes remain on the machine.
        blocked.terminate()
        blocked.wait()
        subprocess.check_output("ray stop --force", shell=True)


@pytest.mark.skipif(
os.environ.get("CI") and sys.platform == "win32",
reason="Flaky when run on windows CI",
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/raylet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ray::Status Raylet::RegisterGcs() {
<< ":" << self_node_info_.node_manager_port()
<< " object_manager address: " << self_node_info_.node_manager_address()
<< ":" << self_node_info_.object_manager_port()
<< " hostname: " << self_node_info_.node_manager_address();
<< " hostname: " << self_node_info_.node_manager_hostname();
RAY_CHECK_OK(node_manager_.RegisterGcs());
};

Expand Down

0 comments on commit 4d0ce8d

Please sign in to comment.