Skip to content

Commit

Permalink
[2/gcs] Bootstrap dashboard for gcs ha (ray-project#21179)
Browse files Browse the repository at this point in the history
This is part of the GCS HA project. This PR bootstraps the dashboard with the GCS address instead of the Redis address.

Co-authored-by: mwtian <[email protected]>
  • Loading branch information
fishbone and mwtian authored Dec 22, 2021
1 parent 1db0386 commit 09421a4
Show file tree
Hide file tree
Showing 18 changed files with 239 additions and 157 deletions.
5 changes: 4 additions & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,16 @@
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TORCH_VERSION=1.6 ./ci/travis/install-dependencies.sh
- ./dashboard/tests/run_ui_tests.sh
# Enable job related tests after ray.init can accept gcs address
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_env=RAY_gcs_grpc_based_pubsub=1
--test_env=RAY_bootstrap_with_gcs=1
--test_env=RAY_gcs_storage=memory -- //python/ray/dashboard/...
-//python/ray/dashboard:test_dashboard
-//python/ray/dashboard:test_actor
-//python/ray/dashboard:test_job_manager
-//python/ray/dashboard:test_cli_integration
-//python/ray/dashboard:test_http_job_server
-//python/ray/dashboard:test_job_submission
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-post_wheel_build
--test_env=RAY_gcs_grpc_based_pubsub=1
Expand Down
71 changes: 45 additions & 26 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import ray._private.utils
from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsPublisher
from ray._private.gcs_utils import GcsClient, \
get_gcs_address_from_redis
get_gcs_address_from_redis, use_gcs_for_bootstrap
from ray.core.generated import agent_manager_pb2
from ray.core.generated import agent_manager_pb2_grpc
from ray._private.ray_logging import setup_component_logger
Expand Down Expand Up @@ -53,6 +53,7 @@ def __init__(self,
node_ip_address,
redis_address,
dashboard_agent_port,
gcs_address,
redis_password=None,
temp_dir=None,
session_dir=None,
Expand All @@ -67,8 +68,15 @@ def __init__(self,
"""Initialize the DashboardAgent object."""
# Public attributes are accessible for all agent modules.
self.ip = node_ip_address
self.redis_address = dashboard_utils.address_tuple(redis_address)
self.redis_password = redis_password

if use_gcs_for_bootstrap():
assert gcs_address is not None
self.gcs_address = gcs_address
else:
self.redis_address = dashboard_utils.address_tuple(redis_address)
self.redis_password = redis_password
self.aioredis_client = None

self.temp_dir = temp_dir
self.session_dir = session_dir
self.runtime_env_dir = runtime_env_dir
Expand All @@ -94,7 +102,6 @@ def __init__(self,
self.server, f"{grpc_ip}:{self.dashboard_agent_port}")
logger.info("Dashboard agent grpc address: %s:%s", grpc_ip,
self.grpc_port)
self.aioredis_client = None
options = (("grpc.enable_http_proxy", 0), )
self.aiogrpc_raylet_channel = ray._private.utils.init_grpc_channel(
f"{self.ip}:{self.node_manager_port}", options, asynchronous=True)
Expand Down Expand Up @@ -135,17 +142,19 @@ async def _check_parent():
if sys.platform not in ["win32", "cygwin"]:
check_parent_task = create_task(_check_parent())

# Create an aioredis client for all modules.
try:
self.aioredis_client = await dashboard_utils.get_aioredis_client(
self.redis_address, self.redis_password,
dashboard_consts.CONNECT_REDIS_INTERNAL_SECONDS,
dashboard_consts.RETRY_REDIS_CONNECTION_TIMES)
except (socket.gaierror, ConnectionRefusedError):
logger.error(
"Dashboard agent exiting: "
"Failed to connect to redis at %s", self.redis_address)
sys.exit(-1)
if not use_gcs_for_bootstrap():
# Create an aioredis client for all modules.
try:
self.aioredis_client = \
await dashboard_utils.get_aioredis_client(
self.redis_address, self.redis_password,
dashboard_consts.CONNECT_REDIS_INTERNAL_SECONDS,
dashboard_consts.RETRY_REDIS_CONNECTION_TIMES)
except (socket.gaierror, ConnectionRefusedError):
logger.error(
"Dashboard agent exiting: "
"Failed to connect to redis at %s", self.redis_address)
sys.exit(-1)

# Create a http session for all modules.
# aiohttp<4.0.0 uses a 'loop' variable, aiohttp>=4.0.0 doesn't anymore
Expand All @@ -157,10 +166,13 @@ async def _check_parent():

# Start a grpc asyncio server.
await self.server.start()
# TODO: redis-removal bootstrap
gcs_address = await self.aioredis_client.get(
dashboard_consts.REDIS_KEY_GCS_SERVER_ADDRESS)
self.gcs_client = GcsClient(address=gcs_address.decode())

if not use_gcs_for_bootstrap():
gcs_address = await self.aioredis_client.get(
dashboard_consts.GCS_SERVER_ADDRESS)
self.gcs_client = GcsClient(address=gcs_address.decode())
else:
self.gcs_client = GcsClient(address=self.gcs_address)
modules = self._load_modules()

# Http server should be initialized after all modules loaded.
Expand Down Expand Up @@ -358,6 +370,7 @@ async def _check_parent():
args.node_ip_address,
args.redis_address,
args.dashboard_agent_port,
args.gcs_address,
redis_password=args.redis_password,
temp_dir=args.temp_dir,
session_dir=args.session_dir,
Expand Down Expand Up @@ -385,14 +398,20 @@ async def _check_parent():
# Agent has failed to start many times.
# Push an error to all drivers, so that users can know the
# impact of the issue.
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)
redis_client = None
gcs_publisher = None
if args.gcs_address:
gcs_publisher = GcsPublisher(args.gcs_address)
elif gcs_pubsub_enabled():
gcs_publisher = GcsPublisher(
address=get_gcs_address_from_redis(redis_client))
if gcs_pubsub_enabled():
if use_gcs_for_bootstrap():
gcs_publisher = GcsPublisher(args.gcs_address)
else:
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)
gcs_publisher = GcsPublisher(
address=get_gcs_address_from_redis(redis_client))
else:
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)

