From 4763720530fdd179df1239d2ff331930368b772b Mon Sep 17 00:00:00 2001 From: jlowin Date: Sun, 3 Apr 2016 21:56:46 -0400 Subject: [PATCH 1/3] Set DAG_FOLDER for unit tests When tests are running, the default DAG_FOLDER becomes `airflow/tests/dags`. This makes it much easier to execute DAGs in unit tests in a standardized manner. Also exports DAGS_FOLDER as an env var for Travis --- airflow/configuration.py | 13 +++++- airflow/utils/tests.py | 22 --------- scripts/ci/run_tests.sh | 3 ++ tests/core.py | 2 +- tests/dags/README.md | 10 ++++ tests/dags/test_issue_1225.py | 37 ++++++--------- tests/jobs.py | 73 ++++++++++++------------------ tests/models.py | 4 +- tests/operators/subdag_operator.py | 19 ++++---- 9 files changed, 83 insertions(+), 100 deletions(-) delete mode 100644 airflow/utils/tests.py create mode 100644 tests/dags/README.md diff --git a/airflow/configuration.py b/airflow/configuration.py index 6720604581f39..19706180b61ef 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -354,7 +354,7 @@ def run_command(command): TEST_CONFIG = """\ [core] airflow_home = {AIRFLOW_HOME} -dags_folder = {AIRFLOW_HOME}/dags +dags_folder = {TEST_DAGS_FOLDER} base_log_folder = {AIRFLOW_HOME}/logs executor = SequentialExecutor sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db @@ -581,6 +581,17 @@ def mkdir_p(path): else: AIRFLOW_CONFIG = expand_env_var(os.environ['AIRFLOW_CONFIG']) +# Set up dags folder for unit tests +# this directory won't exist if users install via pip +_TEST_DAGS_FOLDER = os.path.join( + os.path.dirname(os.path.dirname(os.path.realpath(__file__))), + 'tests', + 'dags') +if os.path.exists(_TEST_DAGS_FOLDER): + TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER +else: + TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags') + def parameterized_config(template): """ diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py deleted file mode 100644 index a9683db9d146d..0000000000000 --- a/airflow/utils/tests.py +++ /dev/null @@ -1,22 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import airflow - -def get_dag(dag_id, test_dag_folder=None): - """ retrieve DAG from the test dag folder """ - dagbag = airflow.models.DagBag(dag_folder=test_dag_folder) - dag = dagbag.dags[dag_id] - dag.clear() - return dag diff --git a/scripts/ci/run_tests.sh b/scripts/ci/run_tests.sh index 0aa0418114b99..32504390e8375 100755 --- a/scripts/ci/run_tests.sh +++ b/scripts/ci/run_tests.sh @@ -16,6 +16,9 @@ if [ "${TRAVIS}" ]; then echo "Using travis airflow.cfg" DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" cp -f ${DIR}/airflow_travis.cfg ~/airflow/unittests.cfg + + ROOTDIR="$(dirname $(dirname $DIR))" + export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags" fi echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN diff --git a/tests/core.py b/tests/core.py index 373d35ebf075c..2597e7677db53 100644 --- a/tests/core.py +++ b/tests/core.py @@ -43,7 +43,7 @@ DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] TEST_DAG_ID = 'unit_tests' -configuration.test_mode() + try: import cPickle as pickle diff --git a/tests/dags/README.md b/tests/dags/README.md new file mode 100644 index 0000000000000..14849f1e9391f --- /dev/null +++ b/tests/dags/README.md @@ -0,0 +1,10 @@ +# Unit Tests DAGs Folder + +This folder contains DAGs for Airflow unit testing. + +To access a DAG in this folder, use the following code inside a unit test. Note this only works when `test_mode` is on; otherwise the normal Airflow `DAGS_FOLDER` will take precedence. + +```python +dagbag = DagBag() +dag = dagbag.get(dag_id) +``` diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py index 969bea5de23bb..051ff32ef0c33 100644 --- a/tests/dags/test_issue_1225.py +++ b/tests/dags/test_issue_1225.py @@ -21,83 +21,76 @@ from datetime import datetime from airflow.models import DAG -from airflow.operators import DummyOperator, PythonOperator +from airflow.operators import DummyOperator, PythonOperator, SubDagOperator from airflow.utils.trigger_rule import TriggerRule DEFAULT_DATE = datetime(2016, 1, 1) +default_args = dict( + start_date=DEFAULT_DATE, + owner='airflow') def fail(): raise ValueError('Expected failure.') # DAG tests backfill with pooled tasks # Previously backfill would queue the task but never run it -dag1 = DAG(dag_id='test_backfill_pooled_task_dag', start_date=DEFAULT_DATE) +dag1 = DAG(dag_id='test_backfill_pooled_task_dag', default_args=default_args) dag1_task1 = DummyOperator( task_id='test_backfill_pooled_task', dag=dag1, - pool='test_backfill_pooled_task_pool', - owner='airflow') + pool='test_backfill_pooled_task_pool',) # DAG tests depends_on_past dependencies -dag2 = DAG(dag_id='test_depends_on_past', start_date=DEFAULT_DATE) +dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args) dag2_task1 = DummyOperator( task_id='test_dop_task', dag=dag2, - depends_on_past=True, - owner='airflow') + depends_on_past=True,) # DAG tests that a Dag run that doesn't complete is marked failed -dag3 = DAG(dag_id='test_dagrun_states_fail', start_date=DEFAULT_DATE) +dag3 = DAG(dag_id='test_dagrun_states_fail', default_args=default_args) dag3_task1 = PythonOperator( task_id='test_dagrun_fail', dag=dag3, - owner='airflow', python_callable=fail) dag3_task2 = DummyOperator( task_id='test_dagrun_succeed', - dag=dag3, - owner='airflow') + dag=dag3,) dag3_task2.set_upstream(dag3_task1) # DAG tests that a Dag run that completes but has a failure is marked success -dag4 = DAG(dag_id='test_dagrun_states_success', start_date=DEFAULT_DATE) +dag4 = 
DAG(dag_id='test_dagrun_states_success', default_args=default_args) dag4_task1 = PythonOperator( task_id='test_dagrun_fail', dag=dag4, - owner='airflow', python_callable=fail, ) dag4_task2 = DummyOperator( task_id='test_dagrun_succeed', dag=dag4, - owner='airflow', trigger_rule=TriggerRule.ALL_FAILED ) dag4_task2.set_upstream(dag4_task1) # DAG tests that a Dag run that completes but has a root failure is marked fail -dag5 = DAG(dag_id='test_dagrun_states_root_fail', start_date=DEFAULT_DATE) +dag5 = DAG(dag_id='test_dagrun_states_root_fail', default_args=default_args) dag5_task1 = DummyOperator( task_id='test_dagrun_succeed', dag=dag5, - owner='airflow' ) dag5_task2 = PythonOperator( task_id='test_dagrun_fail', dag=dag5, - owner='airflow', python_callable=fail, ) # DAG tests that a Dag run that is deadlocked with no states is failed -dag6 = DAG(dag_id='test_dagrun_states_deadlock', start_date=DEFAULT_DATE) +dag6 = DAG(dag_id='test_dagrun_states_deadlock', default_args=default_args) dag6_task1 = DummyOperator( task_id='test_depends_on_past', depends_on_past=True, - dag=dag6, - owner='airflow') + dag=dag6,) dag6_task2 = DummyOperator( task_id='test_depends_on_past_2', depends_on_past=True, - dag=dag6, - owner='airflow') + dag=dag6,) dag6_task2.set_upstream(dag6_task1) diff --git a/tests/jobs.py b/tests/jobs.py index d11b2cc20b35d..39343e40b5e6d 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -24,28 +24,24 @@ from airflow import AirflowException, settings from airflow.bin import cli from airflow.jobs import BackfillJob, SchedulerJob -from airflow.models import DagBag, DagRun, Pool, TaskInstance as TI +from airflow.models import DAG, DagBag, DagRun, Pool, TaskInstance as TI +from airflow.operators import DummyOperator from airflow.utils.state import State -from airflow.utils.tests import get_dag from airflow.utils.timeout import timeout from airflow.utils.db import provide_session DEV_NULL = '/dev/null' DEFAULT_DATE = datetime.datetime(2016, 1, 1) -TEST_DAGS_FOLDER = os.path.join( - os.path.dirname(os.path.realpath(__file__)), 'dags') - class BackfillJobTest(unittest.TestCase): def setUp(self): self.parser = cli.CLIFactory.get_parser() + self.dagbag = DagBag() def test_backfill_examples(self): - dagbag = DagBag( - dag_folder=DEV_NULL, include_examples=True) dags = [ - dag for dag in dagbag.dags.values() + dag for dag in self.dagbag.dags.values() if dag.dag_id in ('example_bash_operator',)] for dag in dags: dag.clear( @@ -65,25 +61,17 @@ def test_trap_executor_error(self): Test that errors setting up tasks (before tasks run) are properly caught """ - dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER) - dags = [ - dag for dag in dagbag.dags.values() - if dag.dag_id in ('test_raise_executor_error',)] - for dag in dags: - dag.clear( - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE) - for dag in dags: - job = BackfillJob( - dag=dag, - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE) - # run with timeout because this creates an infinite loop if not - # caught - def run_with_timeout(): - with timeout(seconds=30): - job.run() - self.assertRaises(AirflowException, run_with_timeout) + dag = self.dagbag.get_dag('test_raise_executor_error') + job = BackfillJob( + dag=dag, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE) + # run with timeout because this creates an infinite loop if not + # caught + def run_with_timeout(): + with timeout(seconds=30): + job.run() + self.assertRaises(AirflowException, run_with_timeout) def test_backfill_pooled_task(self): """ @@ -96,7 +84,7 @@ def 
test_backfill_pooled_task(self): session.add(pool) session.commit() - dag = get_dag('test_backfill_pooled_task_dag', TEST_DAGS_FOLDER) + dag = self.dagbag.get_dag('test_backfill_pooled_task_dag') job = BackfillJob( dag=dag, @@ -115,8 +103,8 @@ def test_backfill_pooled_task(self): self.assertEqual(ti.state, State.SUCCESS) def test_backfill_depends_on_past(self): - dag = get_dag('test_depends_on_past', TEST_DAGS_FOLDER) - run_date = dag.start_date + datetime.timedelta(days=5) + dag = self.dagbag.get_dag('test_depends_on_past') + run_date = DEFAULT_DATE + datetime.timedelta(days=5) # import ipdb; ipdb.set_trace() BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run() @@ -143,18 +131,16 @@ def test_cli_backfill_depends_on_past(self): 'backfill', dag_id, '-l', - '-sd', - TEST_DAGS_FOLDER, '-s', run_date.isoformat(), ] - dag = get_dag(dag_id, TEST_DAGS_FOLDER) + dag = self.dagbag.get_dag(dag_id) - cli.backfill(self.parser.parse_args(args)) - ti = TI(dag.get_task('test_depends_on_past'), run_date) - ti.refresh_from_db() - # task did not run - self.assertEqual(ti.state, State.NONE) + self.assertRaisesRegex( + AirflowException, + 'BackfillJob is deadlocked', + cli.backfill, + self.parser.parse_args(args)) cli.backfill(self.parser.parse_args(args + ['-I'])) ti = TI(dag.get_task('test_depends_on_past'), run_date) @@ -164,6 +150,10 @@ def test_cli_backfill_depends_on_past(self): class SchedulerJobTest(unittest.TestCase): + + def setUp(self): + self.dagbag = DagBag() + @provide_session def evaluate_dagrun( self, @@ -181,16 +171,13 @@ def evaluate_dagrun( run_kwargs = {} scheduler = SchedulerJob() - dag = get_dag(dag_id, TEST_DAGS_FOLDER) + dag = self.dagbag.get_dag(dag_id) dr = scheduler.schedule_dag(dag) if advance_execution_date: # run a second time to schedule a dagrun after the start_date dr = scheduler.schedule_dag(dag) ex_date = dr.execution_date - # if 'test_dagrun_states_deadlock' in dag_id and run_kwargs: - # import ipdb; ipdb.set_trace() - try: dag.run(start_date=ex_date, end_date=ex_date, **run_kwargs) except AirflowException: @@ -214,8 +201,6 @@ def evaluate_dagrun( # dagrun is running self.assertEqual(dr.state, State.RUNNING) - # import ipdb; ipdb.set_trace() - dag.get_active_runs() # dagrun failed diff --git a/tests/models.py b/tests/models.py index 288852fa180a6..8f04f4bc1ec5d 100644 --- a/tests/models.py +++ b/tests/models.py @@ -26,7 +26,6 @@ from airflow.models import TaskInstance as TI from airflow.operators import DummyOperator, BashOperator from airflow.utils.state import State -from airflow.utils.tests import get_dag DEFAULT_DATE = datetime.datetime(2016, 1, 1) TEST_DAGS_FOLDER = os.path.join( @@ -226,7 +225,8 @@ def run_with_error(ti): self.assertEqual(ti.try_number, 4) def test_depends_on_past(self): - dag = get_dag('test_depends_on_past', TEST_DAGS_FOLDER) + dagbag = models.DagBag() + dag = dagbag.get_dag('test_depends_on_past') task = dag.tasks[0] run_date = task.start_date + datetime.timedelta(days=5) ti = TI(task, run_date) diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py index 2de1ee1b25b56..b3dd4f1641527 100644 --- a/tests/operators/subdag_operator.py +++ b/tests/operators/subdag_operator.py @@ -12,18 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import datetime +import os import unittest -from datetime import datetime import airflow -from airflow import DAG -from airflow.operators import DummyOperator -from airflow.operators.subdag_operator import SubDagOperator +from airflow.models import DAG, DagBag +from airflow.operators import BashOperator, DummyOperator, SubDagOperator +from airflow.jobs import BackfillJob from airflow.exceptions import AirflowException +DEFAULT_DATE = datetime.datetime(2016, 1, 1) + default_args = dict( owner='airflow', - start_date=datetime(2016, 1, 1), + start_date=DEFAULT_DATE, ) class SubDagOperatorTests(unittest.TestCase): @@ -54,7 +57,7 @@ def test_subdag_pools(self): Subdags and subdag tasks can't both have a pool with 1 slot """ dag = DAG('parent', default_args=default_args) - subdag = DAG('parent.test', default_args=default_args) + subdag = DAG('parent.child', default_args=default_args) session = airflow.settings.Session() pool_1 = airflow.models.Pool(pool='test_pool_1', slots=1) @@ -68,12 +71,12 @@ def test_subdag_pools(self): self.assertRaises( AirflowException, SubDagOperator, - task_id='test', dag=dag, subdag=subdag, pool='test_pool_1') + task_id='child', dag=dag, subdag=subdag, pool='test_pool_1') # recreate dag because failed subdagoperator was already added dag = DAG('parent', default_args=default_args) SubDagOperator( - task_id='test', dag=dag, subdag=subdag, pool='test_pool_10') + task_id='child', dag=dag, subdag=subdag, pool='test_pool_10') session.delete(pool_1) session.delete(pool_10) From 2e0421a28347de9a24bb14f37d33988c50b901b2 Mon Sep 17 00:00:00 2001 From: jlowin Date: Sun, 3 Apr 2016 22:06:16 -0400 Subject: [PATCH 2/3] Fix handling of deadlocked jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Raise an error when a backfill deadlocks Deadlocked backfills didn’t raise AirflowExceptions, so SubDagOperators didn’t recognize that their subdags were failing. - Fix bug with marking DagRuns as failed - Let SchedulerJob mark DagRuns as deadlocked when there are no TIs available; other deadlock metrics depend on TIs - Adds unit tests. 
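Note on the deadlock detection below: the backfill loop keeps a `not_ready` set on each pass, and if a full pass over the remaining task instances leaves every one of them not ready, the job gives up and raises instead of spinning forever. A condensed, self-contained sketch of that idea (the `run_until_done_or_deadlocked` / `try_to_run` names and the plain `RuntimeError` are illustrative stand-ins, not Airflow's actual API; the real job re-checks after executor heartbeats rather than in a synchronous loop):

```python
def run_until_done_or_deadlocked(tasks_to_run, try_to_run):
    """Illustrative sketch only -- stand-in names, not Airflow's API.

    tasks_to_run: dict mapping a task key to a task-like object.
    try_to_run:   callable returning 'success', 'failed' or 'not_ready'.
    """
    succeeded, failed, deadlocked = set(), set(), set()
    while tasks_to_run and not deadlocked:
        not_ready = set()
        made_progress = False
        for key, task in list(tasks_to_run.items()):
            outcome = try_to_run(task)
            if outcome == 'success':
                succeeded.add(key)
                tasks_to_run.pop(key)
                made_progress = True
            elif outcome == 'failed':
                failed.add(key)
                tasks_to_run.pop(key)
                made_progress = True
            else:
                # task could not be started on this pass
                not_ready.add(key)
        # nothing ran and everything left is "not ready": that is a
        # deadlock, so fail loudly instead of looping forever
        if not made_progress and not_ready == set(tasks_to_run):
            deadlocked.update(tasks_to_run)
            tasks_to_run.clear()
    if failed or deadlocked:
        raise RuntimeError(
            'backfill finished with {} failed and {} deadlocked '
            'task(s)'.format(len(failed), len(deadlocked)))
    return succeeded
```

Raising at the end is what lets a SubDagOperator notice that its subdag made no progress, instead of treating a deadlocked backfill as a quiet no-op.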
--- airflow/bin/cli.py | 3 +- airflow/jobs.py | 163 +++++++++++++++++------------ airflow/models.py | 3 +- airflow/utils/state.py | 18 ++++ tests/dags/test_issue_1225.py | 21 ++++ tests/jobs.py | 51 +++++---- tests/models.py | 1 + tests/operators/subdag_operator.py | 14 ++- tests/utils.py | 3 + 9 files changed, 188 insertions(+), 89 deletions(-) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 310a471c4b7f1..4c3cafb8052a7 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -39,7 +39,8 @@ def process_subdir(subdir): def get_dag(args): dagbag = DagBag(process_subdir(args.subdir)) if args.dag_id not in dagbag.dags: - raise AirflowException('dag_id could not be found') + raise AirflowException( + 'dag_id could not be found: {}'.format(args.dag_id)) return dagbag.dags[args.dag_id] diff --git a/airflow/jobs.py b/airflow/jobs.py index 34318f394b81b..7e4867a770e25 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -499,6 +499,7 @@ def process_dag(self, dag, executor): skip_tis = {(ti[0], ti[1]) for ti in qry.all()} descartes = [obj for obj in product(dag.tasks, active_runs)] + could_not_run = set() self.logger.info('Checking dependencies on {} tasks instances, minus {} ' 'skippable ones'.format(len(descartes), len(skip_tis))) for task, dttm in descartes: @@ -513,6 +514,23 @@ def process_dag(self, dag, executor): elif ti.is_runnable(flag_upstream_failed=True): self.logger.debug('Firing task: {}'.format(ti)) executor.queue_task_instance(ti, pickle_id=pickle_id) + else: + could_not_run.add(ti) + + # this type of deadlock happens when dagruns can't even start and so + # the TI's haven't been persisted to the database. + if len(could_not_run) == len(descartes): + self.logger.error( + 'Dag runs are deadlocked for DAG: {}'.format(dag.dag_id)) + (session + .query(models.DagRun) + .filter( + models.DagRun.dag_id == dag.dag_id, + models.DagRun.state == State.RUNNING, + models.DagRun.execution_date.in_(active_runs)) + .update( + {models.DagRun.state: State.FAILED}, + synchronize_session='fetch')) # Releasing the lock self.logger.debug("Unlocking DAG (scheduler_lock)") @@ -781,11 +799,12 @@ def _execute(self): # Build a list of all instances to run tasks_to_run = {} - failed = [] - succeeded = [] - started = [] - wont_run = [] - not_ready_to_run = set() + failed = set() + succeeded = set() + started = set() + skipped = set() + not_ready = set() + deadlocked = set() for task in self.dag.tasks: if (not self.include_adhoc) and task.adhoc: @@ -800,9 +819,8 @@ def _execute(self): session.commit() # Triggering what is ready to get triggered - deadlocked = False while tasks_to_run and not deadlocked: - + not_ready.clear() for key, ti in list(tasks_to_run.items()): ti.refresh_from_db() @@ -810,18 +828,24 @@ def _execute(self): self.ignore_first_depends_on_past and ti.execution_date == (start_date or ti.start_date)) - # Did the task finish without failing? -- then we're done - if ( - ti.state in (State.SUCCESS, State.SKIPPED) and - key in tasks_to_run): - succeeded.append(key) - tasks_to_run.pop(key) + # The task was already marked successful or skipped by a + # different Job. Don't rerun it. + if key not in started: + if ti.state == State.SUCCESS: + succeeded.add(key) + tasks_to_run.pop(key) + continue + elif ti.state == State.SKIPPED: + skipped.add(key) + tasks_to_run.pop(key) + continue - # Is the task runnable? -- the run it - elif ti.is_queueable( + # Is the task runnable? 
-- then run it + if ti.is_queueable( include_queued=True, ignore_depends_on_past=ignore_depends_on_past, flag_upstream_failed=True): + self.logger.debug('Sending {} to executor'.format(ti)) executor.queue_task_instance( ti, mark_success=self.mark_success, @@ -829,38 +853,22 @@ def _execute(self): ignore_dependencies=self.ignore_dependencies, ignore_depends_on_past=ignore_depends_on_past, pool=self.pool) - ti.state = State.RUNNING - if key not in started: - started.append(key) - if ti in not_ready_to_run: - not_ready_to_run.remove(ti) - - # Mark the task as not ready to run. If the set of tasks - # that aren't ready ever equals the set of tasks to run, - # then the backfill is deadlocked + started.add(key) + + # Mark the task as not ready to run elif ti.state in (State.NONE, State.UPSTREAM_FAILED): - not_ready_to_run.add(ti) - if not_ready_to_run == set(tasks_to_run.values()): - msg = 'BackfillJob is deadlocked: no tasks can be run.' - if any( - t.are_dependencies_met() != - t.are_dependencies_met( - ignore_depends_on_past=True) - for t in tasks_to_run.values()): - msg += ( - ' Some of the tasks that were unable to ' - 'run have "depends_on_past=True". Try running ' - 'the backfill with the option ' - '"ignore_first_depends_on_past=True" ' - ' or passing "-I" at the command line.') - self.logger.error(msg) - deadlocked = True - wont_run.extend(not_ready_to_run) - tasks_to_run.clear() + self.logger.debug('Added {} to not_ready'.format(ti)) + not_ready.add(key) self.heartbeat() executor.heartbeat() + # If the set of tasks that aren't ready ever equals the set of + # tasks to run, then the backfill is deadlocked + if not_ready and not_ready == set(tasks_to_run): + deadlocked.update(tasks_to_run.values()) + tasks_to_run.clear() + # Reacting to events for key, state in list(executor.get_event_buffer().items()): dag_id, task_id, execution_date = key @@ -882,12 +890,12 @@ def _execute(self): # task reports skipped elif ti.state == State.SKIPPED: - wont_run.append(key) + skipped.add(key) self.logger.error("Skipping {} ".format(key)) # anything else is a failure else: - failed.append(key) + failed.add(key) self.logger.error("Task instance {} failed".format(key)) tasks_to_run.pop(key) @@ -899,18 +907,19 @@ def _execute(self): if ti.state == State.SUCCESS: self.logger.info( 'Task instance {} succeeded'.format(key)) - succeeded.append(key) + succeeded.add(key) tasks_to_run.pop(key) # task reports failure elif ti.state == State.FAILED: self.logger.error("Task instance {} failed".format(key)) - failed.append(key) + failed.add(key) tasks_to_run.pop(key) # this probably won't ever be triggered - elif key in not_ready_to_run: - continue + elif ti in not_ready: + self.logger.info( + "{} wasn't expected to run, but it did".format(ti)) # executor reports success but task does not - this is weird elif ti.state not in ( @@ -939,29 +948,51 @@ def _execute(self): ti.handle_failure(msg) tasks_to_run.pop(key) - msg = ( - "[backfill progress] " - "waiting: {0} | " - "succeeded: {1} | " - "kicked_off: {2} | " - "failed: {3} | " - "wont_run: {4} ").format( - len(tasks_to_run), - len(succeeded), - len(started), - len(failed), - len(wont_run)) + msg = ' | '.join([ + "[backfill progress]", + "waiting: {0}", + "succeeded: {1}", + "kicked_off: {2}", + "failed: {3}", + "skipped: {4}", + "deadlocked: {5}" + ]).format( + len(tasks_to_run), + len(succeeded), + len(started), + len(failed), + len(skipped), + len(deadlocked)) self.logger.info(msg) executor.end() session.close() + + err = '' if failed: - msg = ( - 
"------------------------------------------\n" - "Some tasks instances failed, " - "here's the list:\n{}".format(failed)) - raise AirflowException(msg) - self.logger.info("All done. Exiting.") + err += ( + "---------------------------------------------------\n" + "Some task instances failed:\n{}\n".format(failed)) + if deadlocked: + err += ( + '---------------------------------------------------\n' + 'BackfillJob is deadlocked.') + deadlocked_depends_on_past = any( + t.are_dependencies_met() != t.are_dependencies_met( + ignore_depends_on_past=True) + for t in deadlocked) + if deadlocked_depends_on_past: + err += ( + 'Some of the deadlocked tasks were unable to run because ' + 'of "depends_on_past" relationships. Try running the ' + 'backfill with the option ' + '"ignore_first_depends_on_past=True" or passing "-I" at ' + 'the command line.') + err += ' These tasks were unable to run:\n{}\n'.format(deadlocked) + if err: + raise AirflowException(err) + + self.logger.info("Backfill done. Exiting.") class LocalTaskJob(BaseJob): diff --git a/airflow/models.py b/airflow/models.py index 2cc51cc7b6705..6868d34d7bbe1 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -2398,7 +2398,8 @@ def get_active_runs(self): # AND there are unfinished tasks... any(ti.state in State.unfinished() for ti in task_instances) and # AND none of them have dependencies met... - all(not ti.are_dependencies_met() for ti in task_instances + all(not ti.are_dependencies_met(session=session) + for ti in task_instances if ti.state in State.unfinished())) for run in active_runs: diff --git a/airflow/utils/state.py b/airflow/utils/state.py index 7824f15b3ffe6..169f5b6d5911f 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -69,8 +69,26 @@ def runnable(cls): cls.QUEUED ] + @classmethod + def finished(cls): + """ + A list of states indicating that a task started and completed a + run attempt. Note that the attempt could have resulted in failure or + have been interrupted; in any case, it is no longer running. + """ + return [ + cls.SUCCESS, + cls.SHUTDOWN, + cls.FAILED, + cls.SKIPPED, + ] + @classmethod def unfinished(cls): + """ + A list of states indicating that a task either has not completed + a run or has not even started. 
+ """ return [ cls.NONE, cls.QUEUED, diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py index 051ff32ef0c33..1c81bf0218f94 100644 --- a/tests/dags/test_issue_1225.py +++ b/tests/dags/test_issue_1225.py @@ -94,3 +94,24 @@ def fail(): depends_on_past=True, dag=dag6,) dag6_task2.set_upstream(dag6_task1) + + +# DAG tests that a deadlocked subdag is properly caught +dag7 = DAG(dag_id='test_subdag_deadlock', default_args=default_args) +subdag7 = DAG(dag_id='test_subdag_deadlock.subdag', default_args=default_args) +subdag7_task1 = PythonOperator( + task_id='test_subdag_fail', + dag=subdag7, + python_callable=fail) +subdag7_task2 = DummyOperator( + task_id='test_subdag_dummy_1', + dag=subdag7,) +subdag7_task3 = DummyOperator( + task_id='test_subdag_dummy_2', + dag=subdag7) +dag7_subdag1 = SubDagOperator( + task_id='subdag', + dag=dag7, + subdag=subdag7) +subdag7_task1.set_downstream(subdag7_task2) +subdag7_task2.set_downstream(subdag7_task3) diff --git a/tests/jobs.py b/tests/jobs.py index 39343e40b5e6d..9f0070f6390d6 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -40,6 +40,9 @@ def setUp(self): self.dagbag = DagBag() def test_backfill_examples(self): + """ + Test backfilling example dags + """ dags = [ dag for dag in self.dagbag.dags.values() if dag.dag_id in ('example_bash_operator',)] @@ -56,12 +59,12 @@ def test_backfill_examples(self): def test_trap_executor_error(self): """ - Test for https://github.com/airbnb/airflow/pull/1220 + Test that errors setting up tasks (before tasks run) are caught - Test that errors setting up tasks (before tasks run) are properly - caught + Test for https://github.com/airbnb/airflow/pull/1220 """ dag = self.dagbag.get_dag('test_raise_executor_error') + dag.clear() job = BackfillJob( dag=dag, start_date=DEFAULT_DATE, @@ -75,9 +78,9 @@ def run_with_timeout(): def test_backfill_pooled_task(self): """ - Test for https://github.com/airbnb/airflow/pull/1225 - Test that queued tasks are executed by BackfillJob + + Test for https://github.com/airbnb/airflow/pull/1225 """ session = settings.Session() pool = Pool(pool='test_backfill_pooled_task_pool', slots=1) @@ -85,6 +88,7 @@ def test_backfill_pooled_task(self): session.commit() dag = self.dagbag.get_dag('test_backfill_pooled_task_dag') + dag.clear() job = BackfillJob( dag=dag, @@ -103,15 +107,18 @@ def test_backfill_pooled_task(self): self.assertEqual(ti.state, State.SUCCESS) def test_backfill_depends_on_past(self): + """ + Test that backfill resects ignore_depends_on_past + """ dag = self.dagbag.get_dag('test_depends_on_past') + dag.clear() run_date = DEFAULT_DATE + datetime.timedelta(days=5) - # import ipdb; ipdb.set_trace() - BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run() - # ti should not have run - ti = TI(dag.tasks[0], run_date) - ti.refresh_from_db() - self.assertIs(ti.state, None) + # backfill should deadlock + self.assertRaisesRegexp( + AirflowException, + 'BackfillJob is deadlocked', + BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run) BackfillJob( dag=dag, @@ -119,12 +126,15 @@ def test_backfill_depends_on_past(self): end_date=run_date, ignore_first_depends_on_past=True).run() - # ti should have run + # ti should have succeeded ti = TI(dag.tasks[0], run_date) ti.refresh_from_db() self.assertEquals(ti.state, State.SUCCESS) def test_cli_backfill_depends_on_past(self): + """ + Test that CLI respects -I argument + """ dag_id = 'test_dagrun_states_deadlock' run_date = DEFAULT_DATE + datetime.timedelta(days=1) args = [ @@ -135,8 +145,9 @@ def 
test_cli_backfill_depends_on_past(self): run_date.isoformat(), ] dag = self.dagbag.get_dag(dag_id) + dag.clear() - self.assertRaisesRegex( + self.assertRaisesRegexp( AirflowException, 'BackfillJob is deadlocked', cli.backfill, @@ -172,6 +183,7 @@ def evaluate_dagrun( scheduler = SchedulerJob() dag = self.dagbag.get_dag(dag_id) + dag.clear() dr = scheduler.schedule_dag(dag) if advance_execution_date: # run a second time to schedule a dagrun after the start_date @@ -208,8 +220,7 @@ def evaluate_dagrun( def test_dagrun_fail(self): """ - Test that a DagRun with one failed task and one incomplete root task - is marked a failure + DagRuns with one failed and one incomplete root task -> FAILED """ self.evaluate_dagrun( dag_id='test_dagrun_states_fail', @@ -219,8 +230,7 @@ def test_dagrun_fail(self): def test_dagrun_success(self): """ - Test that a DagRun with one failed task and one successful root task - is marked a success + DagRuns with one failed and one successful root task -> SUCCESS """ self.evaluate_dagrun( dag_id='test_dagrun_states_success', @@ -230,8 +240,7 @@ def test_dagrun_success(self): def test_dagrun_root_fail(self): """ - Test that a DagRun with one successful root task and one failed root - task is marked a failure + DagRuns with one successful and one failed root task -> FAILED """ self.evaluate_dagrun( dag_id='test_dagrun_states_root_fail', @@ -241,6 +250,8 @@ def test_dagrun_root_fail(self): def test_dagrun_deadlock(self): """ + Deadlocked DagRun is marked a failure + Test that a deadlocked dagrun is marked as a failure by having depends_on_past and an execution_date after the start_date """ @@ -253,6 +264,8 @@ def test_dagrun_deadlock(self): def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self): """ + DagRun is marked a success if ignore_first_depends_on_past=True + Test that an otherwise-deadlocked dagrun is marked as a success if ignore_first_depends_on_past=True and the dagrun execution_date is after the start_date. diff --git a/tests/models.py b/tests/models.py index 8f04f4bc1ec5d..f786b328bb223 100644 --- a/tests/models.py +++ b/tests/models.py @@ -227,6 +227,7 @@ def run_with_error(ti): def test_depends_on_past(self): dagbag = models.DagBag() dag = dagbag.get_dag('test_depends_on_past') + dag.clear() task = dag.tasks[0] run_date = task.start_date + datetime.timedelta(days=5) ti = TI(task, run_date) diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py index b3dd4f1641527..0006f6068da3e 100644 --- a/tests/operators/subdag_operator.py +++ b/tests/operators/subdag_operator.py @@ -82,6 +82,16 @@ def test_subdag_pools(self): session.delete(pool_10) session.commit() + def test_subdag_deadlock(self): + dagbag = DagBag() + dag = dagbag.get_dag('test_subdag_deadlock') + dag.clear() + subdag = dagbag.get_dag('test_subdag_deadlock.subdag') + subdag.clear() -if __name__ == "__main__": - unittest.main() + # first make sure subdag is deadlocked + self.assertRaisesRegexp(AirflowException, 'deadlocked', subdag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + # now make sure dag picks up the subdag error + subdag.clear() + self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) diff --git a/tests/utils.py b/tests/utils.py index 692e63b4e9c0e..89a39d8302336 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -27,6 +27,9 @@ class LogUtilsTest(unittest.TestCase): def test_gcs_url_parse(self): + """ + Test GCS url parsing + """ logging.info( 'About to create a GCSLog object without a connection. 
This will ' 'log an error but testing will proceed.') From b2844af020cb5a470bd83ead09ddb121923084ca Mon Sep 17 00:00:00 2001 From: jlowin Date: Mon, 4 Apr 2016 18:59:13 -0400 Subject: [PATCH 3/3] Fix infinite retries with pools, with test Addresses the issue raised in #1299 --- airflow/jobs.py | 4 ++-- tests/dags/test_issue_1225.py | 12 ++++++++++++ tests/jobs.py | 31 ++++++++++++++++++++++++++++++- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/airflow/jobs.py b/airflow/jobs.py index 7e4867a770e25..c7cbcc7903897 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -571,8 +571,6 @@ def process_events(self, executor, dagbag): # collect queued tasks for prioritiztion if ti.state == State.QUEUED: self.queued_tis.add(ti) - elif ti in self.queued_tis: - self.queued_tis.remove(ti) else: # special instructions for failed executions could go here pass @@ -601,6 +599,8 @@ def prioritize_queued(self, session, executor, dagbag): else: d[ti.pool].append(ti) + self.queued_tis.clear() + dag_blacklist = set(dagbag.paused_dags()) for pool, tis in list(d.items()): if not pool: diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py index 1c81bf0218f94..898cc04991eb9 100644 --- a/tests/dags/test_issue_1225.py +++ b/tests/dags/test_issue_1225.py @@ -115,3 +115,15 @@ def fail(): subdag=subdag7) subdag7_task1.set_downstream(subdag7_task2) subdag7_task2.set_downstream(subdag7_task3) + +# DAG tests that queued tasks are run +dag8 = DAG( + dag_id='test_scheduled_queued_tasks', + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + default_args=default_args) +dag8_task1 = PythonOperator( + python_callable=fail, + task_id='test_queued_task', + dag=dag8, + pool='test_queued_pool') diff --git a/tests/jobs.py b/tests/jobs.py index 9f0070f6390d6..ac892404c362e 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -76,7 +76,7 @@ def run_with_timeout(): job.run() self.assertRaises(AirflowException, run_with_timeout) - def test_backfill_pooled_task(self): + def test_backfill_pooled_tasks(self): """ Test that queued tasks are executed by BackfillJob @@ -262,6 +262,35 @@ def test_dagrun_deadlock(self): dagrun_state=State.FAILED, advance_execution_date=True) + def test_scheduler_pooled_tasks(self): + """ + Test that the scheduler handles queued tasks correctly + See issue #1299 + """ + session = settings.Session() + if not ( + session.query(Pool) + .filter(Pool.pool == 'test_queued_pool') + .first()): + pool = Pool(pool='test_queued_pool', slots=5) + session.merge(pool) + session.commit() + session.close() + + dag_id = 'test_scheduled_queued_tasks' + dag = self.dagbag.get_dag(dag_id) + dag.clear() + + scheduler = SchedulerJob(dag_id, num_runs=10) + scheduler.run() + + task_1 = dag.tasks[0] + ti = TI(task_1, dag.start_date) + ti.refresh_from_db() + self.assertEqual(ti.state, State.FAILED) + + dag.clear() + def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self): """ DagRun is marked a success if ignore_first_depends_on_past=True