[AIRFLOW-678] Prevent scheduler from double triggering TIs
At the moment there is no lock/synchronization
around the loop where the scheduler puts tasks in
the SCHEDULED state. This means that if a task
starts running or gets scheduled somewhere else
(e.g. by manually running it via the webserver),
its state can be changed from RUNNING/QUEUED back
to SCHEDULED, which can cause a single task
instance to be run twice at the same time.

Testing Done:
- Tested this branch on the Airbnb Airflow staging
cluster
- Airbnb has been running very similar logic in
production for many months (not 1:1, since we are
still running off of the last release branch)
- In the future we ideally want an integration
test to catch double triggers, but this is not
trivial to do properly

Closes apache#1924 from
aoen/ddavydov/fix_scheduler_race_condition
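
To make the race concrete, here is a minimal, self-contained sketch of the guard the
commit message describes (plain Python, not Airflow code; the state names and the
QUEUEABLE_STATES subset are illustrative): the scheduler only flips a task instance to
SCHEDULED if its current state is still one the scheduler may claim.

    RUNNING = "running"
    QUEUED = "queued"
    SCHEDULED = "scheduled"
    UP_FOR_RETRY = "up_for_retry"

    # Illustrative subset of states the scheduler is allowed to claim.
    QUEUEABLE_STATES = {None, SCHEDULED, UP_FOR_RETRY}

    def maybe_schedule(current_state):
        """Return the new state, refusing to clobber a running/queued instance."""
        if current_state in QUEUEABLE_STATES:
            return SCHEDULED        # safe: nothing else owns this task instance
        return current_state        # leave RUNNING/QUEUED untouched

    assert maybe_schedule(None) == SCHEDULED
    assert maybe_schedule(RUNNING) == RUNNING  # the double trigger this commit prevents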
aoen committed Dec 12, 2016
1 parent 2306892 commit 15ff540
Showing 2 changed files with 24 additions and 12 deletions.
24 changes: 20 additions & 4 deletions airflow/jobs.py
@@ -45,7 +45,7 @@
 from airflow.exceptions import AirflowException
 from airflow.models import DagRun
 from airflow.settings import Stats
-from airflow.ti_deps.dep_context import RUN_DEPS, DepContext
+from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils.state import State
 from airflow.utils.db import provide_session, pessimistic_connection_handling
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
@@ -1532,9 +1532,25 @@ def process_file(self, file_path, pickle_dags=False, session=None):
             dag = dagbag.dags[ti_key[0]]
             task = dag.get_task(ti_key[1])
             ti = models.TaskInstance(task, ti_key[2])
-            # Task starts out in the scheduled state. All tasks in the
-            # scheduled state will be sent to the executor
-            ti.state = State.SCHEDULED
+
+            ti.refresh_from_db(session=session, lock_for_update=True)
+            # We can defer the task dependency checks to the workers themselves
+            # since they can be expensive to run in the scheduler.
+            dep_context = DepContext(deps=QUEUE_DEPS, ignore_task_deps=True)
+
+            # Only schedule tasks that have their dependencies met, e.g. to avoid
+            # a task that recently got its state changed to RUNNING from somewhere
+            # other than the scheduler from getting its state overwritten.
+            # TODO(aoen): It's not great that we have to check all the task instance
+            # dependencies twice; once to get the task scheduled, and again to actually
+            # run the task. We should try to come up with a way to only check them once.
+            if ti.are_dependencies_met(
+                    dep_context=dep_context,
+                    session=session,
+                    verbose=True):
+                # Task starts out in the scheduled state. All tasks in the
+                # scheduled state will be sent to the executor
+                ti.state = State.SCHEDULED
 
             # Also save this task instance to the DB.
             self.logger.info("Creating / updating {} in ORM".format(ti))
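
The two key additions above are the row-level lock (refresh_from_db with
lock_for_update=True) and the dependency check against QUEUE_DEPS before the state is
overwritten. As a rough illustration of that lock-then-recheck pattern in plain
SQLAlchemy (the TaskInstance model, its columns, and the function below are simplified
stand-ins, not Airflow's real ORM code):

    from sqlalchemy import Column, String
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class TaskInstance(Base):
        """Simplified stand-in for airflow.models.TaskInstance."""
        __tablename__ = "task_instance"
        task_id = Column(String, primary_key=True)
        execution_date = Column(String, primary_key=True)
        state = Column(String, nullable=True)

    def lock_and_schedule(session: Session, task_id: str, execution_date: str) -> bool:
        # SELECT ... FOR UPDATE: concurrent writers (e.g. a webserver-triggered run)
        # block on this row until we commit, so the state we read stays valid.
        ti = (
            session.query(TaskInstance)
            .filter_by(task_id=task_id, execution_date=execution_date)
            .with_for_update()
            .one_or_none()
        )
        if ti is None or ti.state in ("running", "queued"):
            return False  # someone else already owns this task instance
        ti.state = "scheduled"
        session.commit()
        return True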
12 changes: 4 additions & 8 deletions airflow/ti_deps/dep_context.py
@@ -81,17 +81,13 @@ def __init__(
     State.UP_FOR_RETRY,
 }
 
-# The minimum execution context for task instances to be executed.
-MIN_EXEC_DEPS = {
+# Context to get the dependencies that need to be met in order for a task instance to
+# be backfilled.
+QUEUE_DEPS = {
     NotRunningDep(),
     NotSkippedDep(),
     RunnableExecDateDep(),
-}
-
-# Context to get the dependencies that need to be met in order for a task instance to
-# be backfilled.
-QUEUE_DEPS = MIN_EXEC_DEPS | {
-    ValidStateDep(QUEUEABLE_STATES)
+    ValidStateDep(QUEUEABLE_STATES),
 }
 
 # Dependencies that need to be met for a given task instance to be able to get run by an
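
QUEUE_DEPS is now a single flat set of dependency objects rather than a union with
MIN_EXEC_DEPS. As a hedged illustration of how such a dependency set can be evaluated
against a task instance (the classes and the is_met() interface below are simplified
for the example and do not mirror Airflow's actual BaseTIDep API):

    class NotRunningDep:
        """Passes unless the task instance is already running."""
        def is_met(self, ti):
            return ti.state != "running"

    class ValidStateDep:
        """Passes only when the task instance is in one of the allowed states."""
        def __init__(self, valid_states):
            self.valid_states = valid_states

        def is_met(self, ti):
            return ti.state in self.valid_states

    QUEUE_DEPS = {NotRunningDep(), ValidStateDep({None, "scheduled", "up_for_retry"})}

    def are_dependencies_met(ti, deps=QUEUE_DEPS):
        # Every dep in the set must pass before the scheduler may claim the instance.
        return all(dep.is_met(ti) for dep in deps)

With this structure, adding or removing a dependency is just editing the set, which is
what the diff above does for ValidStateDep(QUEUEABLE_STATES).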
