Skip to content

Commit

Permalink
[AIRFLOW-1357] Fix scheduler zip file support
Browse files Browse the repository at this point in the history
Zipped DAGs are supported by the models, but since 1.8 the scheduler
has not taken them into account. This commit fixes the issue.

Closes apache#2406 from ultrabug/jira_1357
  • Loading branch information
ultrabug authored and bolkedebruin committed Jul 11, 2017
1 parent f6f73b5 commit 3b863f1
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
5 changes: 3 additions & 2 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import re
import time
import zipfile

from abc import ABCMeta, abstractmethod
from collections import defaultdict
Expand Down Expand Up @@ -187,15 +188,15 @@ def list_py_file_paths(directory, safe_mode=True):
continue
mod_name, file_ext = os.path.splitext(
os.path.split(file_path)[-1])
if file_ext != '.py':
if file_ext != '.py' and not zipfile.is_zipfile(file_path):
continue
if any([re.findall(p, file_path) for p in patterns]):
continue

# Heuristic that guesses whether a Python file contains an
# Airflow DAG definition.
might_contain_dag = True
if safe_mode:
if safe_mode and not zipfile.is_zipfile(file_path):
with open(file_path, 'rb') as f:
content = f.read()
might_contain_dag = all(
Expand Down
21 changes: 19 additions & 2 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
from airflow.utils.dag_processing import SimpleDagBag
from airflow.utils.dag_processing import SimpleDagBag, list_py_file_paths

from mock import patch
from sqlalchemy.orm.session import make_transient
Expand Down Expand Up @@ -68,7 +68,8 @@
# Filename to be used for dags that are created in an ad-hoc manner and can be removed/
# created at runtime
TEMP_DAG_FILENAME = "temp_dag.py"

# Path of the 'dags' directory located next to this test module; symlinks
# in the module's own path are resolved first via os.path.realpath.
TEST_DAGS_FOLDER = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'dags')

class BackfillJobTest(unittest.TestCase):

Expand Down Expand Up @@ -2011,3 +2012,19 @@ def test_remove_file_clears_import_error(self):
import_errors = session.query(models.ImportError).all()

self.assertEqual(len(import_errors), 0)

def test_list_py_file_paths(self):
    """
    [AIRFLOW-1357] Test the 'list_py_file_paths' function used by the
    scheduler to list and load DAGs.

    Every '.py' and '.zip' entry found directly in TEST_DAGS_FOLDER,
    except those named in ``ignored_files``, is expected to be returned
    by ``list_py_file_paths(TEST_DAGS_FOLDER)``.
    """
    # Files present in the folder that must NOT be reported.
    # NOTE(review): presumably 'no_dags.py' is dropped by the safe-mode
    # heuristic of list_py_file_paths -- confirm against that function.
    ignored_files = ['no_dags.py']

    # Build the expected paths with os.path.join rather than a hard-coded
    # '/' so the comparison uses the platform's path separator.
    expected_files = [
        os.path.join(TEST_DAGS_FOLDER, file_name)
        for file_name in os.listdir(TEST_DAGS_FOLDER)
        if file_name.endswith(('.py', '.zip'))
        and file_name not in ignored_files
    ]
    detected_files = list(list_py_file_paths(TEST_DAGS_FOLDER))

    # Listing order is not part of the contract, so compare sorted lists.
    self.assertEqual(sorted(detected_files), sorted(expected_files))

0 comments on commit 3b863f1

Please sign in to comment.