From 8aac56656d29009dbca24a5948c2a2097043f4f3 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 16 Dec 2022 00:43:52 +0800 Subject: [PATCH] Correctly select a mapped task's "previous" task (#28379) --- airflow/ti_deps/deps/prev_dagrun_dep.py | 2 +- tests/models/test_dagrun.py | 35 +++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py index 50be22971d507..0e25fe867d93a 100644 --- a/airflow/ti_deps/deps/prev_dagrun_dep.py +++ b/airflow/ti_deps/deps/prev_dagrun_dep.py @@ -70,7 +70,7 @@ def _get_dep_statuses(self, ti: TI, session, dep_context): yield self._passing_status(reason="This task instance was the first task instance for its task.") return - previous_ti = last_dagrun.get_task_instance(ti.task_id, session=session) + previous_ti = last_dagrun.get_task_instance(ti.task_id, map_index=ti.map_index, session=session) if not previous_ti: if ti.task.ignore_first_depends_on_past: has_historical_ti = ( diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index c08b548870add..3d368ae962a64 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -2187,3 +2187,38 @@ def say_bye(): assert tis["add_one__1"] == TaskInstanceState.SKIPPED assert tis["add_one__2"] == TaskInstanceState.SKIPPED assert dr.state == State.SUCCESS + + +def test_mapped_task_depends_on_past(dag_maker, session): + with dag_maker(session=session): + + @task(depends_on_past=True) + def print_value(value): + print(value) + + print_value.expand_kwargs([{"value": i} for i in range(2)]) + + dr1: DagRun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + dr2: DagRun = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED) + + # print_value in dr2 is not ready yet since the task depends on past. + decision = dr2.task_instance_scheduling_decisions(session=session) + assert len(decision.schedulable_tis) == 0 + + # Run print_value in dr1. + decision = dr1.task_instance_scheduling_decisions(session=session) + assert len(decision.schedulable_tis) == 2 + for ti in decision.schedulable_tis: + ti.run(session=session) + + # Now print_value in dr2 can run + decision = dr2.task_instance_scheduling_decisions(session=session) + assert len(decision.schedulable_tis) == 2 + for ti in decision.schedulable_tis: + ti.run(session=session) + + # Both runs are finished now. + decision = dr1.task_instance_scheduling_decisions(session=session) + assert len(decision.unfinished_tis) == 0 + decision = dr2.task_instance_scheduling_decisions(session=session) + assert len(decision.unfinished_tis) == 0