Skip to content

Commit

Permalink
[core] Deduplicate repetitive logs across the cluster (ray-project#33403
Browse files Browse the repository at this point in the history
)

User code and libraries often emit repetitive log messages, which is a pain point for large-scale cluster runs. Add a mechanism to automatically group together and deduplicate these messages.
  • Loading branch information
ericl authored Mar 28, 2023
1 parent d5b9727 commit 3539b61
Show file tree
Hide file tree
Showing 9 changed files with 417 additions and 6 deletions.
32 changes: 32 additions & 0 deletions doc/source/ray-observability/ray-logging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,38 @@ Actor log messages look like the following by default.
(MyActor pid=480956) actor log message
Log deduplication
~~~~~~~~~~~~~~~~~

By default, Ray will deduplicate logs that appear redundantly across multiple processes. The first instance of each log message will always be immediately printed. However, subsequent log messages of the same pattern (ignoring words with numeric components) will be buffered for up to five seconds and printed in batch. For example, for the following code snippet:

.. code-block:: python
import ray
import random
@ray.remote
def task():
print("Hello there, I am a task", random.random())
ray.get([task.remote() for _ in range(100)])
The output will be as follows:

.. code-block:: bash
2023-03-27 15:08:34,195 INFO worker.py:1603 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(task pid=534172) Hello there, I am a task 0.20583517821231412
(task pid=534174) Hello there, I am a task 0.17536720316370757 [repeated 99x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication)
This feature is especially useful when importing libraries such as `tensorflow` or `numpy`, which may emit many verbose warning messages when imported. You can configure this feature as follows:

1. Set ``RAY_DEDUP_LOGS=0`` to disable this feature entirely.
2. Set ``RAY_DEDUP_LOGS_AGG_WINDOW_S=<int>`` to change the aggregation window.
3. Set ``RAY_DEDUP_LOGS_ALLOW_REGEX=<string>`` to specify log messages to never deduplicate.
4. Set ``RAY_DEDUP_LOGS_SKIP_REGEX=<string>`` to specify log messages to skip printing.


Disabling logging to the driver
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
16 changes: 16 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,22 @@ def env_set_by_user(key):
MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_MONITOR}.log"
LOG_MONITOR_LOG_FILE_NAME = f"{PROCESS_TYPE_LOG_MONITOR}.log"

# Enable log deduplication across the cluster (set RAY_DEDUP_LOGS=0 to disable).
RAY_DEDUP_LOGS = env_bool("RAY_DEDUP_LOGS", True)

# How many seconds of messages to buffer for log deduplication.
RAY_DEDUP_LOGS_AGG_WINDOW_S = env_integer("RAY_DEDUP_LOGS_AGG_WINDOW_S", 5)

# Regex for log messages to never deduplicate, or None. This takes precedence over
# the skip regex below. A default pattern is set for testing.
TESTING_NEVER_DEDUP_TOKEN = "__ray_testing_never_deduplicate__"
RAY_DEDUP_LOGS_ALLOW_REGEX = os.environ.get(
    "RAY_DEDUP_LOGS_ALLOW_REGEX", TESTING_NEVER_DEDUP_TOKEN
)

# Regex for log messages to always skip / suppress, or None when the env var is
# unset (in which case no lines are suppressed).
RAY_DEDUP_LOGS_SKIP_REGEX = os.environ.get("RAY_DEDUP_LOGS_SKIP_REGEX")

WORKER_PROCESS_TYPE_IDLE_WORKER = "ray::IDLE"
WORKER_PROCESS_TYPE_SPILL_WORKER_NAME = "SpillWorker"
WORKER_PROCESS_TYPE_RESTORE_WORKER_NAME = "RestoreWorker"
Expand Down
184 changes: 183 additions & 1 deletion python/ray/_private/ray_logging.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
import colorama
from dataclasses import dataclass
import logging
import os
import re
import sys
import threading
from logging.handlers import RotatingFileHandler
from typing import Callable
import time
from typing import Callable, Dict, List, Set, Tuple, Any, Optional

import ray
from ray.experimental.tqdm_ray import RAY_TQDM_MAGIC
from ray._private.ray_constants import (
RAY_DEDUP_LOGS,
RAY_DEDUP_LOGS_AGG_WINDOW_S,
RAY_DEDUP_LOGS_ALLOW_REGEX,
RAY_DEDUP_LOGS_SKIP_REGEX,
)
from ray._private.utils import binary_to_hex
from ray.util.debug import log_once

