Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowin committed Apr 3, 2016
1 parent 4eb2fd4 commit 6516a24
Show file tree
Hide file tree
Showing 7 changed files with 483 additions and 73 deletions.
22 changes: 22 additions & 0 deletions airflow/utils/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import airflow

def get_dag(dag_id, test_dag_folder=None):
    """Retrieve a DAG from the test dag folder and reset its state.

    :param dag_id: id of the DAG to look up in the DagBag
    :param test_dag_folder: folder to load DAGs from; when None the
        DagBag falls back to its default folder
    :return: the requested DAG with prior task-instance state cleared
    :raises KeyError: if ``dag_id`` is not in the DagBag; the message
        lists the available dag ids to ease debugging (a plain dict
        lookup would raise a bare, message-less KeyError)
    """
    dagbag = airflow.models.DagBag(dag_folder=test_dag_folder)
    try:
        dag = dagbag.dags[dag_id]
    except KeyError:
        raise KeyError(
            'DAG {!r} not found in folder {!r}; available dag ids: {}'.format(
                dag_id, test_dag_folder, sorted(dagbag.dags)))
    # clear() wipes existing task instances so each test starts fresh
    dag.clear()
    return dag
3 changes: 2 additions & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import absolute_import
from .configuration import *
from .core import *
from .jobs import *
from .models import *
from .operators import *
from .configuration import *
from .contrib import *
from .utils import *
68 changes: 0 additions & 68 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,6 @@
import pickle


class timeout:
    """
    A context manager used to limit execution time.
    Note -- won't work on Windows (based on signal, like Airflow timeouts)
    Based on: http://stackoverflow.com/a/22348885
    """
    def __init__(self, seconds=1, error_message='Timeout'):
        # Whole seconds before SIGALRM fires (signal.alarm only takes ints).
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        # Signal-handler signature is (signum, frame); we only need to abort.
        raise ValueError(self.error_message)

    def __enter__(self):
        # Save the previous SIGALRM handler so we can restore it on exit;
        # the original implementation leaked our handler past the context.
        self._old_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Cancel any pending alarm and put the prior handler back.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, self._old_handler)


class FakeDatetime(datetime):
"A fake replacement for datetime that can be mocked for testing."

Expand Down Expand Up @@ -253,50 +230,6 @@ def test_schedule_dag_no_end_date_up_to_today_only(self):
def test_confirm_unittest_mod(self):
    # Sanity check: the suite must be running with [core] unit_test_mode
    # enabled, otherwise tests could touch a real configuration/db.
    assert configuration.get('core', 'unit_test_mode')

def test_backfill_examples(self):
    """Backfill the example_bash_operator example DAG for a single day."""
    self.dagbag = models.DagBag(
        dag_folder=DEV_NULL, include_examples=True)
    selected = [
        d for d in self.dagbag.dags.values()
        if d.dag_id == 'example_bash_operator'
    ]
    # Wipe any prior state for the target date before backfilling.
    for d in selected:
        d.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
    for d in selected:
        backfill = jobs.BackfillJob(
            dag=d,
            start_date=DEFAULT_DATE,
            end_date=DEFAULT_DATE)
        backfill.run()

def test_trap_executor_error(self):
    """
    Test for https://github.com/airbnb/airflow/pull/1220
    Test that errors setting up tasks (before tasks run) are properly
    caught
    """
    self.dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
    targets = [
        d for d in self.dagbag.dags.values()
        if d.dag_id == 'test_raise_executor_error'
    ]
    for d in targets:
        d.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
    for d in targets:
        backfill = jobs.BackfillJob(
            dag=d,
            start_date=DEFAULT_DATE,
            end_date=DEFAULT_DATE)

        def guarded_run():
            # Bound by a timeout: the un-fixed bug spins forever here.
            with timeout(seconds=15):
                backfill.run()

        self.assertRaises(AirflowException, guarded_run)

def test_pickling(self):
dp = self.dag.pickle()
assert self.dag.dag_id == dp.pickle.dag_id
Expand Down Expand Up @@ -669,7 +602,6 @@ def test_bad_trigger_rule(self):
trigger_rule="non_existant",
dag=self.dag)


