Skip to content

Commit

Permalink
Correctly select a mapped task's "previous" task (apache#28379)
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr authored Dec 15, 2022
1 parent a628408 commit 8aac566
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/ti_deps/deps/prev_dagrun_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _get_dep_statuses(self, ti: TI, session, dep_context):
yield self._passing_status(reason="This task instance was the first task instance for its task.")
return

previous_ti = last_dagrun.get_task_instance(ti.task_id, session=session)
previous_ti = last_dagrun.get_task_instance(ti.task_id, map_index=ti.map_index, session=session)
if not previous_ti:
if ti.task.ignore_first_depends_on_past:
has_historical_ti = (
Expand Down
35 changes: 35 additions & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -2187,3 +2187,38 @@ def say_bye():
assert tis["add_one__1"] == TaskInstanceState.SKIPPED
assert tis["add_one__2"] == TaskInstanceState.SKIPPED
assert dr.state == State.SUCCESS


def test_mapped_task_depends_on_past(dag_maker, session):
with dag_maker(session=session):

@task(depends_on_past=True)
def print_value(value):
print(value)

print_value.expand_kwargs([{"value": i} for i in range(2)])

dr1: DagRun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2: DagRun = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)

# print_value in dr2 is not ready yet since the task depends on past.
decision = dr2.task_instance_scheduling_decisions(session=session)
assert len(decision.schedulable_tis) == 0

# Run print_value in dr1.
decision = dr1.task_instance_scheduling_decisions(session=session)
assert len(decision.schedulable_tis) == 2
for ti in decision.schedulable_tis:
ti.run(session=session)

# Now print_value in dr2 can run
decision = dr2.task_instance_scheduling_decisions(session=session)
assert len(decision.schedulable_tis) == 2
for ti in decision.schedulable_tis:
ti.run(session=session)

# Both runs are finished now.
decision = dr1.task_instance_scheduling_decisions(session=session)
assert len(decision.unfinished_tis) == 0
decision = dr2.task_instance_scheduling_decisions(session=session)
assert len(decision.unfinished_tis) == 0

0 comments on commit 8aac566

Please sign in to comment.