From 3b863f182d42dc1ba76c2623b9983698a682d5f7 Mon Sep 17 00:00:00 2001 From: Ultrabug Date: Tue, 11 Jul 2017 21:03:43 +0200 Subject: [PATCH] [AIRFLOW-1357] Fix scheduler zip file support Zipped DAGs are supported on the models but not taken into account by the scheduler since 1.8. This fixes the issue. Closes #2406 from ultrabug/jira_1357 --- airflow/utils/dag_processing.py | 5 +++-- tests/jobs.py | 21 +++++++++++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index cdcc7b033b956..2e975c10f9341 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -21,6 +21,7 @@ import os import re import time +import zipfile from abc import ABCMeta, abstractmethod from collections import defaultdict @@ -187,7 +188,7 @@ def list_py_file_paths(directory, safe_mode=True): continue mod_name, file_ext = os.path.splitext( os.path.split(file_path)[-1]) - if file_ext != '.py': + if file_ext != '.py' and not zipfile.is_zipfile(file_path): continue if any([re.findall(p, file_path) for p in patterns]): continue @@ -195,7 +196,7 @@ def list_py_file_paths(directory, safe_mode=True): # Heuristic that guesses whether a Python file contains an # Airflow DAG definition. might_contain_dag = True - if safe_mode: + if safe_mode and not zipfile.is_zipfile(file_path): with open(file_path, 'rb') as f: content = f.read() might_contain_dag = all( diff --git a/tests/jobs.py b/tests/jobs.py index 824cd9def39b5..6e6150bce26f7 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -36,7 +36,7 @@ from airflow.utils.db import provide_session from airflow.utils.state import State from airflow.utils.timeout import timeout -from airflow.utils.dag_processing import SimpleDagBag +from airflow.utils.dag_processing import SimpleDagBag, list_py_file_paths from mock import patch from sqlalchemy.orm.session import make_transient @@ -68,7 +68,8 @@ # Filename to be used for dags that are created in an ad-hoc manner and can be removed/ # created at runtime TEMP_DAG_FILENAME = "temp_dag.py" - +TEST_DAGS_FOLDER = os.path.join( + os.path.dirname(os.path.realpath(__file__)), 'dags') class BackfillJobTest(unittest.TestCase): @@ -2011,3 +2012,19 @@ def test_remove_file_clears_import_error(self): import_errors = session.query(models.ImportError).all() self.assertEqual(len(import_errors), 0) + + def test_list_py_file_paths(self): + """ + [JIRA-1357] Test the 'list_py_file_paths' function used by the + scheduler to list and load DAGs. + """ + detected_files = [] + expected_files = [] + for file_name in os.listdir(TEST_DAGS_FOLDER): + if file_name.endswith('.py') or file_name.endswith('.zip'): + if file_name not in ['no_dags.py']: + expected_files.append( + '{}/{}'.format(TEST_DAGS_FOLDER, file_name)) + for file_path in list_py_file_paths(TEST_DAGS_FOLDER): + detected_files.append(file_path) + self.assertEqual(sorted(detected_files), sorted(expected_files))