Skip to content

Commit

Permalink
Fixing SKIPPED from propagating when it shouldn't
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Sep 22, 2015
1 parent dcad5c6 commit 73e9d2d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
7 changes: 7 additions & 0 deletions airflow/example_dags/example_branch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,15 @@
dag=dag)
branching.set_upstream(run_this_first)

join = DummyOperator(
task_id='join',
trigger_rule='one_success',
dag=dag
)

for option in options:
t = DummyOperator(task_id=option, dag=dag)
t.set_upstream(branching)
dummy_follow = DummyOperator(task_id='follow_' + option, dag=dag)
t.set_downstream(dummy_follow)
dummy_follow.set_downstream(join)
4 changes: 2 additions & 2 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,12 +759,12 @@ def are_dependencies_met(
)
successes, skipped, failed, upstream_failed, done = qry.first()
if flag_upstream_failed:
if skipped:
if skipped >= len(task._upstream_list):
self.state = State.SKIPPED
self.start_date = datetime.now()
self.end_date = datetime.now()
session.merge(self)
elif successes < done >= len(task._upstream_list):
elif failed + upstream_failed >= len(task._upstream_list):
self.state = State.UPSTREAM_FAILED
self.start_date = datetime.now()
self.end_date = datetime.now()
Expand Down

0 comments on commit 73e9d2d

Please sign in to comment.