diff --git a/airflow/configuration.py b/airflow/configuration.py
index 6720604581f39..19706180b61ef 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -354,7 +354,7 @@ def run_command(command):
 TEST_CONFIG = """\
 [core]
 airflow_home = {AIRFLOW_HOME}
-dags_folder = {AIRFLOW_HOME}/dags
+dags_folder = {TEST_DAGS_FOLDER}
 base_log_folder = {AIRFLOW_HOME}/logs
 executor = SequentialExecutor
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
@@ -581,6 +581,17 @@ def mkdir_p(path):
 else:
     AIRFLOW_CONFIG = expand_env_var(os.environ['AIRFLOW_CONFIG'])
 
+# Set up dags folder for unit tests
+# this directory won't exist if users install via pip
+_TEST_DAGS_FOLDER = os.path.join(
+    os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
+    'tests',
+    'dags')
+if os.path.exists(_TEST_DAGS_FOLDER):
+    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
+else:
+    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')
+
 
 def parameterized_config(template):
     """
diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py
deleted file mode 100644
index a9683db9d146d..0000000000000
--- a/airflow/utils/tests.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import airflow
-
-def get_dag(dag_id, test_dag_folder=None):
-    """ retrieve DAG from the test dag folder """
-    dagbag = airflow.models.DagBag(dag_folder=test_dag_folder)
-    dag = dagbag.dags[dag_id]
-    dag.clear()
-    return dag
diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh
index 0aa0418114b99..32504390e8375 100755
--- a/scripts/ci/run_tests.sh
+++ b/scripts/ci/run_tests.sh
@@ -16,6 +16,9 @@ if [ "${TRAVIS}" ]; then
     echo "Using travis airflow.cfg"
     DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
     cp -f ${DIR}/airflow_travis.cfg ~/airflow/unittests.cfg
+
+    ROOTDIR="$(dirname $(dirname $DIR))"
+    export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags"
 fi
 
 echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
diff --git a/tests/core.py b/tests/core.py
index 373d35ebf075c..2597e7677db53 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -43,7 +43,7 @@
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
 TEST_DAG_ID = 'unit_tests'
-configuration.test_mode()
+
 
 try:
     import cPickle as pickle
diff --git a/tests/dags/README.md b/tests/dags/README.md
new file mode 100644
index 0000000000000..14849f1e9391f
--- /dev/null
+++ b/tests/dags/README.md
@@ -0,0 +1,10 @@
+# Unit Tests DAGs Folder
+
+This folder contains DAGs for Airflow unit testing.
+
+To access a DAG in this folder, use the following code inside a unit test. Note this only works when `test_mode` is on; otherwise the normal Airflow `DAGS_FOLDER` will take precedence.
+
+```python
+dagbag = DagBag()
+dag = dagbag.get_dag(dag_id)
+```
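For context on the README snippet above, here is a minimal sketch of a unit test that consumes a DAG from `tests/dags` once `dags_folder` resolves to `TEST_DAGS_FOLDER`. The test class, method name, and assertion are illustrative, not part of this change:

```python
import unittest

from airflow.models import DagBag


class TestDagsFolderSketch(unittest.TestCase):

    def setUp(self):
        # No dag_folder argument: DagBag falls back to the configured
        # dags_folder, which unittests.cfg now points at tests/dags.
        self.dagbag = DagBag()

    def test_dag_is_discovered(self):
        # DAG ids defined in tests/dags (e.g. test_issue_1225.py) are
        # available without passing an explicit folder around.
        dag = self.dagbag.get_dag('test_depends_on_past')
        self.assertIsNotNone(dag)
```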
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 969bea5de23bb..051ff32ef0c33 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -21,83 +21,76 @@
 from datetime import datetime
 
 from airflow.models import DAG
-from airflow.operators import DummyOperator, PythonOperator
+from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
 from airflow.utils.trigger_rule import TriggerRule
 
 DEFAULT_DATE = datetime(2016, 1, 1)
+default_args = dict(
+    start_date=DEFAULT_DATE,
+    owner='airflow')
 
 
 def fail():
     raise ValueError('Expected failure.')
 
 # DAG tests backfill with pooled tasks
 # Previously backfill would queue the task but never run it
-dag1 = DAG(dag_id='test_backfill_pooled_task_dag', start_date=DEFAULT_DATE)
+dag1 = DAG(dag_id='test_backfill_pooled_task_dag', default_args=default_args)
 dag1_task1 = DummyOperator(
     task_id='test_backfill_pooled_task',
     dag=dag1,
-    pool='test_backfill_pooled_task_pool',
-    owner='airflow')
+    pool='test_backfill_pooled_task_pool',)
 
 # DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', start_date=DEFAULT_DATE)
+dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
 dag2_task1 = DummyOperator(
     task_id='test_dop_task',
     dag=dag2,
-    depends_on_past=True,
-    owner='airflow')
+    depends_on_past=True,)
 
 # DAG tests that a Dag run that doesn't complete is marked failed
-dag3 = DAG(dag_id='test_dagrun_states_fail', start_date=DEFAULT_DATE)
+dag3 = DAG(dag_id='test_dagrun_states_fail', default_args=default_args)
 dag3_task1 = PythonOperator(
     task_id='test_dagrun_fail',
     dag=dag3,
-    owner='airflow',
     python_callable=fail)
 dag3_task2 = DummyOperator(
     task_id='test_dagrun_succeed',
-    dag=dag3,
-    owner='airflow')
+    dag=dag3,)
 dag3_task2.set_upstream(dag3_task1)
 
 # DAG tests that a Dag run that completes but has a failure is marked success
-dag4 = DAG(dag_id='test_dagrun_states_success', start_date=DEFAULT_DATE)
+dag4 = DAG(dag_id='test_dagrun_states_success', default_args=default_args)
 dag4_task1 = PythonOperator(
     task_id='test_dagrun_fail',
     dag=dag4,
-    owner='airflow',
     python_callable=fail,
 )
 dag4_task2 = DummyOperator(
     task_id='test_dagrun_succeed',
     dag=dag4,
-    owner='airflow',
     trigger_rule=TriggerRule.ALL_FAILED
 )
 dag4_task2.set_upstream(dag4_task1)
 
 # DAG tests that a Dag run that completes but has a root failure is marked fail
-dag5 = DAG(dag_id='test_dagrun_states_root_fail', start_date=DEFAULT_DATE)
+dag5 = DAG(dag_id='test_dagrun_states_root_fail', default_args=default_args)
 dag5_task1 = DummyOperator(
     task_id='test_dagrun_succeed',
     dag=dag5,
-    owner='airflow'
 )
 dag5_task2 = PythonOperator(
     task_id='test_dagrun_fail',
     dag=dag5,
-    owner='airflow',
     python_callable=fail,
 )
 
 # DAG tests that a Dag run that is deadlocked with no states is failed
-dag6 = DAG(dag_id='test_dagrun_states_deadlock', start_date=DEFAULT_DATE)
+dag6 = DAG(dag_id='test_dagrun_states_deadlock', default_args=default_args)
 dag6_task1 = DummyOperator(
     task_id='test_depends_on_past',
     depends_on_past=True,
-    dag=dag6,
-    owner='airflow')
+    dag=dag6,)
 dag6_task2 = DummyOperator(
     task_id='test_depends_on_past_2',
     depends_on_past=True,
-    dag=dag6,
-    owner='airflow')
+    dag=dag6,)
 dag6_task2.set_upstream(dag6_task1)
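The DAG-file rewrite above leans on `default_args` propagation: arguments passed to the `DAG` are inherited by every operator that does not override them, which is what lets the per-task `owner='airflow'` lines be dropped. A short sketch of that behavior under the same 2016-era imports (`dag_id` and `task_id` here are illustrative):

```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators import DummyOperator

default_args = dict(
    start_date=datetime(2016, 1, 1),
    owner='airflow')

# Operators attached to this DAG inherit start_date and owner from
# default_args unless they pass their own values explicitly.
dag = DAG(dag_id='default_args_sketch', default_args=default_args)
task = DummyOperator(task_id='noop', dag=dag)

assert task.owner == 'airflow'
assert task.start_date == datetime(2016, 1, 1)
```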
diff --git a/tests/jobs.py b/tests/jobs.py
index d11b2cc20b35d..39343e40b5e6d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -24,28 +24,24 @@
 from airflow import AirflowException, settings
 from airflow.bin import cli
 from airflow.jobs import BackfillJob, SchedulerJob
-from airflow.models import DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.models import DAG, DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.operators import DummyOperator
 from airflow.utils.state import State
-from airflow.utils.tests import get_dag
 from airflow.utils.timeout import timeout
 from airflow.utils.db import provide_session
 
 DEV_NULL = '/dev/null'
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 
-TEST_DAGS_FOLDER = os.path.join(
-    os.path.dirname(os.path.realpath(__file__)), 'dags')
-
 
 class BackfillJobTest(unittest.TestCase):
 
     def setUp(self):
         self.parser = cli.CLIFactory.get_parser()
+        self.dagbag = DagBag()
 
     def test_backfill_examples(self):
-        dagbag = DagBag(
-            dag_folder=DEV_NULL, include_examples=True)
         dags = [
-            dag for dag in dagbag.dags.values()
+            dag for dag in self.dagbag.dags.values()
             if dag.dag_id in ('example_bash_operator',)]
 
         for dag in dags:
             dag.clear(
@@ -65,25 +61,17 @@ def test_trap_executor_error(self):
         Test that errors setting up tasks (before tasks run) are properly
         caught
         """
-        dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER)
-        dags = [
-            dag for dag in dagbag.dags.values()
-            if dag.dag_id in ('test_raise_executor_error',)]
-        for dag in dags:
-            dag.clear(
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE)
-        for dag in dags:
-            job = BackfillJob(
-                dag=dag,
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE)
-            # run with timeout because this creates an infinite loop if not
-            # caught
-            def run_with_timeout():
-                with timeout(seconds=30):
-                    job.run()
-            self.assertRaises(AirflowException, run_with_timeout)
+        dag = self.dagbag.get_dag('test_raise_executor_error')
+        job = BackfillJob(
+            dag=dag,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE)
+        # run with timeout because this creates an infinite loop if not
+        # caught
+        def run_with_timeout():
+            with timeout(seconds=30):
+                job.run()
+        self.assertRaises(AirflowException, run_with_timeout)
 
     def test_backfill_pooled_task(self):
         """
@@ -96,7 +84,7 @@ def test_backfill_pooled_task(self):
         session.add(pool)
         session.commit()
 
-        dag = get_dag('test_backfill_pooled_task_dag', TEST_DAGS_FOLDER)
+        dag = self.dagbag.get_dag('test_backfill_pooled_task_dag')
 
         job = BackfillJob(
             dag=dag,
@@ -115,8 +103,8 @@ def test_backfill_pooled_task(self):
         self.assertEqual(ti.state, State.SUCCESS)
 
     def test_backfill_depends_on_past(self):
-        dag = get_dag('test_depends_on_past', TEST_DAGS_FOLDER)
-        run_date = dag.start_date + datetime.timedelta(days=5)
+        dag = self.dagbag.get_dag('test_depends_on_past')
+        run_date = DEFAULT_DATE + datetime.timedelta(days=5)
 
         # import ipdb; ipdb.set_trace()
         BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run()
@@ -143,18 +131,16 @@ def test_cli_backfill_depends_on_past(self):
             'backfill',
             dag_id,
             '-l',
-            '-sd',
-            TEST_DAGS_FOLDER,
             '-s',
             run_date.isoformat(),
         ]
-        dag = get_dag(dag_id, TEST_DAGS_FOLDER)
+        dag = self.dagbag.get_dag(dag_id)
 
-        cli.backfill(self.parser.parse_args(args))
-        ti = TI(dag.get_task('test_depends_on_past'), run_date)
-        ti.refresh_from_db()
-        # task did not run
-        self.assertEqual(ti.state, State.NONE)
+        self.assertRaisesRegexp(
+            AirflowException,
+            'BackfillJob is deadlocked',
+            cli.backfill,
+            self.parser.parse_args(args))
 
         cli.backfill(self.parser.parse_args(args + ['-I']))
         ti = TI(dag.get_task('test_depends_on_past'), run_date)
@@ -164,6 +150,10 @@ def test_cli_backfill_depends_on_past(self):
 
 
 class SchedulerJobTest(unittest.TestCase):
+
+    def setUp(self):
+        self.dagbag = DagBag()
+
     @provide_session
     def evaluate_dagrun(
             self,
@@ -181,16 +171,13 @@ def evaluate_dagrun(
             run_kwargs = {}
 
         scheduler = SchedulerJob()
-        dag = get_dag(dag_id, TEST_DAGS_FOLDER)
+        dag = self.dagbag.get_dag(dag_id)
         dr = scheduler.schedule_dag(dag)
         if advance_execution_date:
             # run a second time to schedule a dagrun after the start_date
             dr = scheduler.schedule_dag(dag)
         ex_date = dr.execution_date
 
-        # if 'test_dagrun_states_deadlock' in dag_id and run_kwargs:
-        #     import ipdb; ipdb.set_trace()
-
         try:
             dag.run(start_date=ex_date, end_date=ex_date, **run_kwargs)
         except AirflowException:
@@ -214,8 +201,6 @@ def evaluate_dagrun(
             # dagrun is running
             self.assertEqual(dr.state, State.RUNNING)
 
-            # import ipdb; ipdb.set_trace()
-
             dag.get_active_runs()
 
             # dagrun failed
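For readers tracing `test_cli_backfill_depends_on_past`: the first backfill is expected to deadlock because the run date is five days past the DAG's `start_date`, so a `depends_on_past=True` task waits on a previous run that the backfill never schedules; the CLI's `-I` flag skips that first check. A hedged sketch of the same scenario driven directly through `BackfillJob` (dates mirror the test; the `try`/`print` handling is illustrative):

```python
import datetime

from airflow import AirflowException
from airflow.jobs import BackfillJob
from airflow.models import DagBag

dagbag = DagBag()
dag = dagbag.get_dag('test_depends_on_past')

# Five days past DEFAULT_DATE: the first task instance's depends_on_past
# condition can never be satisfied within this single-day backfill window.
run_date = datetime.datetime(2016, 1, 1) + datetime.timedelta(days=5)

try:
    BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run()
except AirflowException as err:
    print(err)  # expected to mention "BackfillJob is deadlocked"
```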
diff --git a/tests/models.py b/tests/models.py
index 288852fa180a6..8f04f4bc1ec5d 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -26,7 +26,6 @@
 from airflow.models import TaskInstance as TI
 from airflow.operators import DummyOperator, BashOperator
 from airflow.utils.state import State
-from airflow.utils.tests import get_dag
 
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
@@ -226,7 +225,8 @@ def run_with_error(ti):
         self.assertEqual(ti.try_number, 4)
 
     def test_depends_on_past(self):
-        dag = get_dag('test_depends_on_past', TEST_DAGS_FOLDER)
+        dagbag = models.DagBag()
+        dag = dagbag.get_dag('test_depends_on_past')
         task = dag.tasks[0]
         run_date = task.start_date + datetime.timedelta(days=5)
         ti = TI(task, run_date)
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 2de1ee1b25b56..b3dd4f1641527 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -12,18 +12,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import datetime
+import os
 import unittest
-from datetime import datetime
 
 import airflow
-from airflow import DAG
-from airflow.operators import DummyOperator
-from airflow.operators.subdag_operator import SubDagOperator
+from airflow.models import DAG, DagBag
+from airflow.operators import BashOperator, DummyOperator, SubDagOperator
+from airflow.jobs import BackfillJob
 from airflow.exceptions import AirflowException
 
+DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+
 default_args = dict(
     owner='airflow',
-    start_date=datetime(2016, 1, 1),
+    start_date=DEFAULT_DATE,
 )
 
 
 class SubDagOperatorTests(unittest.TestCase):
@@ -54,7 +57,7 @@ def test_subdag_pools(self):
         Subdags and subdag tasks can't both have a pool with 1 slot
         """
         dag = DAG('parent', default_args=default_args)
-        subdag = DAG('parent.test', default_args=default_args)
+        subdag = DAG('parent.child', default_args=default_args)
 
         session = airflow.settings.Session()
         pool_1 = airflow.models.Pool(pool='test_pool_1', slots=1)
@@ -68,12 +71,12 @@ def test_subdag_pools(self):
         self.assertRaises(
             AirflowException,
             SubDagOperator,
-            task_id='test', dag=dag, subdag=subdag, pool='test_pool_1')
+            task_id='child', dag=dag, subdag=subdag, pool='test_pool_1')
 
         # recreate dag because failed subdagoperator was already added
         dag = DAG('parent', default_args=default_args)
         SubDagOperator(
-            task_id='test', dag=dag, subdag=subdag, pool='test_pool_10')
+            task_id='child', dag=dag, subdag=subdag, pool='test_pool_10')
 
         session.delete(pool_1)
         session.delete(pool_10)
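A closing note on the `parent.test` → `parent.child` rename above: `SubDagOperator` expects the subdag's `dag_id` and the operator's `task_id` to stay a matched pair of the form `<parent dag_id>.<task_id>`, and the rename keeps that pairing while making the parent/child relationship read more clearly. A hedged sketch of the convention, using the same imports this diff introduces:

```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators import SubDagOperator

default_args = dict(owner='airflow', start_date=datetime(2016, 1, 1))

parent = DAG('parent', default_args=default_args)
child = DAG('parent.child', default_args=default_args)

# Valid pairing: the subdag's dag_id 'parent.child' matches
# '<parent.dag_id>.<task_id>' for task_id='child'.
SubDagOperator(task_id='child', dag=parent, subdag=child)
```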