Set DAG_FOLDER for unit tests
When tests are running, the default DAG_FOLDER becomes
`airflow/tests/dags`. This makes it much easier to execute DAGs in unit
tests in a standardized manner.

Also exports DAGS_FOLDER as an env var for Travis
jlowin committed Apr 5, 2016
1 parent 6581858 commit 4763720
Showing 9 changed files with 83 additions and 100 deletions.
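In practical terms, the change described in the commit message means a unit test no longer needs to point a `DagBag` at an explicit folder: with the unittest configuration active, a plain `DagBag()` already resolves to the repository's `tests/dags`. A minimal sketch of the intended usage (the DAG id is one of the test DAGs added in this commit):

```python
from airflow.models import DagBag

# Under the unittest config, dags_folder resolves to <repo>/tests/dags,
# so no dag_folder argument is needed.
dagbag = DagBag()
dag = dagbag.get_dag('test_depends_on_past')
```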
13 changes: 12 additions & 1 deletion airflow/configuration.py
@@ -354,7 +354,7 @@ def run_command(command):
TEST_CONFIG = """\
[core]
airflow_home = {AIRFLOW_HOME}
-dags_folder = {AIRFLOW_HOME}/dags
+dags_folder = {TEST_DAGS_FOLDER}
base_log_folder = {AIRFLOW_HOME}/logs
executor = SequentialExecutor
sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
@@ -581,6 +581,17 @@ def mkdir_p(path):
else:
    AIRFLOW_CONFIG = expand_env_var(os.environ['AIRFLOW_CONFIG'])

+# Set up dags folder for unit tests
+# this directory won't exist if users install via pip
+_TEST_DAGS_FOLDER = os.path.join(
+    os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
+    'tests',
+    'dags')
+if os.path.exists(_TEST_DAGS_FOLDER):
+    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
+else:
+    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')


def parameterized_config(template):
    """
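The new block prefers the in-repo `tests/dags` directory and falls back to `$AIRFLOW_HOME/dags` when it is absent (as the comment notes, a pip install ships no `tests/` tree). A standalone sketch of that resolution logic, using a hypothetical `resolve_test_dags_folder` helper and illustrative paths:

```python
import os

def resolve_test_dags_folder(airflow_home, package_root):
    # Prefer the repository's tests/dags; fall back to the
    # conventional $AIRFLOW_HOME/dags when it does not exist.
    candidate = os.path.join(package_root, 'tests', 'dags')
    if os.path.exists(candidate):
        return candidate
    return os.path.join(airflow_home, 'dags')

# package_root is two dirname() calls up from configuration.py, as in the diff
print(resolve_test_dags_folder('/home/user/airflow', '/home/user/src/airflow'))
```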
22 changes: 0 additions & 22 deletions airflow/utils/tests.py

This file was deleted.

3 changes: 3 additions & 0 deletions scripts/ci/run_tests.sh
@@ -16,6 +16,9 @@ if [ "${TRAVIS}" ]; then
echo "Using travis airflow.cfg"
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cp -f ${DIR}/airflow_travis.cfg ~/airflow/unittests.cfg

+ROOTDIR="$(dirname $(dirname $DIR))"
+export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags"
fi

echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
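`AIRFLOW__CORE__DAGS_FOLDER` follows Airflow's `AIRFLOW__<SECTION>__<KEY>` convention, under which an environment variable overrides the corresponding `airflow.cfg` entry. A quick sanity check from Python (a sketch; the path is illustrative, and it assumes the variable is set before the configuration module is imported):

```python
import os

# Simulate what run_tests.sh exports on Travis.
os.environ['AIRFLOW__CORE__DAGS_FOLDER'] = '/tmp/airflow/tests/dags'

from airflow import configuration

# The environment variable takes precedence over unittests.cfg.
print(configuration.get('core', 'dags_folder'))
```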
2 changes: 1 addition & 1 deletion tests/core.py
@@ -43,7 +43,7 @@
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
TEST_DAG_ID = 'unit_tests'
-configuration.test_mode()


try:
    import cPickle as pickle
10 changes: 10 additions & 0 deletions tests/dags/README.md
@@ -0,0 +1,10 @@
# Unit Tests DAGs Folder

This folder contains DAGs for Airflow unit testing.

To access a DAG in this folder, use the following code inside a unit test. Note this only works when `test_mode` is on; otherwise the normal Airflow `DAGS_FOLDER` will take precedence.

```python
dagbag = DagBag()
dag = dagbag.get_dag(dag_id)
```
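A fuller (hypothetical) test built on that pattern might look like the following; `test_backfill_pooled_task_dag` is one of the DAGs this commit adds to the folder:

```python
import unittest

from airflow.models import DagBag


class TestDagsFolderExample(unittest.TestCase):

    def setUp(self):
        # With test_mode on, DagBag() loads DAGs from tests/dags
        self.dagbag = DagBag()

    def test_dag_loaded(self):
        dag = self.dagbag.get_dag('test_backfill_pooled_task_dag')
        self.assertIsNotNone(dag)
        self.assertEqual(len(dag.tasks), 1)
```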
37 changes: 15 additions & 22 deletions tests/dags/test_issue_1225.py
@@ -21,83 +21,76 @@
from datetime import datetime

from airflow.models import DAG
-from airflow.operators import DummyOperator, PythonOperator
+from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
from airflow.utils.trigger_rule import TriggerRule
DEFAULT_DATE = datetime(2016, 1, 1)
+default_args = dict(
+    start_date=DEFAULT_DATE,
+    owner='airflow')

def fail():
    raise ValueError('Expected failure.')

# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
-dag1 = DAG(dag_id='test_backfill_pooled_task_dag', start_date=DEFAULT_DATE)
+dag1 = DAG(dag_id='test_backfill_pooled_task_dag', default_args=default_args)
dag1_task1 = DummyOperator(
    task_id='test_backfill_pooled_task',
    dag=dag1,
-    pool='test_backfill_pooled_task_pool',
-    owner='airflow')
+    pool='test_backfill_pooled_task_pool',)

# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', start_date=DEFAULT_DATE)
+dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
dag2_task1 = DummyOperator(
    task_id='test_dop_task',
    dag=dag2,
-    depends_on_past=True,
-    owner='airflow')
+    depends_on_past=True,)

# DAG tests that a Dag run that doesn't complete is marked failed
-dag3 = DAG(dag_id='test_dagrun_states_fail', start_date=DEFAULT_DATE)
+dag3 = DAG(dag_id='test_dagrun_states_fail', default_args=default_args)
dag3_task1 = PythonOperator(
    task_id='test_dagrun_fail',
    dag=dag3,
-    owner='airflow',
    python_callable=fail)
dag3_task2 = DummyOperator(
    task_id='test_dagrun_succeed',
-    dag=dag3,
-    owner='airflow')
+    dag=dag3,)
dag3_task2.set_upstream(dag3_task1)

# DAG tests that a Dag run that completes but has a failure is marked success
-dag4 = DAG(dag_id='test_dagrun_states_success', start_date=DEFAULT_DATE)
+dag4 = DAG(dag_id='test_dagrun_states_success', default_args=default_args)
dag4_task1 = PythonOperator(
    task_id='test_dagrun_fail',
    dag=dag4,
-    owner='airflow',
    python_callable=fail,
)
dag4_task2 = DummyOperator(
    task_id='test_dagrun_succeed',
    dag=dag4,
-    owner='airflow',
    trigger_rule=TriggerRule.ALL_FAILED
)
dag4_task2.set_upstream(dag4_task1)

# DAG tests that a Dag run that completes but has a root failure is marked fail
-dag5 = DAG(dag_id='test_dagrun_states_root_fail', start_date=DEFAULT_DATE)
+dag5 = DAG(dag_id='test_dagrun_states_root_fail', default_args=default_args)
dag5_task1 = DummyOperator(
    task_id='test_dagrun_succeed',
    dag=dag5,
-    owner='airflow'
)
dag5_task2 = PythonOperator(
    task_id='test_dagrun_fail',
    dag=dag5,
-    owner='airflow',
    python_callable=fail,
)

# DAG tests that a Dag run that is deadlocked with no states is failed
-dag6 = DAG(dag_id='test_dagrun_states_deadlock', start_date=DEFAULT_DATE)
+dag6 = DAG(dag_id='test_dagrun_states_deadlock', default_args=default_args)
dag6_task1 = DummyOperator(
    task_id='test_depends_on_past',
    depends_on_past=True,
-    dag=dag6,
-    owner='airflow')
+    dag=dag6,)
dag6_task2 = DummyOperator(
    task_id='test_depends_on_past_2',
    depends_on_past=True,
-    dag=dag6,
-    owner='airflow')
+    dag=dag6,)
dag6_task2.set_upstream(dag6_task1)
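The refactor above replaces per-operator `owner` and `start_date` arguments with a shared `default_args` dict: any key in `default_args` is applied to every operator in the DAG unless the operator sets it explicitly. A minimal sketch of the mechanism (the DAG and task ids are illustrative):

```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators import DummyOperator

default_args = dict(
    start_date=datetime(2016, 1, 1),
    owner='airflow')

dag = DAG(dag_id='default_args_demo', default_args=default_args)
task = DummyOperator(task_id='demo_task', dag=dag)

# The operator inherits every default it did not override.
assert task.owner == 'airflow'
assert task.start_date == datetime(2016, 1, 1)
```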
73 changes: 29 additions & 44 deletions tests/jobs.py
@@ -24,28 +24,24 @@
from airflow import AirflowException, settings
from airflow.bin import cli
from airflow.jobs import BackfillJob, SchedulerJob
-from airflow.models import DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.models import DAG, DagBag, DagRun, Pool, TaskInstance as TI
from airflow.operators import DummyOperator
from airflow.utils.state import State
-from airflow.utils.tests import get_dag
from airflow.utils.timeout import timeout
from airflow.utils.db import provide_session

DEV_NULL = '/dev/null'
DEFAULT_DATE = datetime.datetime(2016, 1, 1)
-TEST_DAGS_FOLDER = os.path.join(
-    os.path.dirname(os.path.realpath(__file__)), 'dags')


class BackfillJobTest(unittest.TestCase):

    def setUp(self):
        self.parser = cli.CLIFactory.get_parser()
+        self.dagbag = DagBag()

    def test_backfill_examples(self):
-        dagbag = DagBag(
-            dag_folder=DEV_NULL, include_examples=True)
        dags = [
-            dag for dag in dagbag.dags.values()
+            dag for dag in self.dagbag.dags.values()
            if dag.dag_id in ('example_bash_operator',)]
        for dag in dags:
            dag.clear(
@@ -65,25 +61,17 @@ def test_trap_executor_error(self):
        Test that errors setting up tasks (before tasks run) are properly
        caught
        """
-        dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER)
-        dags = [
-            dag for dag in dagbag.dags.values()
-            if dag.dag_id in ('test_raise_executor_error',)]
-        for dag in dags:
-            dag.clear(
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE)
-        for dag in dags:
-            job = BackfillJob(
-                dag=dag,
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE)
-            # run with timeout because this creates an infinite loop if not
-            # caught
-            def run_with_timeout():
-                with timeout(seconds=30):
-                    job.run()
-            self.assertRaises(AirflowException, run_with_timeout)
+        dag = self.dagbag.get_dag('test_raise_executor_error')
+        job = BackfillJob(
+            dag=dag,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE)
+        # run with timeout because this creates an infinite loop if not
+        # caught
+        def run_with_timeout():
+            with timeout(seconds=30):
+                job.run()
+        self.assertRaises(AirflowException, run_with_timeout)

    def test_backfill_pooled_task(self):
        """
@@ -96,7 +84,7 @@ def test_backfill_pooled_task(self):
        session.add(pool)
        session.commit()

-        dag = get_dag('test_backfill_pooled_task_dag', TEST_DAGS_FOLDER)
+        dag = self.dagbag.get_dag('test_backfill_pooled_task_dag')

        job = BackfillJob(
            dag=dag,
@@ -115,8 +103,8 @@ def test_backfill_pooled_task(self):
        self.assertEqual(ti.state, State.SUCCESS)

    def test_backfill_depends_on_past(self):
-        dag = get_dag('test_depends_on_past', TEST_DAGS_FOLDER)
-        run_date = dag.start_date + datetime.timedelta(days=5)
+        dag = self.dagbag.get_dag('test_depends_on_past')
+        run_date = DEFAULT_DATE + datetime.timedelta(days=5)
        # import ipdb; ipdb.set_trace()
        BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run()

@@ -143,18 +131,16 @@ def test_cli_backfill_depends_on_past(self):
            'backfill',
            dag_id,
            '-l',
-            '-sd',
-            TEST_DAGS_FOLDER,
            '-s',
            run_date.isoformat(),
        ]
-        dag = get_dag(dag_id, TEST_DAGS_FOLDER)
+        dag = self.dagbag.get_dag(dag_id)

-        cli.backfill(self.parser.parse_args(args))
-        ti = TI(dag.get_task('test_depends_on_past'), run_date)
-        ti.refresh_from_db()
-        # task did not run
-        self.assertEqual(ti.state, State.NONE)
+        self.assertRaisesRegex(
+            AirflowException,
+            'BackfillJob is deadlocked',
+            cli.backfill,
+            self.parser.parse_args(args))

        cli.backfill(self.parser.parse_args(args + ['-I']))
        ti = TI(dag.get_task('test_depends_on_past'), run_date)
@@ -164,6 +150,10 @@ def test_cli_backfill_depends_on_past(self):


class SchedulerJobTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dagbag = DagBag()
+
    @provide_session
    def evaluate_dagrun(
        self,
@@ -181,16 +171,13 @@ def evaluate_dagrun(
            run_kwargs = {}

        scheduler = SchedulerJob()
-        dag = get_dag(dag_id, TEST_DAGS_FOLDER)
+        dag = self.dagbag.get_dag(dag_id)
        dr = scheduler.schedule_dag(dag)
        if advance_execution_date:
            # run a second time to schedule a dagrun after the start_date
            dr = scheduler.schedule_dag(dag)
        ex_date = dr.execution_date

-        # if 'test_dagrun_states_deadlock' in dag_id and run_kwargs:
-        #     import ipdb; ipdb.set_trace()
-
        try:
            dag.run(start_date=ex_date, end_date=ex_date, **run_kwargs)
        except AirflowException:
@@ -214,8 +201,6 @@ def evaluate_dagrun(
        # dagrun is running
        self.assertEqual(dr.state, State.RUNNING)

-        # import ipdb; ipdb.set_trace()
-
        dag.get_active_runs()

        # dagrun failed
4 changes: 2 additions & 2 deletions tests/models.py
@@ -26,7 +26,6 @@
from airflow.models import TaskInstance as TI
from airflow.operators import DummyOperator, BashOperator
from airflow.utils.state import State
-from airflow.utils.tests import get_dag

DEFAULT_DATE = datetime.datetime(2016, 1, 1)
TEST_DAGS_FOLDER = os.path.join(
@@ -226,7 +225,8 @@ def run_with_error(ti):
        self.assertEqual(ti.try_number, 4)

    def test_depends_on_past(self):
-        dag = get_dag('test_depends_on_past', TEST_DAGS_FOLDER)
+        dagbag = models.DagBag()
+        dag = dagbag.get_dag('test_depends_on_past')
        task = dag.tasks[0]
        run_date = task.start_date + datetime.timedelta(days=5)
        ti = TI(task, run_date)