class CliTests(unittest.TestCase):
def setUp(self):
configuration.test_mode()
Expand Down
32 changes: 32 additions & 0 deletions tests/dags/test_backfill_pooled_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
DAG designed to test what happens when a DAG with pooled tasks is run
by a BackfillJob.
Addresses issue #1225.
"""
from datetime import datetime

from airflow.models import DAG
from airflow.operators import DummyOperator

# Single pooled task: the pool 'test_backfill_pooled_task_pool' must exist
# before a BackfillJob can run this DAG (exercises issue #1225).
dag = DAG(dag_id='test_backfill_pooled_task_dag')
task = DummyOperator(
    task_id='test_backfill_pooled_task',
    dag=dag,
    pool='test_backfill_pooled_task_pool',
    owner='airflow',
    start_date=datetime(2016, 2, 1))
# NOTE(review): this dag_id duplicates dag1 in tests/dags/test_issue_1225.py;
# a DagBag keyed by dag_id keeps only one of them -- confirm this is intended.
103 changes: 103 additions & 0 deletions tests/dags/test_issue_1225.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
DAGs designed to test DagRun state handling by a BackfillJob: pooled tasks,
depends_on_past dependencies, failure propagation, trigger rules, and
deadlock detection.
Addresses issue #1225.
"""
from datetime import datetime

from airflow.models import DAG
from airflow.operators import DummyOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule
DEFAULT_DATE = datetime(2016, 1, 1)

def fail():
    """Python callable that always raises, used to force a task failure."""
    message = 'Expected failure.'
    raise ValueError(message)

# dag1: backfill with a pooled task.
# Previously backfill would queue the pooled task but never run it (#1225).
dag1 = DAG(dag_id='test_backfill_pooled_task_dag', start_date=DEFAULT_DATE)
dag1_task1 = DummyOperator(
    task_id='test_backfill_pooled_task',
    dag=dag1,
    pool='test_backfill_pooled_task_pool',
    owner='airflow')

# dag2: a single task with depends_on_past=True, so each run requires the
# previous run of the same task to have succeeded.
dag2 = DAG(dag_id='test_depends_on_past', start_date=DEFAULT_DATE)
dag2_task1 = DummyOperator(
    task_id='test_dop_task',
    dag=dag2,
    depends_on_past=True,
    owner='airflow')

# dag3: a DagRun that cannot complete should be marked failed.
# task2 depends on task1, which always raises, so task2 never runs.
dag3 = DAG(dag_id='test_dagrun_states_fail', start_date=DEFAULT_DATE)
dag3_task1 = PythonOperator(
    task_id='test_dagrun_fail',
    dag=dag3,
    owner='airflow',
    python_callable=fail)
dag3_task2 = DummyOperator(
    task_id='test_dagrun_succeed',
    dag=dag3,
    owner='airflow')
dag3_task2.set_upstream(dag3_task1)

# dag4: a DagRun that completes but contains a failure is marked success.
# task2 uses TriggerRule.ALL_FAILED, so it succeeds precisely because its
# upstream task failed, letting the run reach a terminal state.
dag4 = DAG(dag_id='test_dagrun_states_success', start_date=DEFAULT_DATE)
dag4_task1 = PythonOperator(
    task_id='test_dagrun_fail',
    dag=dag4,
    owner='airflow',
    python_callable=fail,
)
dag4_task2 = DummyOperator(
    task_id='test_dagrun_succeed',
    dag=dag4,
    owner='airflow',
    trigger_rule=TriggerRule.ALL_FAILED
)
dag4_task2.set_upstream(dag4_task1)

# dag5: a DagRun that completes but has a root failure is marked failed.
# The two tasks are independent; the failing one has no downstream tasks.
dag5 = DAG(dag_id='test_dagrun_states_root_fail', start_date=DEFAULT_DATE)
dag5_task1 = DummyOperator(
    task_id='test_dagrun_succeed',
    dag=dag5,
    owner='airflow'
)
dag5_task2 = PythonOperator(
    task_id='test_dagrun_fail',
    dag=dag5,
    owner='airflow',
    python_callable=fail,
)

# dag6: a DagRun that is deadlocked (no task can ever be scheduled) is
# failed. Both tasks set depends_on_past=True, so on the first run neither
# has a previous instance to satisfy the dependency.
dag6 = DAG(dag_id='test_dagrun_states_deadlock', start_date=DEFAULT_DATE)
dag6_task1 = DummyOperator(
    task_id='test_depends_on_past',
    depends_on_past=True,
    dag=dag6,
    owner='airflow')
dag6_task2 = DummyOperator(
    task_id='test_depends_on_past_2',
    depends_on_past=True,
    dag=dag6,
    owner='airflow')
dag6_task2.set_upstream(dag6_task1)
Loading

0 comments on commit 6516a24

Please sign in to comment.