Add new files to parsing queue on every loop of dag processing (apac…
pavansharma36 authored Nov 14, 2022
1 parent fb9e5e6 commit 65b78b7
Showing 2 changed files with 84 additions and 24 deletions.
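In short, this commit changes DagFileProcessorManager so that files discovered during a DAG directory refresh are parsed on the very next loop instead of waiting for the queue to drain: _file_path_queue becomes a collections.deque, _refresh_dag_dir now reports whether a rescan actually happened, and a new add_new_file_path_to_queue method pushes any file without recorded stats onto the front of the queue. A minimal, self-contained sketch of that queueing pattern follows; the names and the simplified stats dict are illustrative stand-ins, not the manager's real attributes.

from __future__ import annotations

import collections

file_path_queue: collections.deque[str] = collections.deque()
file_stats: dict[str, int] = {}  # stand-in for per-file DagFileStat entries

def prepare_file_path_queue(all_files: list[str]) -> None:
    # Full rebuild: enqueue every known file (sorted here for determinism).
    file_path_queue.extend(sorted(all_files))

def add_new_file_paths_to_queue(all_files: list[str]) -> None:
    # Files we have never parsed get a placeholder stat (so they are not
    # re-added on the next refresh) and jump to the front of the queue.
    for path in all_files:
        if path not in file_stats:
            file_stats[path] = 0
            file_path_queue.appendleft(path)

prepare_file_path_queue(["a.py", "b.py"])
file_stats.update({"a.py": 1, "b.py": 1})       # pretend these were parsed before
add_new_file_paths_to_queue(["a.py", "b.py", "new_dag.py"])
print(list(file_path_queue))                    # ['new_dag.py', 'a.py', 'b.py']

while file_path_queue:
    next_file = file_path_queue.popleft()       # O(1) at both ends, unlike list.pop(0)
    # ... hand next_file to a processor slot ...

The deque is the point of the refactor: the manager pops from the front on every loop and occasionally pushes to the front (new files, callbacks), and deque makes both operations O(1), whereas list.pop(0) and list.insert(0, ...) are O(n).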
31 changes: 23 additions & 8 deletions airflow/dag_processing/manager.py
@@ -18,6 +18,7 @@
"""Processes DAGs."""
from __future__ import annotations

import collections
import enum
import importlib
import inspect
@@ -118,7 +119,6 @@ def __init__(
async_mode: bool,
):
super().__init__()
self._file_path_queue: list[str] = []
self._dag_directory: os.PathLike = dag_directory
self._max_runs = max_runs
self._processor_timeout = processor_timeout
@@ -381,7 +381,7 @@ def __init__(
):
super().__init__()
self._file_paths: list[str] = []
self._file_path_queue: list[str] = []
self._file_path_queue: collections.deque[str] = collections.deque()
self._max_runs = max_runs
# signal_conn is None for dag_processor_standalone mode.
self._direct_scheduler_conn = signal_conn
@@ -601,7 +601,7 @@ def _run_parsing_loop(self):
self._fetch_callbacks(max_callbacks_per_loop)
self._deactivate_stale_dags()
DagWarning.purge_inactive_dag_warnings()
self._refresh_dag_dir()
refreshed_dag_dir = self._refresh_dag_dir()

self._kill_timed_out_processors()

@@ -610,6 +610,8 @@
if not self._file_path_queue:
self.emit_metrics()
self.prepare_file_path_queue()
elif refreshed_dag_dir:
self.add_new_file_path_to_queue()

self.start_new_processes()

@@ -710,10 +712,10 @@ def _add_callback_to_queue(self, request: CallbackRequest):
# Remove file paths matching request.full_filepath from self._file_path_queue
# Since we are already going to use that filepath to run callback,
# there is no need to have same file path again in the queue
self._file_path_queue = [
self._file_path_queue = collections.deque(
file_path for file_path in self._file_path_queue if file_path != request.full_filepath
]
self._file_path_queue.insert(0, request.full_filepath)
)
self._file_path_queue.appendleft(request.full_filepath)

def _refresh_dag_dir(self):
"""Refresh file paths from dag dir if we haven't done it for too long."""
@@ -760,6 +762,9 @@ def _refresh_dag_dir(self):

DagCode.remove_deleted_code(dag_filelocs)

return True
return False

def _print_stat(self):
"""Occasionally print out stats about how fast the files are getting processed"""
if 0 < self.print_stats_interval < time.monotonic() - self.last_stat_print_time:
@@ -932,7 +937,7 @@ def set_file_paths(self, new_file_paths):
:return: None
"""
self._file_paths = new_file_paths
self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
self._file_path_queue = collections.deque(x for x in self._file_path_queue if x in new_file_paths)
# Stop processors that are working on deleted files
filtered_processors = {}
for file_path, processor in self._processors.items():
@@ -1010,7 +1015,7 @@ def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_req
def start_new_processes(self):
"""Start more processors if we have enough slots and files to process"""
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
file_path = self._file_path_queue.popleft()
# Stop creating duplicate processor i.e. processor with the same filepath
if file_path in self._processors.keys():
continue
@@ -1032,6 +1037,16 @@
self._processors[file_path] = processor
self.waitables[processor.waitable_handle] = processor

def add_new_file_path_to_queue(self):
for file_path in self.file_paths:
if file_path not in self._file_stats:
# We found new file after refreshing dir. add to parsing queue at start
self.log.info('Adding new file %s to parsing queue', file_path)
self._file_stats[file_path] = DagFileStat(
num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
)
self._file_path_queue.appendleft(file_path)

def prepare_file_path_queue(self):
"""Generate more file paths to process. Result are saved in _file_path_queue."""
self._parsing_start_time = time.perf_counter()
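The _add_callback_to_queue hunk above applies the same idea to callbacks: any queued entry for the callback's file is dropped and the path is pushed to the front so the callback runs as soon as a processor slot frees up. A tiny illustration with toy values (not Airflow's CallbackRequest objects):

import collections

queue = collections.deque(["a.py", "b.py", "c.py"])
callback_filepath = "b.py"  # toy stand-in for request.full_filepath

# Deduplicate, then front-load the file the callback belongs to.
queue = collections.deque(p for p in queue if p != callback_filepath)
queue.appendleft(callback_filepath)
print(list(queue))  # ['b.py', 'a.py', 'c.py']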
77 changes: 61 additions & 16 deletions tests/dag_processing/test_manager.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

import collections
import logging
import multiprocessing
import os
@@ -219,7 +220,7 @@ def test_start_new_processes_with_same_filepath(self):
file_1 = "file_1.py"
file_2 = "file_2.py"
file_3 = "file_3.py"
manager._file_path_queue = [file_1, file_2, file_3]
manager._file_path_queue = collections.deque([file_1, file_2, file_3])

# Mock that only one processor exists. This processor runs with 'file_1'
manager._processors[file_1] = MagicMock()
@@ -234,7 +235,7 @@ def test_start_new_processes_with_same_filepath(self):

assert file_1 in manager._processors.keys()
assert file_2 in manager._processors.keys()
assert [file_3] == manager._file_path_queue
assert collections.deque([file_3]) == manager._file_path_queue

def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
manager = DagFileProcessorManager(
@@ -300,9 +301,11 @@ def test_file_paths_in_queue_sorted_alphabetically(
)

manager.set_file_paths(dag_files)
assert manager._file_path_queue == []
assert manager._file_path_queue == collections.deque()
manager.prepare_file_path_queue()
assert manager._file_path_queue == ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
assert manager._file_path_queue == collections.deque(
["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
)

@conf_vars({("scheduler", "file_parsing_sort_mode"): "random_seeded_by_host"})
@mock.patch("zipfile.is_zipfile", return_value=True)
@@ -327,10 +330,10 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host(
)

manager.set_file_paths(dag_files)
assert manager._file_path_queue == []
assert manager._file_path_queue == collections.deque()
manager.prepare_file_path_queue()

expected_order = dag_files
expected_order = collections.deque(dag_files)
random.Random(get_hostname()).shuffle(expected_order)
assert manager._file_path_queue == expected_order

@@ -365,9 +368,11 @@ def test_file_paths_in_queue_sorted_by_modified_time(
)

manager.set_file_paths(dag_files)
assert manager._file_path_queue == []
assert manager._file_path_queue == collections.deque()
manager.prepare_file_path_queue()
assert manager._file_path_queue == ["file_4.py", "file_1.py", "file_3.py", "file_2.py"]
assert manager._file_path_queue == collections.deque(
["file_4.py", "file_1.py", "file_3.py", "file_2.py"]
)

@conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("zipfile.is_zipfile", return_value=True)
@@ -395,7 +400,41 @@ def test_file_paths_in_queue_excludes_missing_file(

manager.set_file_paths(dag_files)
manager.prepare_file_path_queue()
assert manager._file_path_queue == ["file_2.py", "file_3.py"]
assert manager._file_path_queue == collections.deque(["file_2.py", "file_3.py"])

@conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("zipfile.is_zipfile", return_value=True)
@mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
@mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
@mock.patch("airflow.utils.file.os.path.getmtime")
def test_add_new_file_to_parsing_queue(
self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
):
"""Check that new file is added to parsing queue"""
dag_files = ["file_1.py", "file_2.py", "file_3.py"]
mock_getmtime.side_effect = [1.0, 2.0, 3.0]
mock_find_path.return_value = dag_files

manager = DagFileProcessorManager(
dag_directory="directory",
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
)

manager.set_file_paths(dag_files)
manager.prepare_file_path_queue()
assert manager._file_path_queue == collections.deque(["file_3.py", "file_2.py", "file_1.py"])

manager.set_file_paths(dag_files + ["file_4.py"])
manager.add_new_file_path_to_queue()
assert manager._file_path_queue == collections.deque(
["file_4.py", "file_3.py", "file_2.py", "file_1.py"]
)

@conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("zipfile.is_zipfile", return_value=True)
@@ -432,23 +471,23 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
}
with freeze_time(freezed_base_time):
manager.set_file_paths(dag_files)
assert manager._file_path_queue == []
assert manager._file_path_queue == collections.deque()
# File Path Queue will be empty as the "modified time" < "last finish time"
manager.prepare_file_path_queue()
assert manager._file_path_queue == []
assert manager._file_path_queue == collections.deque()

# Simulate the DAG modification by using modified_time which is greater
# than the last_parse_time but still less than now - min_file_process_interval
file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
file_1_new_mtime_ts = file_1_new_mtime.timestamp()
with freeze_time(freezed_base_time):
manager.set_file_paths(dag_files)
assert manager._file_path_queue == []
assert manager._file_path_queue == collections.deque()
# File Path Queue will be empty as the "modified time" < "last finish time"
mock_getmtime.side_effect = [file_1_new_mtime_ts]
manager.prepare_file_path_queue()
# Check that file is added to the queue even though file was just recently passed
assert manager._file_path_queue == ["file_1.py"]
assert manager._file_path_queue == collections.deque(["file_1.py"])
assert last_finish_time < file_1_new_mtime
assert (
manager._file_process_interval
@@ -1038,7 +1077,9 @@ def test_callback_queue(self, tmpdir):
manager._add_callback_to_queue(dag2_req1)

# then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last)
assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
assert manager._file_path_queue == collections.deque(
[dag2_req1.full_filepath, dag1_req1.full_filepath]
)
assert set(manager._callback_to_execute.keys()) == {dag1_req1.full_filepath, dag2_req1.full_filepath}
assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
assert manager._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1]
Expand All @@ -1047,14 +1088,18 @@ def test_callback_queue(self, tmpdir):
manager._add_callback_to_queue(dag1_sla2)

# then - since sla2 == sla1, should not have brought dag1 to the fore
assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
assert manager._file_path_queue == collections.deque(
[dag2_req1.full_filepath, dag1_req1.full_filepath]
)
assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]

# when
manager._add_callback_to_queue(dag1_req2)

# then - non-sla callback should have brought dag1 to the fore
assert manager._file_path_queue == [dag1_req1.full_filepath, dag2_req1.full_filepath]
assert manager._file_path_queue == collections.deque(
[dag1_req1.full_filepath, dag2_req1.full_filepath]
)
assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1, dag1_req2]


