From cefefd5ec94b000ae16ccb2f89d0ec15c5915ff0 Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Tue, 4 Apr 2023 19:00:38 -0700 Subject: [PATCH] [core][dashboard] Feature flag task logs recording (#34056) This should address a few issues introduced by the original task log recording features: [core] perf regression: 1_1_actor_calls_concurrent #33924 [core] perf regression: 1_1_actor_calls_async #33949 [Tests] Fix two skipped Windows test for test_task_event_2.py #33738 The root cause of the regressions is: With #32943, we are recording log file offsets before and after executing a task, which calls tell() on the file descriptor object for each worker. The cost of that shows up when there is concurrent execution of tasks on a single worker. I am turning this off by default for this release since the subsequent PRs are not merged yet. We will need to tackle or resolve the regression once we turn this feature on when we merge subsequent PRs. One idea is to make this "finding-out-offset-procedure" async, e.g. we try to locate the exact task id's log offset when querying the task logs at query time. --- python/ray/_private/ray_constants.py | 2 ++ python/ray/_private/worker.py | 7 +++++++ python/ray/tests/test_task_events_2.py | 11 ++++++++++- 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 037a925d7321..a73aaad86d70 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -422,3 +422,5 @@ def gcs_actor_scheduling_enabled(): "dashboard_agent_listen_port", "gcs_server_port", # the `port` option for gcs port. 
} + +RAY_ENABLE_RECORD_TASK_LOGGING = env_bool("RAY_ENABLE_RECORD_TASK_LOGGING", False) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 13e2162aa389..f956bc0c2f12 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -456,6 +456,7 @@ def __init__(self): self.ray_debugger_external = False self._load_code_from_local = False # Opened file descriptor to stdout/stderr for this python worker. + self._enable_record_task_log = ray_constants.RAY_ENABLE_RECORD_TASK_LOGGING self._out_file = None self._err_file = None # Create the lock here because the serializer will use it before @@ -539,6 +540,9 @@ def set_out_file(self, out_file=Optional[IO[AnyStr]]) -> None: def record_task_log_start(self): """Record the task log info when task starts executing""" + if not self._enable_record_task_log: + return + self.core_worker.record_task_log_start( self.get_out_file_path(), self.get_err_file_path(), @@ -548,6 +552,9 @@ def record_task_log_start(self): def record_task_log_end(self): """Record the task log info when task finishes executing""" + if not self._enable_record_task_log: + return + self.core_worker.record_task_log_end( self.get_current_out_offset(), self.get_current_err_offset() ) diff --git a/python/ray/tests/test_task_events_2.py b/python/ray/tests/test_task_events_2.py index d0efcfac2361..f5aecac9c4bd 100644 --- a/python/ray/tests/test_task_events_2.py +++ b/python/ray/tests/test_task_events_2.py @@ -537,7 +537,8 @@ def _read_file(filepath, start, end): @pytest.mark.skipif( - sys.platform == "win32", reason="Failing on Windows. 
we should fix it asap" + not ray_constants.RAY_ENABLE_RECORD_TASK_LOGGING, + reason="Skipping if not recording task logs offsets.", ) def test_task_logs_info_basic(shutdown_only): """Test tasks (normal tasks/actor tasks) execution logging @@ -594,6 +595,10 @@ def verify(): wait_for_condition(verify) +@pytest.mark.skipif( + not ray_constants.RAY_ENABLE_RECORD_TASK_LOGGING, + reason="Skipping if not recording task logs offsets.", +) def test_task_logs_info_disabled(shutdown_only, monkeypatch): """Test when redirect disabled, no task log info is available due to missing log file @@ -619,6 +624,10 @@ def verify(): wait_for_condition(verify) +@pytest.mark.skipif( + not ray_constants.RAY_ENABLE_RECORD_TASK_LOGGING, + reason="Skipping if not recording task logs offsets.", +) def test_task_logs_info_running_task(shutdown_only): ray.init(num_cpus=1)