traceback_str = ray._private.utils.format_error_message(
traceback.format_exc())
message = (
Expand Down
4 changes: 2 additions & 2 deletions dashboard/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
CONNECT_REDIS_INTERNAL_SECONDS = 2
PURGE_DATA_INTERVAL_SECONDS = 60 * 10
ORGANIZE_DATA_INTERVAL_SECONDS = 2
REDIS_KEY_DASHBOARD_RPC = "dashboard_rpc"
REDIS_KEY_GCS_SERVER_ADDRESS = "GcsServerAddress"
DASHBOARD_RPC_ADDRESS = "dashboard_rpc"
GCS_SERVER_ADDRESS = "GcsServerAddress"
# GCS check alive
GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR = env_integer(
"GCS_CHECK_ALIVE_MAX_COUNT_OF_RPC_ERROR", 10)
Expand Down
37 changes: 28 additions & 9 deletions dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class Dashboard:
host(str): Host address of dashboard aiohttp server.
port(int): Port number of dashboard aiohttp server.
port_retries(int): The retry times to select a valid port.
redis_address(str): GCS address of a Ray cluster
gcs_address(str): GCS address of the cluster
redis_address(str): Redis address of a Ray cluster
redis_password(str): Redis password to access GCS
log_dir(str): Log directory of dashboard.
"""
Expand All @@ -75,13 +76,15 @@ def __init__(self,
host,
port,
port_retries,
gcs_address,
redis_address,
redis_password=None,
log_dir=None):
self.dashboard_head = dashboard_head.DashboardHead(
http_host=host,
http_port=port,
http_port_retries=port_retries,
gcs_address=gcs_address,
redis_address=redis_address,
redis_password=redis_password,
log_dir=log_dir)
Expand Down Expand Up @@ -202,6 +205,13 @@ async def run(self):
help="Specify the path of the temporary directory use by Ray process.")

args = parser.parse_args()

if gcs_utils.use_gcs_for_bootstrap():
args.redis_address = None
args.redis_password = None
else:
args.gcs_address = None

try:
setup_component_logger(
logging_level=args.logging_level,
Expand All @@ -215,6 +225,7 @@ async def run(self):
args.host,
args.port,
args.port_retries,
args.gcs_address,
args.redis_address,
redis_password=args.redis_password,
log_dir=args.log_dir)
Expand All @@ -223,7 +234,8 @@ async def run(self):
# will hang when ray.state is connected and the GCS has exited.
# Please refer to: https://github.com/ray-project/ray/issues/16328
service_discovery = PrometheusServiceDiscoveryWriter(
args.redis_address, args.redis_password, args.temp_dir)
args.redis_address, args.redis_password, args.gcs_address,
args.temp_dir)
# Need daemon True to avoid dashboard hangs at exit.
service_discovery.daemon = True
service_discovery.start()
Expand All @@ -242,14 +254,21 @@ async def run(self):
raise e

# Something went wrong, so push an error to all drivers.
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)
redis_client = None
gcs_publisher = None
if args.gcs_address:
gcs_publisher = GcsPublisher(address=args.gcs_address)
elif gcs_pubsub_enabled():
gcs_publisher = GcsPublisher(
address=gcs_utils.get_gcs_address_from_redis(redis_client))
if gcs_pubsub_enabled():
if gcs_utils.use_gcs_for_bootstrap():
gcs_publisher = GcsPublisher(args.gcs_address)
else:
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)
gcs_publisher = GcsPublisher(
address=gcs_utils.get_gcs_address_from_redis(redis_client))
redis_client = None
else:
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)

ray._private.utils.publish_error_to_driver(
redis_client,
ray_constants.DASHBOARD_DIED_ERROR,
Expand Down
62 changes: 39 additions & 23 deletions dashboard/head.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import ray.experimental.internal_kv as internal_kv
import ray._private.utils
from ray._private.gcs_utils import GcsClient
from ray._private.gcs_utils import GcsClient, use_gcs_for_bootstrap
import ray._private.services
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.utils as dashboard_utils
Expand Down Expand Up @@ -50,7 +50,7 @@ async def get_gcs_address_with_retry(redis_client) -> str:
while True:
try:
gcs_address = (await redis_client.get(
dashboard_consts.REDIS_KEY_GCS_SERVER_ADDRESS)).decode()
dashboard_consts.GCS_SERVER_ADDRESS)).decode()
if not gcs_address:
raise Exception("GCS address not found.")
logger.info("Connect to GCS at %s", gcs_address)
Expand Down Expand Up @@ -104,25 +104,35 @@ async def check_once(self) -> bool:


class DashboardHead:
def __init__(self, http_host, http_port, http_port_retries, redis_address,
redis_password, log_dir):
def __init__(self, http_host, http_port, http_port_retries, gcs_address,
redis_address, redis_password, log_dir):
self.health_check_thread: GCSHealthCheckThread = None
self._gcs_rpc_error_counter = 0
# Public attributes are accessible for all head modules.
# Workaround for issue: https://github.com/ray-project/ray/issues/7084
self.http_host = "127.0.0.1" if http_host == "localhost" else http_host
self.http_port = http_port
self.http_port_retries = http_port_retries
self.redis_address = dashboard_utils.address_tuple(redis_address)
self.redis_password = redis_password

if use_gcs_for_bootstrap():
assert gcs_address is not None
self.gcs_address = gcs_address
else:
self.redis_address = dashboard_utils.address_tuple(redis_address)
self.redis_password = redis_password

self.log_dir = log_dir
self.aioredis_client = None
self.aiogrpc_gcs_channel = None
self.gcs_error_subscriber = None
self.gcs_log_subscriber = None
self.http_session = None
self.ip = ray.util.get_node_ip_address()
ip, port = redis_address.split(":")
if not use_gcs_for_bootstrap():
ip, port = redis_address.split(":")
else:
ip, port = gcs_address.split(":")

self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), ))
grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0"
self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server(
Expand Down Expand Up @@ -175,18 +185,25 @@ def _load_modules(self):
logger.info("Loaded %d modules.", len(modules))
return modules

async def run(self):
async def get_gcs_address(self):
# Create an aioredis client for all modules.
try:
self.aioredis_client = await dashboard_utils.get_aioredis_client(
self.redis_address, self.redis_password,
dashboard_consts.CONNECT_REDIS_INTERNAL_SECONDS,
dashboard_consts.RETRY_REDIS_CONNECTION_TIMES)
except (socket.gaierror, ConnectionError):
logger.error(
"Dashboard head exiting: "
"Failed to connect to redis at %s", self.redis_address)
sys.exit(-1)
if use_gcs_for_bootstrap():
return self.gcs_address
else:
try:
self.aioredis_client = \
await dashboard_utils.get_aioredis_client(
self.redis_address, self.redis_password,
dashboard_consts.CONNECT_REDIS_INTERNAL_SECONDS,
dashboard_consts.RETRY_REDIS_CONNECTION_TIMES)
except (socket.gaierror, ConnectionError):
logger.error(
"Dashboard head exiting: "
"Failed to connect to redis at %s", self.redis_address)
sys.exit(-1)
return await get_gcs_address_with_retry(self.aioredis_client)

async def run(self):

# Create a http session for all modules.
# aiohttp<4.0.0 uses a 'loop' variable, aiohttp>=4.0.0 doesn't anymore
Expand All @@ -196,9 +213,8 @@ async def run(self):
else:
self.http_session = aiohttp.ClientSession()

# Waiting for GCS is ready.
# TODO: redis-removal bootstrap
gcs_address = await get_gcs_address_with_retry(self.aioredis_client)
gcs_address = await self.get_gcs_address()

# Dashboard will handle connection failure automatically
self.gcs_client = GcsClient(
address=gcs_address, nums_reconnect_retry=0)
Expand Down Expand Up @@ -259,11 +275,11 @@ async def _async_notify():
# TODO: Use async version if performance is an issue
# Write the dashboard head port to gcs kv.
internal_kv._internal_kv_put(
ray_constants.REDIS_KEY_DASHBOARD,
ray_constants.DASHBOARD_ADDRESS,
f"{http_host}:{http_port}",
namespace=ray_constants.KV_NAMESPACE_DASHBOARD)
internal_kv._internal_kv_put(
dashboard_consts.REDIS_KEY_DASHBOARD_RPC,
dashboard_consts.DASHBOARD_RPC_ADDRESS,
f"{self.ip}:{self.grpc_port}",
namespace=ray_constants.KV_NAMESPACE_DASHBOARD)

Expand Down
7 changes: 3 additions & 4 deletions dashboard/modules/actor/actor_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,10 @@ def process_actor_data_from_pubsub(actor_id, actor_table_data):
job_actors[actor_id] = actor_table_data
DataSource.job_actors[job_id] = job_actors

aioredis_client = self._dashboard_head.aioredis_client

# Receive actors from channel.
if gcs_pubsub_enabled():
gcs_addr = await aioredis_client.get("GcsServerAddress")
subscriber = GcsAioActorSubscriber(address=gcs_addr.decode())
gcs_addr = await self._dashboard_head.get_gcs_address()
subscriber = GcsAioActorSubscriber(address=gcs_addr)
await subscriber.subscribe()

while True:
Expand All @@ -153,6 +151,7 @@ def process_actor_data_from_pubsub(actor_id, actor_table_data):
logger.exception("Error processing actor info from GCS.")

else:
aioredis_client = self._dashboard_head.aioredis_client
receiver = Receiver()
key = "{}:*".format(actor_consts.ACTOR_CHANNEL)
pattern = receiver.pattern(key)
Expand Down
2 changes: 1 addition & 1 deletion dashboard/modules/event/event_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def _connect_to_dashboard(self):
try:
# TODO: Use async version if performance is an issue
dashboard_rpc_address = internal_kv._internal_kv_get(
dashboard_consts.REDIS_KEY_DASHBOARD_RPC,
dashboard_consts.DASHBOARD_RPC_ADDRESS,
namespace=ray_constants.KV_NAMESPACE_DASHBOARD)
if dashboard_rpc_address:
logger.info("Report events to %s", dashboard_rpc_address)
Expand Down
Loading

0 comments on commit 09421a4

Please sign in to comment.