diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 6b6f546a3b480..fd9e4832c8f57 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -18,6 +18,7 @@
 """Processes DAGs."""
 from __future__ import annotations
 
+import collections
 import enum
 import importlib
 import inspect
@@ -118,7 +119,6 @@ def __init__(
         async_mode: bool,
     ):
         super().__init__()
-        self._file_path_queue: list[str] = []
         self._dag_directory: os.PathLike = dag_directory
         self._max_runs = max_runs
         self._processor_timeout = processor_timeout
@@ -381,7 +381,7 @@ def __init__(
     ):
         super().__init__()
         self._file_paths: list[str] = []
-        self._file_path_queue: list[str] = []
+        self._file_path_queue: collections.deque[str] = collections.deque()
         self._max_runs = max_runs
         # signal_conn is None for dag_processor_standalone mode.
         self._direct_scheduler_conn = signal_conn
@@ -601,7 +601,7 @@ def _run_parsing_loop(self):
             self._fetch_callbacks(max_callbacks_per_loop)
             self._deactivate_stale_dags()
             DagWarning.purge_inactive_dag_warnings()
-            self._refresh_dag_dir()
+            refreshed_dag_dir = self._refresh_dag_dir()
 
             self._kill_timed_out_processors()
 
@@ -610,6 +610,8 @@ def _run_parsing_loop(self):
             if not self._file_path_queue:
                 self.emit_metrics()
                 self.prepare_file_path_queue()
+            elif refreshed_dag_dir:
+                self.add_new_file_path_to_queue()
 
             self.start_new_processes()
 
@@ -710,10 +712,10 @@ def _add_callback_to_queue(self, request: CallbackRequest):
             # Remove file paths matching request.full_filepath from self._file_path_queue
             # Since we are already going to use that filepath to run callback,
             # there is no need to have same file path again in the queue
-            self._file_path_queue = [
+            self._file_path_queue = collections.deque(
                 file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-            ]
-            self._file_path_queue.insert(0, request.full_filepath)
+            )
+            self._file_path_queue.appendleft(request.full_filepath)
 
     def _refresh_dag_dir(self):
         """Refresh file paths from dag dir if we haven't done it for too long."""
@@ -760,6 +762,9 @@ def _refresh_dag_dir(self):
 
                 DagCode.remove_deleted_code(dag_filelocs)
 
+            return True
+        return False
+
     def _print_stat(self):
         """Occasionally print out stats about how fast the files are getting processed"""
         if 0 < self.print_stats_interval < time.monotonic() - self.last_stat_print_time:
@@ -932,7 +937,7 @@ def set_file_paths(self, new_file_paths):
         :return: None
         """
         self._file_paths = new_file_paths
-        self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
+        self._file_path_queue = collections.deque(x for x in self._file_path_queue if x in new_file_paths)
         # Stop processors that are working on deleted files
         filtered_processors = {}
         for file_path, processor in self._processors.items():
@@ -1010,7 +1015,7 @@ def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_req
     def start_new_processes(self):
        """Start more processors if we have enough slots and files to process"""
        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
-            file_path = self._file_path_queue.pop(0)
+            file_path = self._file_path_queue.popleft()
             # Stop creating duplicate processor i.e. processor with the same filepath
             if file_path in self._processors.keys():
                 continue
@@ -1032,6 +1037,16 @@ def start_new_processes(self):
             self._processors[file_path] = processor
             self.waitables[processor.waitable_handle] = processor
 
+    def add_new_file_path_to_queue(self):
+        for file_path in self.file_paths:
+            if file_path not in self._file_stats:
+                # We found new file after refreshing dir. add to parsing queue at start
+                self.log.info('Adding new file %s to parsing queue', file_path)
+                self._file_stats[file_path] = DagFileStat(
+                    num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
+                )
+                self._file_path_queue.appendleft(file_path)
+
     def prepare_file_path_queue(self):
         """Generate more file paths to process. Result are saved in _file_path_queue."""
         self._parsing_start_time = time.perf_counter()
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 084056dad919d..7df0e78840cd0 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import collections
 import logging
 import multiprocessing
 import os
@@ -219,7 +220,7 @@ def test_start_new_processes_with_same_filepath(self):
         file_1 = "file_1.py"
         file_2 = "file_2.py"
         file_3 = "file_3.py"
-        manager._file_path_queue = [file_1, file_2, file_3]
+        manager._file_path_queue = collections.deque([file_1, file_2, file_3])
 
         # Mock that only one processor exists. This processor runs with 'file_1'
         manager._processors[file_1] = MagicMock()
@@ -234,7 +235,7 @@ def test_start_new_processes_with_same_filepath(self):
 
         assert file_1 in manager._processors.keys()
         assert file_2 in manager._processors.keys()
-        assert [file_3] == manager._file_path_queue
+        assert collections.deque([file_3]) == manager._file_path_queue
 
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
@@ -300,9 +301,11 @@ def test_file_paths_in_queue_sorted_alphabetically(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
+        assert manager._file_path_queue == collections.deque()
         manager.prepare_file_path_queue()
-        assert manager._file_path_queue == ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
+        assert manager._file_path_queue == collections.deque(
+            ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
+        )
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "random_seeded_by_host"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -327,10 +330,10 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
+        assert manager._file_path_queue == collections.deque()
         manager.prepare_file_path_queue()
 
-        expected_order = dag_files
+        expected_order = collections.deque(dag_files)
         random.Random(get_hostname()).shuffle(expected_order)
         assert manager._file_path_queue == expected_order
 
@@ -365,9 +368,11 @@ def test_file_paths_in_queue_sorted_by_modified_time(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
+        assert manager._file_path_queue == collections.deque()
         manager.prepare_file_path_queue()
-        assert manager._file_path_queue == ["file_4.py", "file_1.py", "file_3.py", "file_2.py"]
+        assert manager._file_path_queue == collections.deque(
+            ["file_4.py", "file_1.py", "file_3.py", "file_2.py"]
+        )
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -395,7 +400,41 @@ def test_file_paths_in_queue_excludes_missing_file(
 
         manager.set_file_paths(dag_files)
         manager.prepare_file_path_queue()
-        assert manager._file_path_queue == ["file_2.py", "file_3.py"]
+        assert manager._file_path_queue == collections.deque(["file_2.py", "file_3.py"])
+
+    @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("zipfile.is_zipfile", return_value=True)
+    @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+    @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.getmtime")
+    def test_add_new_file_to_parsing_queue(
+        self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
+    ):
+        """Check that new file is added to parsing queue"""
+        dag_files = ["file_1.py", "file_2.py", "file_3.py"]
+        mock_getmtime.side_effect = [1.0, 2.0, 3.0]
+        mock_find_path.return_value = dag_files
+
+        manager = DagFileProcessorManager(
+            dag_directory="directory",
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        manager.set_file_paths(dag_files)
+        manager.prepare_file_path_queue()
+        assert manager._file_path_queue == collections.deque(["file_3.py", "file_2.py", "file_1.py"])
+
+        manager.set_file_paths(dag_files + ["file_4.py"])
+        manager.add_new_file_path_to_queue()
+        assert manager._file_path_queue == collections.deque(
+            ["file_4.py", "file_3.py", "file_2.py", "file_1.py"]
+        )
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -432,10 +471,10 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
         }
         with freeze_time(freezed_base_time):
             manager.set_file_paths(dag_files)
-            assert manager._file_path_queue == []
+            assert manager._file_path_queue == collections.deque()
             # File Path Queue will be empty as the "modified time" < "last finish time"
             manager.prepare_file_path_queue()
-            assert manager._file_path_queue == []
+            assert manager._file_path_queue == collections.deque()
 
         # Simulate the DAG modification by using modified_time which is greater
         # than the last_parse_time but still less than now - min_file_process_interval
@@ -443,12 +482,12 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
         file_1_new_mtime_ts = file_1_new_mtime.timestamp()
         with freeze_time(freezed_base_time):
             manager.set_file_paths(dag_files)
-            assert manager._file_path_queue == []
+            assert manager._file_path_queue == collections.deque()
             # File Path Queue will be empty as the "modified time" < "last finish time"
             mock_getmtime.side_effect = [file_1_new_mtime_ts]
             manager.prepare_file_path_queue()
             # Check that file is added to the queue even though file was just recently passed
-            assert manager._file_path_queue == ["file_1.py"]
+            assert manager._file_path_queue == collections.deque(["file_1.py"])
             assert last_finish_time < file_1_new_mtime
             assert (
                 manager._file_process_interval
@@ -1038,7 +1077,9 @@ def test_callback_queue(self, tmpdir):
         manager._add_callback_to_queue(dag2_req1)
 
         # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last)
-        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        assert manager._file_path_queue == collections.deque(
+            [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        )
         assert set(manager._callback_to_execute.keys()) == {dag1_req1.full_filepath, dag2_req1.full_filepath}
         assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
         assert manager._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1]
 
@@ -1047,14 +1088,18 @@ def test_callback_queue(self, tmpdir):
         manager._add_callback_to_queue(dag1_sla2)
 
         # then - since sla2 == sla1, should not have brought dag1 to the fore
-        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        assert manager._file_path_queue == collections.deque(
+            [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        )
         assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
 
         # when
         manager._add_callback_to_queue(dag1_req2)
 
         # then - non-sla callback should have brought dag1 to the fore
-        assert manager._file_path_queue == [dag1_req1.full_filepath, dag2_req1.full_filepath]
+        assert manager._file_path_queue == collections.deque(
+            [dag1_req1.full_filepath, dag2_req1.full_filepath]
+        )
         assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1, dag1_req2]
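
For reference, a minimal standalone sketch (not part of the patch) of the deque behaviour the change relies on: appendleft and popleft are O(1) on a collections.deque, whereas the list.insert(0, ...) and list.pop(0) calls they replace shift every element and cost O(n) on a long parsing queue. The queue name below is illustrative only.

```python
import collections

# Queue of DAG files waiting to be parsed, mirroring the role of
# DagFileProcessorManager._file_path_queue in the patch above.
file_path_queue = collections.deque(["a.py", "b.py"])

# A callback request or a newly discovered file jumps to the front in O(1).
file_path_queue.appendleft("priority.py")

# start_new_processes() style consumption takes items from the front, also O(1).
assert file_path_queue.popleft() == "priority.py"
assert list(file_path_queue) == ["a.py", "b.py"]
```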