
Commit

Ensure the dagbag_size metric decreases when files are deleted (apach…
ashb authored Dec 6, 2022
1 parent bdf3175 commit 2c7bd92
Showing 2 changed files with 30 additions and 6 deletions.
35 changes: 29 additions & 6 deletions airflow/dag_processing/manager.py
@@ -949,6 +949,12 @@ def set_file_paths(self, new_file_paths):
                 Stats.decr("dag_processing.processes")
                 processor.terminate()
                 self._file_stats.pop(file_path)
+
+        to_remove = set(self._file_stats.keys()) - set(self._file_paths)
+        for key in to_remove:
+            # Remove the stats for any dag files that don't exist anymore
+            del self._file_stats[key]
+
         self._processors = filtered_processors
 
     def wait_until_finished(self):
@@ -1064,13 +1070,16 @@ def prepare_file_path_queue(self):
         is_mtime_mode = list_mode == "modified_time"
 
         file_paths_recently_processed = []
+        file_paths_to_stop_watching = set()
         for file_path in self._file_paths:
 
             if is_mtime_mode:
                 try:
                     files_with_mtime[file_path] = os.path.getmtime(file_path)
                 except FileNotFoundError:
                     self.log.warning("Skipping processing of missing file: %s", file_path)
+                    self._file_stats.pop(file_path, None)
+                    file_paths_to_stop_watching.add(file_path)
                     continue
                 file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
             else:
@@ -1099,12 +1108,18 @@ def prepare_file_path_queue(self):
             # set of files. Since we set the seed, the sort order will remain same per host
             random.Random(get_hostname()).shuffle(file_paths)
 
+        if file_paths_to_stop_watching:
+            self.set_file_paths(
+                [path for path in self._file_paths if path not in file_paths_to_stop_watching]
+            )
+
         files_paths_at_run_limit = [
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
         file_paths_to_exclude = set(file_paths_in_progress).union(
-            file_paths_recently_processed, files_paths_at_run_limit
+            file_paths_recently_processed,
+            files_paths_at_run_limit,
         )
 
         # Do not convert the following list to set as set does not preserve the order
@@ -1122,12 +1137,11 @@ def prepare_file_path_queue(self):
 
         self.log.debug("Queuing the following files for processing:\n\t%s", "\n\t".join(files_paths_to_queue))
 
+        default = DagFileStat(
+            num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
+        )
         for file_path in files_paths_to_queue:
-            if file_path not in self._file_stats:
-                self._file_stats[file_path] = DagFileStat(
-                    num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
-                )
-
+            self._file_stats.setdefault(file_path, default)
         self._file_path_queue.extend(files_paths_to_queue)
 
     def _kill_timed_out_processors(self):
@@ -1153,6 +1167,15 @@ def _kill_timed_out_processors(self):
                 self.waitables.pop(processor.waitable_handle)
                 processors_to_remove.append(file_path)
 
+                stat = DagFileStat(
+                    num_dags=0,
+                    import_errors=1,
+                    last_finish_time=now,
+                    last_duration=duration,
+                    run_count=self.get_run_count(file_path) + 1,
+                )
+                self._file_stats[processor.file_path] = stat
+
         # Clean up `self._processors` after iterating over it
         for proc in processors_to_remove:
             self._processors.pop(proc)
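
For reference, the heart of the manager.py change is the set-difference pruning added to set_file_paths. A minimal standalone sketch of that pattern, with plain strings standing in for DagFileStat values (illustrative only, not the real DagFileProcessorManager state):

    # Stats keyed by DAG file path; strings stand in for DagFileStat objects.
    file_stats = {"dags/a.py": "stat_a", "dags/deleted.py": "stat_old"}
    # "dags/deleted.py" is no longer among the watched file paths.
    file_paths = ["dags/a.py"]

    # Remove the stats for any dag files that don't exist anymore, so metrics
    # derived from these stats (such as dagbag_size) decrease as well.
    for key in set(file_stats) - set(file_paths):
        del file_stats[key]

    assert "dags/deleted.py" not in file_stats
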
1 change: 1 addition & 0 deletions tests/dag_processing/test_manager.py
@@ -257,6 +257,7 @@ def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
 
         manager.set_file_paths(["abc.txt"])
         assert manager._processors == {}
+        assert "missing_file.txt" not in manager._file_stats
 
     def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
         manager = DagFileProcessorManager(