_default_handler = None

Expand Down Expand Up @@ -253,3 +265,173 @@ def emit(self, data):


global_worker_stdstream_dispatcher = WorkerStandardStreamDispatcher()


# Regex for canonicalizing log lines.
NUMBERS = re.compile(r"(\d+|0x[0-9a-fA-F]+)")

# Batch of log lines including ip, pid, lines, etc.
LogBatch = Dict[str, Any]


def _canonicalise_log_line(line):
# Remove words containing numbers or hex, since those tend to differ between
# workers.
return " ".join(x for x in line.split() if not NUMBERS.search(x))


@dataclass
class DedupState:
    """Aggregation state for a single canonical log pattern."""

    # Timestamp of the earliest log message seen of this pattern. Produced by
    # LogDeduplicator's timesource (time.time() by default), hence a float.
    timestamp: float

    # The number of un-printed occurrences for this pattern.
    count: int

    # Latest instance of this log pattern.
    line: str

    # Latest metadata dict for this log pattern, not including the lines field.
    metadata: LogBatch

    # Set of (ip, pid) sources which have emitted this pattern.
    sources: Set[Tuple[str, int]]

    # The string that should be printed to stdout: the latest line plus a
    # green "[repeated Nx across cluster]" suffix (and a one-time hint).
    def formatted(self) -> str:
        return self.line + _color(
            f" [repeated {self.count}x across cluster]" + _warn_once()
        )


class LogDeduplicator:
    """Aggregates duplicate log lines emitted across cluster processes.

    The first occurrence of each canonical pattern is printed immediately.
    Subsequent occurrences from *other* sources are buffered for up to
    ``agg_window_s`` seconds and then emitted as a single
    "[repeated Nx across cluster]" summary line.
    """

    def __init__(
        self,
        agg_window_s: int,
        allow_re: Optional[str],
        skip_re: Optional[str],
        *,
        _timesource=None,
    ):
        """Create a deduplicator.

        Args:
            agg_window_s: Seconds to buffer repeats before emitting a summary.
            allow_re: Regex for lines that are never deduplicated, or None.
            skip_re: Regex for lines that are suppressed entirely, or None.
            _timesource: Test-only clock override; defaults to time.time.
        """
        self.agg_window_s = agg_window_s
        if allow_re:
            self.allow_re = re.compile(allow_re)
        else:
            self.allow_re = None
        if skip_re:
            self.skip_re = re.compile(skip_re)
        else:
            self.skip_re = None
        # Buffer of up to RAY_DEDUP_LOGS_AGG_WINDOW_S recent log patterns.
        # This buffer is cleared if the pattern isn't seen within the window.
        # Dict insertion order doubles as age order (oldest pattern first),
        # which the expiry loop in deduplicate() relies on.
        self.recent: Dict[str, DedupState] = {}
        self.timesource = _timesource or (lambda: time.time())

    def deduplicate(self, batch: LogBatch) -> List[LogBatch]:
        """Rewrite a batch of lines to reduce duplicate log messages.

        Args:
            batch: The batch of lines from a single source.

        Returns:
            List of batches from this and possibly other previous sources to print.
        """
        if not RAY_DEDUP_LOGS:
            return [batch]

        now = self.timesource()
        metadata = batch.copy()
        del metadata["lines"]
        source = (metadata.get("ip"), metadata.get("pid"))
        output: List[LogBatch] = [dict(**metadata, lines=[])]

        # Decide which lines to emit from the input batch. Put the outputs in the
        # first output log batch (output[0]).
        for line in batch["lines"]:
            # tqdm magic lines and allow-listed lines always pass through.
            if RAY_TQDM_MAGIC in line or (self.allow_re and self.allow_re.search(line)):
                output[0]["lines"].append(line)
                continue
            elif self.skip_re and self.skip_re.search(line):
                continue
            dedup_key = _canonicalise_log_line(line)
            if dedup_key in self.recent:
                sources = self.recent[dedup_key].sources
                sources.add(source)
                if len(sources) > 1:
                    # Seen from more than one source: buffer this occurrence,
                    # keeping the latest line/metadata for the eventual summary.
                    state = self.recent[dedup_key]
                    self.recent[dedup_key] = DedupState(
                        state.timestamp,
                        state.count + 1,
                        line,
                        metadata,
                        sources,
                    )
                else:
                    # Don't dedup messages from the same source, just print.
                    output[0]["lines"].append(line)
            else:
                # First sighting of this pattern: print immediately and start
                # tracking it. count starts at 0 since nothing is buffered yet.
                self.recent[dedup_key] = DedupState(now, 0, line, metadata, {source})
                output[0]["lines"].append(line)

        # Flush patterns from the buffer that are older than the aggregation window.
        while self.recent:
            if now - next(iter(self.recent.values())).timestamp < self.agg_window_s:
                break
            dedup_key = next(iter(self.recent))
            state = self.recent.pop(dedup_key)
            # we already logged an instance of this line immediately when received,
            # so don't log for count == 0
            if state.count > 1:
                # (Actor pid=xxxx) [repeated 2x across cluster] ...
                output.append(dict(**state.metadata, lines=[state.formatted()]))
                # Continue aggregating for this key but reset timestamp and count.
                state.timestamp = now
                state.count = 0
                self.recent[dedup_key] = state
            elif state.count > 0:
                # Aggregation wasn't fruitful, print the line and stop aggregating.
                # Fix: emit the buffered occurrence (state.line), not the batch
                # loop variable `line` above, which refers to a different message
                # and is unbound entirely when this batch had no lines.
                output.append(dict(state.metadata, lines=[state.line]))

        return output

    def flush(self) -> List[dict]:
        """Return all buffered log messages and clear the buffer.

        Returns:
            List of log batches to print.
        """
        output = []
        for state in self.recent.values():
            if state.count > 1:
                output.append(
                    dict(
                        state.metadata,
                        lines=[state.formatted()],
                    )
                )
            elif state.count > 0:
                output.append(dict(state.metadata, **{"lines": [state.line]}))
        self.recent.clear()
        return output


