From 15ff540ecd5e60e7ce080177ea3ea227582a4672 Mon Sep 17 00:00:00 2001
From: Dan Davydov
Date: Mon, 12 Dec 2016 12:10:15 -0800
Subject: [PATCH] [AIRFLOW-678] Prevent scheduler from double triggering TIs

At the moment there is no lock/synchronization around the loop where the
scheduler puts tasks in the SCHEDULED state. This means that if a task starts
running or gets set to SCHEDULED somewhere else (e.g. by manually running the
task via the webserver), the scheduler can flip its state from RUNNING/QUEUED
back to SCHEDULED, which can cause a single task instance to be run twice at
the same time.

Testing Done:
- Tested this branch on the Airbnb Airflow staging cluster
- We have been running very similar logic in production at Airbnb for many
  months (not 1-1, since we are still running off of the last release branch)
- In the future we ideally need an integration test to catch double triggers,
  but this is not trivial to do properly

Closes #1924 from aoen/ddavydov/fix_scheduler_race_condition
---
 airflow/jobs.py                | 24 ++++++++++++++++++++----
 airflow/ti_deps/dep_context.py | 12 ++++--------
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/airflow/jobs.py b/airflow/jobs.py
index 22cdeb04d6a6f..229424e352a53 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -45,7 +45,7 @@ from airflow.exceptions import AirflowException
 from airflow.models import DagRun
 from airflow.settings import Stats
-from airflow.ti_deps.dep_context import RUN_DEPS, DepContext
+from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils.state import State
 from airflow.utils.db import provide_session, pessimistic_connection_handling
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
@@ -1532,9 +1532,25 @@ def process_file(self, file_path, pickle_dags=False, session=None):
             dag = dagbag.dags[ti_key[0]]
             task = dag.get_task(ti_key[1])
             ti = models.TaskInstance(task, ti_key[2])
-            # Task starts out in the scheduled state. All tasks in the
-            # scheduled state will be sent to the executor
-            ti.state = State.SCHEDULED
+
+            ti.refresh_from_db(session=session, lock_for_update=True)
+            # We can defer the task dependency checks to the workers themselves
+            # since they can be expensive to run in the scheduler.
+            dep_context = DepContext(deps=QUEUE_DEPS, ignore_task_deps=True)
+
+            # Only schedule tasks that have their dependencies met, e.g. to avoid
+            # a task that recently got its state changed to RUNNING from somewhere
+            # other than the scheduler from getting its state overwritten.
+            # TODO(aoen): It's not great that we have to check all the task instance
+            # dependencies twice; once to get the task scheduled, and again to actually
+            # run the task. We should try to come up with a way to only check them once.
+            if ti.are_dependencies_met(
+                    dep_context=dep_context,
+                    session=session,
+                    verbose=True):
+                # Task starts out in the scheduled state. All tasks in the
+                # scheduled state will be sent to the executor
+                ti.state = State.SCHEDULED
 
             # Also save this task instance to the DB.
             self.logger.info("Creating / updating {} in ORM".format(ti))
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index 73ae924a9979f..583099dfccf14 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -81,17 +81,13 @@ def __init__(
     State.UP_FOR_RETRY,
 }
 
-# The minimum execution context for task instances to be executed.
-MIN_EXEC_DEPS = {
+# Context to get the dependencies that need to be met in order for a task instance to
+# be backfilled.
+QUEUE_DEPS = {
     NotRunningDep(),
     NotSkippedDep(),
     RunnableExecDateDep(),
-}
-
-# Context to get the dependencies that need to be met in order for a task instance to
-# be backfilled.
-QUEUE_DEPS = MIN_EXEC_DEPS | {
-    ValidStateDep(QUEUEABLE_STATES)
+    ValidStateDep(QUEUEABLE_STATES),
 }
 
 # Dependencies that need to be met for a given task instance to be able to get run by an
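
Editor's note: the sketch below is not part of the patch. It illustrates, under
simplified assumptions, the locked check-then-set pattern the change relies on:
re-read the task instance row under a row-level lock, and only move it to
SCHEDULED if it is still in a queueable state, so a concurrent transition to
RUNNING or QUEUED is never overwritten. The TaskInstance model, the
schedule_if_runnable helper, and the QUEUEABLE_STATES set here are hypothetical
stand-ins, not Airflow's real classes.

# Hypothetical, simplified sketch -- not Airflow code. Assumes SQLAlchemy >= 1.4.
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

# Rough stand-in for the ValidStateDep(QUEUEABLE_STATES) check inside QUEUE_DEPS.
QUEUEABLE_STATES = {None, "failed", "up_for_retry", "upstream_failed", "skipped"}


class TaskInstance(Base):
    __tablename__ = "task_instance"
    task_id = Column(String, primary_key=True)
    state = Column(String, nullable=True)


def schedule_if_runnable(session, task_id):
    """Move a task instance to SCHEDULED only if nothing else has claimed it."""
    # Re-read the row under a row-level lock, mirroring
    # ti.refresh_from_db(session=session, lock_for_update=True) in the patch.
    ti = (
        session.query(TaskInstance)
        .filter_by(task_id=task_id)
        .with_for_update()
        .one()
    )
    # Counterpart of the are_dependencies_met(dep_context=DepContext(deps=QUEUE_DEPS))
    # guard: leave tasks that are already RUNNING/QUEUED/SCHEDULED elsewhere alone.
    if ti.state not in QUEUEABLE_STATES:
        return False
    ti.state = "scheduled"
    session.commit()  # the commit releases the row lock
    return True


if __name__ == "__main__":
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(TaskInstance(task_id="example", state=None))
        session.commit()
        print(schedule_if_runnable(session, "example"))  # True: gets SCHEDULED
        print(schedule_if_runnable(session, "example"))  # False: already claimed

One caveat on the demo: SQLite silently ignores SELECT ... FOR UPDATE, so the
row lock only has teeth on a database such as MySQL or Postgres; the guard on
queueable states still prevents the state from being overwritten either way.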