Skip to content

Commit

Permalink
Don't update backfill run from the scheduler (apache#26342)
Browse files Browse the repository at this point in the history
Don't update backfill run from the scheduler

When updating the state of paused DAGs with 'running' dagruns in the scheduler, we should not
update the state of backfill runs.

Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
ephraimbuddy and uranusjr authored Sep 19, 2022
1 parent cc4902d commit b9c4e98
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
9 changes: 7 additions & 2 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,8 @@ def _execute(self) -> None:
self.log.exception("Exception when executing DagFileProcessorAgent.end")
self.log.info("Exited execute loop")

def _update_dag_run_state_for_paused_dags(self) -> None:
@provide_session
def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) -> None:
try:
paused_dag_ids = DagModel.get_all_paused_dag_ids()
for dag_id in paused_dag_ids:
Expand All @@ -784,7 +785,11 @@ def _update_dag_run_state_for_paused_dags(self) -> None:
dag = SerializedDagModel.get_dag(dag_id)
if dag is None:
continue
dag_runs = DagRun.find(dag_id=dag_id, state=DagRunState.RUNNING)
dag_runs = session.query(DagRun).filter(
DagRun.dag_id == dag_id,
DagRun.state == DagRunState.RUNNING,
DagRun.run_type != DagRunType.BACKFILL_JOB,
)
for dag_run in dag_runs:
dag_run.dag = dag
_, callback_to_run = dag_run.update_state(execute_callbacks=False)
Expand Down
43 changes: 43 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4439,6 +4439,49 @@ def test_catchup_works_correctly(self, dag_maker):
.scalar()
) > (timezone.utcnow() - timedelta(days=2))

def test_update_dagrun_state_for_paused_dag_not_for_backfill(self, dag_maker, session):
    """Check that ``_update_dag_run_state_for_paused_dags`` leaves backfill dagruns untouched."""

    with dag_maker('testdag') as dag:
        EmptyOperator(task_id='task1')

    def finish_task_and_pause_dag(dag_run):
        # Mark the run's first task instance successful, flag the DAG as paused,
        # and push both changes through the test session.
        task_instance = dag_run.get_task_instances()[0]
        task_instance.set_state(TaskInstanceState.SUCCESS)
        dag_model = DagModel.get_dagmodel(dag.dag_id)
        dag_model.is_paused = True
        session.merge(dag_model)
        session.merge(task_instance)
        session.flush()

    # Backfill run
    backfill_run = dag_maker.create_dagrun(run_type=DagRunType.BACKFILL_JOB)
    finish_task_and_pause_dag(backfill_run)

    # Scheduled run
    scheduled_run = dag_maker.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=datetime.datetime(2022, 1, 1),
    )
    finish_task_and_pause_dag(scheduled_run)

    # Precondition: the DAG is paused and both runs are still marked as running.
    assert dag.dag_id in DagModel.get_all_paused_dag_ids()
    assert backfill_run.state == State.RUNNING
    assert scheduled_run.state == State.RUNNING

    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    self.scheduler_job.executor = MockExecutor()
    self.scheduler_job._update_dag_run_state_for_paused_dags()
    session.flush()

    # Single-element unpacking also asserts that exactly one run of each type exists.
    [found_backfill] = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.BACKFILL_JOB, session=session)
    assert found_backfill.state == State.RUNNING

    [found_scheduled] = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.SCHEDULED, session=session)
    assert found_scheduled.state == State.SUCCESS


@pytest.mark.need_serialized_dag
def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
Expand Down

0 comments on commit b9c4e98

Please sign in to comment.