def _warn_once() -> str:
    """Return the one-time deduplication hint, or "" once it has been shown."""
    if not log_once("log_dedup_warning"):
        return ""
    return (
        " (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to "
        "disable log deduplication)"
    )


def _color(msg: str) -> str:
    """Wrap ``msg`` in ANSI green, resetting all styles afterwards."""
    return f"{colorama.Fore.GREEN}{msg}{colorama.Style.RESET_ALL}"


# Process-wide deduplicators shared by all log printing in this process; one
# per output stream so stdout and stderr batches aggregate independently.
stdout_deduplicator = LogDeduplicator(
    RAY_DEDUP_LOGS_AGG_WINDOW_S, RAY_DEDUP_LOGS_ALLOW_REGEX, RAY_DEDUP_LOGS_SKIP_REGEX
)
stderr_deduplicator = LogDeduplicator(
    RAY_DEDUP_LOGS_AGG_WINDOW_S, RAY_DEDUP_LOGS_ALLOW_REGEX, RAY_DEDUP_LOGS_SKIP_REGEX
)
25 changes: 23 additions & 2 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
from ray._private.inspect_util import is_cython
from ray._private.ray_logging import (
global_worker_stdstream_dispatcher,
stdout_deduplicator,
stderr_deduplicator,
setup_logger,
)
from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
Expand Down Expand Up @@ -1743,8 +1745,23 @@ def custom_excepthook(type, value, tb):


def print_to_stdstream(data):
    """Print a worker log batch to the matching standard stream.

    Batches attributed to the autoscaler or raylet are printed verbatim;
    all other batches are routed through the per-stream deduplicator.
    """
    # System processes are never deduplicated.
    should_dedup = data.get("pid") not in ["autoscaler", "raylet"]

    if data["is_err"]:
        sink = sys.stderr
        deduplicator = stderr_deduplicator
    else:
        sink = sys.stdout
        deduplicator = stdout_deduplicator

    batches = deduplicator.deduplicate(data) if should_dedup else [data]
    for batch in batches:
        print_worker_logs(batch, sink)


# Start time of this process, used for relative time logs.
Expand Down Expand Up @@ -2293,6 +2310,10 @@ def disconnect(exiting_interpreter=False):

worker._session_index += 1

for leftover in stdout_deduplicator.flush():
print_worker_logs(leftover, sys.stdout)
for leftover in stderr_deduplicator.flush():
print_worker_logs(leftover, sys.stderr)
global_worker_stdstream_dispatcher.remove_handler("ray_print_logs")

worker.node = None # Disconnect the worker from the node.
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_autoscaler_fake_scaledown.py", # Temporarily owned by core.
"test_log_dedup.py",
"test_logging.py",
"test_memory_scheduling.py",
"test_memory_pressure.py",
Expand Down
Loading

0 comments on commit 3539b61

Please sign in to comment.