Skip to content

Commit

Permalink
[AIRFLOW-989] Do not mark dag run successful if unfinished tasks
Browse files Browse the repository at this point in the history
Dag runs could be marked successful if all root
tasks were successful,
even if some tasks did not run yet, ie. in case of
clearing. Now
we consider unfinished_tasks, before marking
successful.

Closes apache#2154 from bolkedebruin/AIRFLOW-989
  • Loading branch information
bolkedebruin committed Mar 15, 2017
1 parent cadfae5 commit 3d6095f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
6 changes: 3 additions & 3 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4091,9 +4091,9 @@ def update_state(self, session=None):
logging.info('Marking run {} failed'.format(self))
self.state = State.FAILED

# if all roots succeeded, the run succeeded
elif all(r.state in (State.SUCCESS, State.SKIPPED)
for r in roots):
# if all roots succeeded and no unfinished tasks, the run succeeded
elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
for r in roots):
logging.info('Marking run {} successful'.format(self))
self.state = State.SUCCESS

Expand Down
51 changes: 51 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,57 @@ def test_dagrun_success_when_all_skipped(self):
updated_dag_state = dag_run.update_state()
self.assertEqual(State.SUCCESS, updated_dag_state)

def test_dagrun_success_conditions(self):
session = settings.Session()

dag = DAG(
'test_dagrun_success_conditions',
start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})

# A -> B
# A -> C -> D
# ordered: B, D, C, A or D, B, C, A or D, C, B, A
with dag:
op1 = DummyOperator(task_id='A')
op2 = DummyOperator(task_id='B')
op3 = DummyOperator(task_id='C')
op4 = DummyOperator(task_id='D')
op1.set_upstream([op2, op3])
op3.set_upstream(op4)

dag.clear()

now = datetime.datetime.now()
dr = dag.create_dagrun(run_id='test_dagrun_success_conditions',
state=State.RUNNING,
execution_date=now,
start_date=now)

# op1 = root
ti_op1 = dr.get_task_instance(task_id=op1.task_id)
ti_op1.set_state(state=State.SUCCESS, session=session)

ti_op2 = dr.get_task_instance(task_id=op2.task_id)
ti_op3 = dr.get_task_instance(task_id=op3.task_id)
ti_op4 = dr.get_task_instance(task_id=op4.task_id)

# root is successful, but unfinished tasks
state = dr.update_state()
self.assertEqual(State.RUNNING, state)

# one has failed, but root is successful
ti_op2.set_state(state=State.FAILED, session=session)
ti_op3.set_state(state=State.SUCCESS, session=session)
ti_op4.set_state(state=State.SUCCESS, session=session)
state = dr.update_state()
self.assertEqual(State.SUCCESS, state)

# upstream dependency failed, root has not run
ti_op1.set_state(State.NONE, session)
state = dr.update_state()
self.assertEqual(State.FAILED, state)


class DagBagTest(unittest.TestCase):

Expand Down

0 comments on commit 3d6095f

Please sign in to comment.