Skip to content

Commit

Permalink
[3/gcs] Bootstrap log monitor and monitor from gcs (ray-project#21194)
Browse files Browse the repository at this point in the history
This is part of the Redis removal effort. This PR enables the log monitor and the monitor to bootstrap from GCS.

Co-authored-by: mwtian <[email protected]>
Co-authored-by: Mingwei Tian <[email protected]>
  • Loading branch information
3 people authored Dec 22, 2021
1 parent cfe0897 commit 0c786b1
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 38 deletions.
34 changes: 23 additions & 11 deletions python/ray/_private/log_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,24 @@ class LogMonitor:
false otherwise.
"""

def __init__(self, logs_dir, redis_address, redis_password=None):
def __init__(self,
logs_dir,
redis_address,
gcs_address,
redis_password=None):
"""Initialize the log monitor object."""
self.ip = services.get_node_ip_address()
self.logs_dir = logs_dir
self.redis_client = ray._private.services.create_redis_client(
redis_address, password=redis_password)
if gcs_utils.use_gcs_for_bootstrap():
self.redis_client = None
else:
self.redis_client = ray._private.services.create_redis_client(
redis_address, password=redis_password)
gcs_address = gcs_utils.get_gcs_address_from_redis(
self.redis_client)
self.publisher = None
if gcs_pubsub.gcs_pubsub_enabled():
gcs_addr = gcs_utils.get_gcs_address_from_redis(self.redis_client)
self.publisher = gcs_pubsub.GcsPublisher(address=gcs_addr)
self.publisher = gcs_pubsub.GcsPublisher(address=gcs_address)
self.log_filenames = set()
self.open_file_infos = []
self.closed_file_infos = []
Expand Down Expand Up @@ -443,7 +451,10 @@ def run(self):
backup_count=args.logging_rotate_backup_count)

log_monitor = LogMonitor(
args.logs_dir, args.redis_address, redis_password=args.redis_password)
args.logs_dir,
args.redis_address,
args.gcs_address,
redis_password=args.redis_password)

try:
log_monitor.run()
Expand All @@ -452,11 +463,12 @@ def run(self):
redis_client = ray._private.services.create_redis_client(
args.redis_address, password=args.redis_password)
gcs_publisher = None
if args.gcs_address:
gcs_publisher = GcsPublisher(address=args.gcs_address)
elif gcs_pubsub_enabled():
gcs_publisher = GcsPublisher(
address=gcs_utils.get_gcs_address_from_redis(redis_client))
if gcs_pubsub_enabled():
if gcs_utils.use_gcs_for_bootstrap():
gcs_publisher = GcsPublisher(address=args.gcs_address)
else:
gcs_publisher = GcsPublisher(
address=gcs_utils.get_gcs_address_from_redis(redis_client))
traceback_str = ray._private.utils.format_error_message(
traceback.format_exc())
message = (f"The log monitor on node {platform.node()} "
Expand Down
14 changes: 11 additions & 3 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,7 @@ def _start_redis_instance(executable,


def start_log_monitor(redis_address,
gcs_address,
logs_dir,
stdout_file=None,
stderr_file=None,
Expand Down Expand Up @@ -1215,7 +1216,8 @@ def start_log_monitor(redis_address,
sys.executable, "-u", log_monitor_filepath,
f"--redis-address={redis_address}", f"--logs-dir={logs_dir}",
f"--logging-rotate-bytes={max_bytes}",
f"--logging-rotate-backup-count={backup_count}"
f"--logging-rotate-backup-count={backup_count}",
f"--gcs-address={gcs_address}"
]
if redis_password:
command += ["--redis-password", redis_password]
Expand Down Expand Up @@ -1604,6 +1606,7 @@ def start_raylet(redis_address,
f"--logging-rotate-backup-count={backup_count}",
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
]

if redis_password:
start_worker_command += [f"--redis-password={redis_password}"]

Expand Down Expand Up @@ -1960,6 +1963,7 @@ def start_worker(node_ip_address,


def start_monitor(redis_address,
gcs_address,
logs_dir,
stdout_file=None,
stderr_file=None,
Expand Down Expand Up @@ -1992,10 +1996,14 @@ def start_monitor(redis_address,
"""
monitor_path = os.path.join(RAY_PATH, AUTOSCALER_PRIVATE_DIR, "monitor.py")
command = [
sys.executable, "-u", monitor_path, f"--logs-dir={logs_dir}",
sys.executable,
"-u",
monitor_path,
f"--logs-dir={logs_dir}",
f"--redis-address={redis_address}",
f"--logging-rotate-bytes={max_bytes}",
f"--logging-rotate-backup-count={backup_count}"
f"--logging-rotate-backup-count={backup_count}",
f"--gcs-address={gcs_address}",
]
if autoscaling_config:
command.append("--autoscaling-config=" + str(autoscaling_config))
Expand Down
68 changes: 47 additions & 21 deletions python/ray/autoscaler/_private/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,24 @@ class Monitor:
"""

def __init__(self,
redis_address,
address,
autoscaling_config,
redis_password=None,
prefix_cluster_info=False,
monitor_ip=None,
stop_event: Optional[Event] = None):
# Initialize the Redis clients.
self.redis = ray._private.services.create_redis_client(
redis_address, password=redis_password)
(ip, port) = redis_address.split(":")
# Initialize the gcs stub for getting all node resource usage.
gcs_address = self.redis.get("GcsServerAddress").decode("utf-8")
if not use_gcs_for_bootstrap():
# Initialize the Redis clients.
redis_address = address
self.redis = ray._private.services.create_redis_client(
redis_address, password=redis_password)
(ip, port) = address.split(":")
# Initialize the gcs stub for getting all node resource usage.
gcs_address = self.redis.get("GcsServerAddress").decode("utf-8")
else:
gcs_address = address
redis_address = None

options = (("grpc.enable_http_proxy", 0), )
gcs_channel = ray._private.utils.init_grpc_channel(
gcs_address, options)
Expand All @@ -156,19 +162,35 @@ def __init__(self,

# Set the redis client and mode so _internal_kv works for autoscaler.
worker = ray.worker.global_worker
worker.redis_client = self.redis
gcs_client = GcsClient.create_from_redis(self.redis)
if use_gcs_for_bootstrap():
gcs_client = GcsClient(address=gcs_address)
else:
worker.redis_client = self.redis
gcs_client = GcsClient.create_from_redis(self.redis)

if monitor_ip:
monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}"
if use_gcs_for_bootstrap():
gcs_client.internal_kv_put(b"AutoscalerMetricsAddress",
monitor_addr.encode(), True, None)
else:
self.redis.set("AutoscalerMetricsAddress", monitor_addr)
_initialize_internal_kv(gcs_client)
if monitor_ip:
monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}"
self.redis.set("AutoscalerMetricsAddress", monitor_addr)
if use_gcs_for_bootstrap():
gcs_client.internal_kv_put(b"AutoscalerMetricsAddress",
monitor_addr.encode(), True, None)
else:
self.redis.set("AutoscalerMetricsAddress", monitor_addr)
worker.mode = 0
head_node_ip = redis_address.split(":")[0]
self.redis_address = redis_address
self.redis_password = redis_password
if use_gcs_for_bootstrap():
head_node_ip = gcs_address.split(":")[0]
else:
head_node_ip = redis_address.split(":")[0]
self.redis_address = redis_address
self.redis_password = redis_password

self.load_metrics = LoadMetrics()
self.last_avail_resources = None
self.event_summarizer = EventSummarizer()
Expand Down Expand Up @@ -407,14 +429,18 @@ def _handle_failure(self, error):
message = f"The autoscaler failed with the following error:\n{error}"
if _internal_kv_initialized():
_internal_kv_put(DEBUG_AUTOSCALING_ERROR, message, overwrite=True)
redis_client = ray._private.services.create_redis_client(
self.redis_address, password=self.redis_password)
if not use_gcs_for_bootstrap():
redis_client = ray._private.services.create_redis_client(
self.redis_address, password=self.redis_password)
else:
redis_client = None
gcs_publisher = None
if args.gcs_address:
gcs_publisher = GcsPublisher(address=args.gcs_address)
elif gcs_pubsub_enabled():
gcs_publisher = GcsPublisher(
address=get_gcs_address_from_redis(redis_client))
if gcs_pubsub_enabled():
if use_gcs_for_bootstrap():
gcs_publisher = GcsPublisher(address=args.gcs_address)
else:
gcs_publisher = GcsPublisher(
address=get_gcs_address_from_redis(redis_client))
from ray._private.utils import publish_error_to_driver
publish_error_to_driver(
ray_constants.MONITOR_DIED_ERROR,
Expand Down Expand Up @@ -535,7 +561,7 @@ def run(self):
autoscaling_config = None

monitor = Monitor(
args.redis_address,
args.gcs_address if use_gcs_for_bootstrap() else args.redis_address,
autoscaling_config,
redis_password=args.redis_password,
monitor_ip=args.monitor_ip)
Expand Down
4 changes: 4 additions & 0 deletions python/ray/cluster_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ def __init__(self,
if connect:
self.connect()

@property
def gcs_address(self):
return self.head_node.gcs_address

@property
def address(self):
return self.redis_address
Expand Down
2 changes: 2 additions & 0 deletions python/ray/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ def start_log_monitor(self):
"""Start the log monitor."""
process_info = ray._private.services.start_log_monitor(
self.redis_address,
self.gcs_address,
self._logs_dir,
stdout_file=subprocess.DEVNULL,
stderr_file=subprocess.DEVNULL,
Expand Down Expand Up @@ -872,6 +873,7 @@ def start_monitor(self):
"monitor", unique=True)
process_info = ray._private.services.start_monitor(
self._redis_address,
self.gcs_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
Expand Down
4 changes: 2 additions & 2 deletions python/ray/ray_operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ def start_monitor(self) -> None:
"""Runs the autoscaling monitor."""
ray_head_pod_ip = commands.get_head_node_ip(self.config_path)
port = operator_utils.infer_head_port(self.config)
redis_address = services.address(ray_head_pod_ip, port)
address = services.address(ray_head_pod_ip, port)
mtr = monitor.Monitor(
redis_address=redis_address,
address,
autoscaling_config=self.config_path,
redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
prefix_cluster_info=True,
Expand Down
6 changes: 5 additions & 1 deletion python/ray/tests/test_multi_node_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ray.autoscaler.sdk import request_resources
from ray.autoscaler._private.monitor import Monitor
from ray.cluster_utils import Cluster
from ray._private.gcs_utils import use_gcs_for_bootstrap
from ray._private.test_utils import (generate_system_config_map,
wait_for_condition, SignalActor)

Expand Down Expand Up @@ -185,7 +186,10 @@ def test_heartbeats_single(ray_start_cluster_head):
Test proper metrics.
"""
cluster = ray_start_cluster_head
monitor = setup_monitor(cluster.address)
if use_gcs_for_bootstrap():
monitor = setup_monitor(cluster.gcs_address)
else:
monitor = setup_monitor(cluster.address)
total_cpus = ray.state.cluster_resources()["CPU"]
verify_load_metrics(monitor, ({"CPU": 0.0}, {"CPU": total_cpus}))

Expand Down

0 comments on commit 0c786b1

Please sign in to comment.