Skip to content

Commit

Permalink
Retry on Airflow Schedule DAG Run DB Deadlock (apache#26347)
Browse files Browse the repository at this point in the history

Co-authored-by: Anthony Panat <[email protected]>
Co-authored-by: Anthony Panat <[email protected]>
  • Loading branch information
3 people authored Oct 2, 2022
1 parent f977804 commit 0da4993
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,12 +943,7 @@ def _do_scheduling(self, session: Session) -> int:
# Bulk fetch the currently active dag runs for the dags we are
# examining, rather than making one query per DagRun

callback_tuples = []
for dag_run in dag_runs:
callback_to_run = self._schedule_dag_run(dag_run, session)
callback_tuples.append((dag_run, callback_to_run))

guard.commit()
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)

# Send the callbacks after we commit to ensure the context is up to date when they get run
for dag_run, callback_to_run in callback_tuples:
Expand Down Expand Up @@ -1232,6 +1227,18 @@ def _update_state(dag: DAG, dag_run: DagRun):
active_runs_of_dags[dag_run.dag_id] += 1
_update_state(dag, dag_run)

@retry_db_transaction
def _schedule_all_dag_runs(self, guard, dag_runs, session):
    """
    Make a scheduling decision for every DagRun in ``dag_runs``, then commit.

    Each run is handed to ``_schedule_dag_run`` together with ``session``; the
    value it returns is paired with the run it came from. ``guard.commit()`` is
    called once, after all runs have been processed.

    :param guard: object whose ``commit()`` is invoked after the last run
    :param dag_runs: iterable of DagRuns to schedule
    :param session: session forwarded to ``_schedule_dag_run``
    :return: list of ``(dag_run, callback_to_run)`` tuples, in iteration order
    """
    # NOTE(review): presumably ``retry_db_transaction`` re-runs this whole
    # method on a transient DB error (e.g. a deadlock), which would be why the
    # loop and the commit live in the same function -- confirm against the
    # decorator's definition.
    scheduled = [(run, self._schedule_dag_run(run, session)) for run in dag_runs]

    guard.commit()

    return scheduled

def _schedule_dag_run(
self,
dag_run: DagRun,
Expand Down

0 comments on commit 0da4993

Please sign in to comment.