Skip to content

Commit

Permalink
[core] Add opt-in flag for Windows and OSX clusters, update `ray star…
Browse files Browse the repository at this point in the history
…t` output to match docs (ray-project#31166)

This PR cleans up a few usability issues around Ray clusters:

    Makes some cleanups to the ray start log output to match the new documentation on Ray clusters. Mainly, de-emphasize Ray Client and recommend jobs instead.
    Add an opt-in flag for enabling multi-node clusters for OSX and Windows. Previously, it was possible to start a multi-node cluster, but then any Ray programs would fail mysteriously after connecting to the cluster. Now, it will warn the user with an error message if the opt-in flag is not set.
    Document multi-node support for OSX and Windows.

Signed-off-by: Stephanie Wang <[email protected]>
Co-authored-by: Archit Kulkarni <[email protected]>
  • Loading branch information
stephanie-wang and architkulkarni authored Feb 9, 2023
1 parent d653f73 commit 90f8511
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 88 deletions.
3 changes: 3 additions & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ test --test_env=PYENV_VERSION
test --test_env=PYENV_SHELL
# Do not send usage stats to the server for tests
test --test_env=RAY_USAGE_STATS_REPORT_URL="http://127.0.0.1:8000"
# Enable cluster mode for OSX and Windows. By default, Ray
# will not allow multinode OSX and Windows clusters.
test --test_env=RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER="1"
# This is needed for some core tests to run correctly
build:windows --enable_runfiles
# TODO(mehrdadn): Revert the "-\\.(asm|S)$" exclusion when this Bazel bug
Expand Down
1 change: 1 addition & 0 deletions .buildkite/pipeline.windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ steps:
- conda init
- . ./ci/ci.sh init
- ./ci/ci.sh build
- export RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER="1"
- if [ "${BUILDKITE_PARALLEL_JOB}" = "0" ]; then ./ci/ci.sh test_core; fi
# The next command will be sharded into $parallelism shards.
- ./ci/ci.sh test_python
Expand Down
6 changes: 6 additions & 0 deletions doc/source/cluster/getting-started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ Ray provides native cluster deployment support on the following technology stack
Advanced users may want to :ref:`deploy Ray manually <on-prem>`
or onto :ref:`platforms not listed here <ref-cluster-setup>`.

.. note::

Multi-node Ray clusters are only supported on Linux. At your own risk, you
may deploy Windows and OSX clusters by setting the environment variable
``RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER=1`` during deployment.

What's next?
------------

Expand Down
6 changes: 4 additions & 2 deletions doc/source/ray-overview/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ You can install and use Ray C++ API as follows.
M1 Mac (Apple Silicon) Support
------------------------------

Ray has experimental support for machines running Apple Silicon (such as M1 macs). To get started:
Ray has experimental support for machines running Apple Silicon (such as M1 macs).
Multi-node clusters are untested. To get started with local Ray development:

#. Install `miniforge <https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-MacOSX-arm64.sh>`_.

Expand Down Expand Up @@ -236,7 +237,8 @@ Ray has experimental support for machines running Apple Silicon (such as M1 macs
Windows Support
---------------

Windows support is currently in beta. Please submit any issues you encounter on
Windows support is currently in beta, and multi-node Ray clusters are untested.
Please submit any issues you encounter on
`GitHub <https://github.com/ray-project/ray/issues/>`_.

Installing Ray on Arch Linux
Expand Down
19 changes: 18 additions & 1 deletion python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
import os
import sys

logger = logging.getLogger(__name__)

Expand All @@ -23,10 +24,18 @@ def env_integer(key, default):

def env_bool(key, default):
if key in os.environ:
return True if os.environ[key].lower() == "true" else False
return (
True
if os.environ[key].lower() == "true" or os.environ[key] == "1"
else False
)
return default


def env_set_by_user(key):
return key in os.environ


# Whether event logging to driver is enabled. Set to 0 to disable.
AUTOSCALER_EVENTS = env_integer("RAY_SCHEDULER_EVENTS", 1)

Expand Down Expand Up @@ -370,3 +379,11 @@ def gcs_actor_scheduling_enabled():
# Ray wheels into the conda environment, so the Ray wheels for these Python
# versions must be available online.
RUNTIME_ENV_CONDA_PY_VERSIONS = [(3, 6), (3, 7), (3, 8), (3, 9), (3, 10)]

# Whether to enable Ray clusters (in addition to local Ray).
# Ray clusters are not explicitly supported for Windows and OSX.
ENABLE_RAY_CLUSTERS_ENV_VAR = "RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER"
ENABLE_RAY_CLUSTER = env_bool(
ENABLE_RAY_CLUSTERS_ENV_VAR,
not (sys.platform == "darwin" or sys.platform == "win32"),
)
13 changes: 8 additions & 5 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,11 @@ def resolve_ip_for_localhost(address: str):
if not address:
raise ValueError(f"Malformed address: {address}")
address_parts = address.split(":")
# Make sure localhost isn't resolved to the loopback ip
if address_parts[0] == "127.0.0.1" or address_parts[0] == "localhost":
# Clusters are disabled by default for OSX and Windows.
if not ray_constants.ENABLE_RAY_CLUSTER:
return address
# Make sure localhost isn't resolved to the loopback ip
ip_address = get_node_ip_address()
return ":".join([ip_address] + address_parts[1:])
else:
Expand Down Expand Up @@ -627,10 +630,10 @@ def node_ip_address_from_perspective(address: str):
def get_node_ip_address(address="8.8.8.8:53"):
if ray._private.worker._global_node is not None:
return ray._private.worker._global_node.node_ip_address
if sys.platform == "darwin" or sys.platform == "win32":
# Due to the mac osx/windows firewall,
# we use loopback ip as the ip address
# to prevent security popups.
if not ray_constants.ENABLE_RAY_CLUSTER:
# Use loopback IP as the local IP address to prevent bothersome
# firewall popups on OSX and Windows.
# https://github.com/ray-project/ray/issues/18730.
return "127.0.0.1"
return node_ip_address_from_perspective(address)

Expand Down
17 changes: 13 additions & 4 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1353,15 +1353,24 @@ def init(
job_config = ray.job_config.JobConfig()
job_config.set_runtime_env(runtime_env)

if _node_ip_address is not None:
node_ip_address = services.resolve_ip_for_localhost(_node_ip_address)
raylet_ip_address = node_ip_address

redis_address, gcs_address = None, None
bootstrap_address = services.canonicalize_bootstrap_address(address, _temp_dir)
if bootstrap_address is not None:
gcs_address = bootstrap_address
logger.info("Connecting to existing Ray cluster at address: %s...", gcs_address)
if not ray_constants.env_set_by_user(ray_constants.ENABLE_RAY_CLUSTERS_ENV_VAR):
# If the cluster already exists, then assume it's safe to connect
# to the cluster even if we're on Windows or OSX (unless the user
# explicitly set the flag).
ray_constants.ENABLE_RAY_CLUSTER = True

# NOTE(swang): We must set the node IP address *after* we determine whether
# this is an existing cluster or not. For Windows and OSX, the resolved IP
# is localhost for new clusters and the usual public IP for existing
# clusters.
if _node_ip_address is not None:
node_ip_address = services.resolve_ip_for_localhost(_node_ip_address)
raylet_ip_address = node_ip_address

if local_mode:
driver_mode = LOCAL_MODE
Expand Down
159 changes: 95 additions & 64 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,7 @@ def start(
cf.bold("--port"),
)

# Whether the original arguments include node_ip_address.
include_node_ip_address = False
if node_ip_address is not None:
include_node_ip_address = True
node_ip_address = services.resolve_ip_for_localhost(node_ip_address)

resources = parse_resources_json(resources, cli_logger, cf)
Expand Down Expand Up @@ -749,87 +746,121 @@ def start(
cli_logger.success("-" * len(startup_msg))
cli_logger.newline()
with cli_logger.group("Next steps"):
cli_logger.print("To connect to this Ray runtime from another node, run")
# NOTE(kfstorm): Java driver rely on this line to get the address
# of the cluster. Please be careful when updating this line.
cli_logger.print(
cf.bold(" ray start --address='{}'"),
bootstrap_address,
)
dashboard_url = node.address_info["webui_url"]
if bootstrap_address.startswith("127.0.0.1:"):
if ray_constants.ENABLE_RAY_CLUSTER:
cli_logger.print(
"This Ray runtime only accepts connections from local host."
)
cli_logger.print(
"To accept connections from remote hosts, "
"specify a public ip when starting"
)
cli_logger.print(
"the head node: ray start --head --node-ip-address=<public-ip>."
)
else:
cli_logger.print(
"Multi-node Ray clusters are not supported on OSX and Windows."
)
cli_logger.print(
"If you would like to proceed anyway, restart Ray with:"
)
cli_logger.print(
cf.bold(" ray stop"),
)
cli_logger.print(
cf.bold(" {}=true ray start"),
ray_constants.ENABLE_RAY_CLUSTERS_ENV_VAR,
)
cli_logger.newline()
else:
cli_logger.print("To add another node to this Ray cluster, run")
# NOTE(kfstorm): Java driver rely on this line to get the address
# of the cluster. Please be careful when updating this line.
cli_logger.print(
"This Ray runtime only accepts connections from local host."
cf.bold(" ray start --address='{}'"),
bootstrap_address,
)
cli_logger.newline()
if ray_constants.ENABLE_RAY_CLUSTER:
cli_logger.print(
"To accept connections from remote hosts, "
"specify a public ip when starting"
"To connect to this Ray cluster, run `ray.init()` as usual:"
)
with cli_logger.indented():
cli_logger.print("{} ray", cf.magenta("import"))
cli_logger.print(
"ray{}init()",
cf.magenta("."),
)
cli_logger.newline()
cli_logger.print(
"To connect to this Ray instance from outside of "
"the cluster, for example "
)
cli_logger.print(
"the head node: ray start --head --node-ip-address=<public-ip>."
"when connecting to a remote cluster from your laptop, "
"make sure the"
)
cli_logger.newline()
cli_logger.print("Alternatively, use the following Python code:")
with cli_logger.indented():
cli_logger.print("{} ray", cf.magenta("import"))
# Note: In the case of joining an existing cluster using
# `address="auto"`, the _node_ip_address parameter is
# unnecessary.
cli_logger.print(
"ray{}init(address{}{}{})",
cf.magenta("."),
cf.magenta("="),
cf.yellow("'auto'"),
", _node_ip_address{}{}".format(
cf.magenta("="), cf.yellow("'" + node_ip_address + "'")
"dashboard {}is accessible and use the Ray Jobs API. For example:",
f"({dashboard_url}) " if dashboard_url else "",
)
if dashboard_url:
cli_logger.print(
cf.bold(
" RAY_ADDRESS='http://<dashboard URL>:{}' ray job submit "
"--working-dir . "
"-- python my_script.py"
),
ray_params.dashboard_port,
)
if include_node_ip_address
else "",
cli_logger.newline()
cli_logger.print(
"See https://docs.ray.io/en/latest/cluster/running-applications"
"/job-submission/index.html"
)
cli_logger.newline()
cli_logger.print(
"To connect to this Ray runtime from outside of "
"the cluster, for example to"
)
cli_logger.print(
"connect to a remote cluster from your laptop "
"directly, use the following"
)
cli_logger.print("Python code:")
with cli_logger.indented():
cli_logger.print("{} ray", cf.magenta("import"))
cli_logger.print(
"ray{}init(address{}{})",
cf.magenta("."),
cf.magenta("="),
cf.yellow(
"'ray://<head_node_ip_address>:" f"{ray_client_server_port}'"
),
"for more information on connecting to the Ray cluster from "
"a remote client."
)
cli_logger.newline()
cli_logger.print("To see the status of the cluster, use")
cli_logger.print(" {}".format(cf.bold("ray status")))
dashboard_url = node.address_info["webui_url"]
if dashboard_url:
cli_logger.print("To monitor and debug Ray, view the dashboard at ")
cli_logger.newline()
cli_logger.print("To see the status of the cluster, use")
cli_logger.print(" {}".format(cf.bold("ray status")))
if dashboard_url:
cli_logger.print("To monitor and debug Ray, view the dashboard at ")
cli_logger.print(
" {}".format(
cf.bold(dashboard_url),
)
)
cli_logger.newline()
cli_logger.print(
" {}".format(
cf.bold(dashboard_url),
cf.underlined(
"If connection fails, check your "
"firewall settings and "
"network configuration."
)
)
cli_logger.newline()
cli_logger.print(
cf.underlined(
"If connection fails, check your "
"firewall settings and "
"network configuration."
)
)
cli_logger.newline()
cli_logger.newline()
cli_logger.print("To terminate the Ray runtime, run")
cli_logger.print(cf.bold(" ray stop"))
ray_params.gcs_address = bootstrap_address
else:
# Start worker node.
if not ray_constants.ENABLE_RAY_CLUSTER:
cli_logger.abort(
"Multi-node Ray clusters are not supported on Windows and OSX. "
"Restart the Ray cluster with the environment variable `{}=1` "
"to proceed anyway.",
cf.bold(ray_constants.ENABLE_RAY_CLUSTERS_ENV_VAR),
)
raise Exception(
"Multi-node Ray clusters are not supported on Windows and OSX. "
"Restart the Ray cluster with the environment variable "
f"`{ray_constants.ENABLE_RAY_CLUSTERS_ENV_VAR}=1` to proceed "
"anyway.",
)

# Ensure `--address` flag is specified.
if address is None:
Expand Down
4 changes: 4 additions & 0 deletions python/ray/tests/test_basic_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import sys
import time
from pathlib import Path
import os

import numpy as np
import pytest
from unittest import mock

import ray
import ray.cluster_utils
Expand Down Expand Up @@ -163,6 +165,8 @@ def g():
sys.platform not in ["win32", "darwin"],
reason="Only listen on localhost by default on mac and windows.",
)
@mock.patch("ray._private.services.ray_constants.ENABLE_RAY_CLUSTER", False)
@mock.patch.dict(os.environ, {"RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER": "0"})
@pytest.mark.parametrize("start_ray", ["ray_start_regular", "call_ray_start"])
def test_listen_on_localhost(start_ray, request):
"""All ray processes should listen on localhost by default
Expand Down
Loading

0 comments on commit 90f8511

Please sign in to comment.