[AIRFLOW-149] Task Dependency Engine + Why Isn't My Task Running View
Here is the original PR with Max's LGTM:
aoen#1
Since then I have made some fixes but this PR is essentially the same.
It could definitely use more eyes as there are likely still issues.

**Goals**
- Simplify, consolidate, and make consistent the logic of whether or not
  a task should be run
- Provide a view/better logging that gives insight into why a task
  instance is not currently running (for the majority of cases, no more
  digging through scheduler logs to find out why a task instance isn't
  running):
![image](https://cloud.githubusercontent.com/assets/1592778/17637621/aa669f5e-6099-11e6-81c2-d988d2073aac.png)
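
For example, the new dependency engine can be queried directly in Python. Below is a minimal sketch based on the `task_failed_deps` CLI implementation in this commit's diff; the `tutorial`/`sleep` names come from the tutorial-DAG example in that command's docstring, and loading the DAG through `DagBag` is an assumption about the surrounding environment:

```python
from datetime import datetime

from airflow.models import DagBag, TaskInstance
from airflow.ti_deps.dep_context import DepContext, SCHEDULER_DEPS

# Build a task instance for a specific execution date.
dag = DagBag().get_dag('tutorial')
ti = TaskInstance(dag.get_task('sleep'), datetime(2015, 1, 1))

# Ask the dependency engine which scheduler-level dependencies are unmet.
dep_context = DepContext(deps=SCHEDULER_DEPS)
for dep in ti.get_failed_dep_statuses(dep_context=dep_context):
    print("{}: {}".format(dep.dep_name, dep.reason))
```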

**Notable Functional Changes**
- Webserver view + task_failed_deps CLI command to explain why a given
  task instance isn't being run by the scheduler
- Running a backfill from the command line and running a task from the
  UI will now display detailed error messages based on which
  dependencies were not met for a task, instead of appearing to succeed
  but actually failing silently
- Maximum task concurrency and pools are now respected by backfills.
  This will break one use case: using pools to restrict some resource
  on the Airflow executors themselves (rather than an external resource
  like a DB), e.g. some task uses 60% of the CPU on a worker, so we
  restrict that task's pool size to 1 to prevent two of the tasks from
  running on the same host. When backfilling a task of this type, the
  backfill will now wait for the pool to have slots open up before
  running the task, even though this isn't needed when backfilling on a
  different host outside of the pool. I think breaking this use case is
  OK since the use case is a hack caused by not having a proper
  resource-isolation solution (e.g. Mesos should be used in this case
  instead).
- Backfill now has the equivalent of the old force flag to run tasks
  even if they already succeeded
- To make things less confusing for users, there is now an "ignore all
  dependencies" option for running tasks, "ignore dependencies" has
  been renamed to "ignore task dependencies", and "force" has been
  renamed to "ignore task instance state" (see the sketch after this
  list). The new "ignore all dependencies" flag will ignore the
  following:
  - the task instance's pool being full
  - the task instance's execution date being in the future
  - the task instance being in the retry waiting period
  - the task instance's task ending prior to the task instance's
    execution date
  - the task instance already being queued
  - the task instance having already completed
  - the task instance being in the shutdown state
  - it WILL NOT ignore the task instance already running
- SLA miss emails will now include all tasks that did not finish for a
  particular DAG run, even if those tasks didn't run because
  depends_on_past was not met
- Tasks with pools won't get queued automatically the first time they
  reach a worker; if they are ready to run, they will be run immediately
- Running a task via the UI or via the command line (backfill/run
  commands) will now log why a task could not be run if one of its
  dependencies isn't met. For tasks kicked off via the web UI, this
  means that tasks no longer silently fail to get queued despite a
  successful message in the UI.
- Queuing a task into a pool that doesn't exist will now get stopped in
  the scheduler instead of a worker
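
As a rough sketch of the renames in practice, mirroring the `test` command change in this commit's diff (assuming a `TaskInstance` named `ti`, e.g. one built as in the sketch above):

```python
# Before this change: "force" and "ignore_dependencies" were the knobs.
ti.run(force=True, ignore_dependencies=True, test_mode=True)

# After this change: the same intent, spelled out explicitly.
ti.run(ignore_ti_state=True,   # was "force": rerun regardless of prior state
       ignore_task_deps=True,  # was "ignore_dependencies": skip task-level deps
       test_mode=True)
```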

**Follow Up Items**
- Update the docs to reference the new explainer views/CLI command

Closes apache#1729 from aoen/ddavydov/blockedTIExplainerRebasedMaster
aoen committed Aug 26, 2016
1 parent 16fac98 commit f360414
Showing 56 changed files with 2,440 additions and 570 deletions.
78 changes: 67 additions & 11 deletions airflow/bin/cli.py
@@ -44,6 +44,7 @@
from airflow.models import (DagModel, DagBag, TaskInstance,
DagPickle, DagRun, Variable, DagStat,
Pool)
from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
from airflow.utils import logging as logging_utils
from airflow.utils.state import State
@@ -154,8 +155,8 @@ def backfill(args, dag=None):
local=args.local,
donot_pickle=(args.donot_pickle or
conf.getboolean('core', 'donot_pickle')),
ignore_dependencies=args.ignore_dependencies,
ignore_first_depends_on_past=args.ignore_first_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
pool=args.pool)


@@ -356,18 +357,20 @@ def run(args, dag=None):
run_job = jobs.LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
force=args.force,
pickle_id=args.pickle,
ignore_dependencies=args.ignore_dependencies,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=args.ignore_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
pool=args.pool)
run_job.run()
elif args.raw:
ti.run(
mark_success=args.mark_success,
force=args.force,
ignore_dependencies=args.ignore_dependencies,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=args.ignore_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
job_id=args.job_id,
pool=args.pool,
)
@@ -396,9 +399,10 @@ def run(args, dag=None):
ti,
mark_success=args.mark_success,
pickle_id=pickle_id,
ignore_dependencies=args.ignore_dependencies,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=args.ignore_depends_on_past,
force=args.force,
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
pool=args.pool)
executor.heartbeat()
executor.end()
@@ -444,6 +448,31 @@ def run(args, dag=None):
'Unsupported remote log location: {}'.format(remote_base))


def task_failed_deps(args):
"""
Returns the unmet dependencies for a task instance from the perspective of the
scheduler (i.e. why a task instance doesn't get scheduled and then queued by the
scheduler, and then run by an executor).
>>> airflow task_failed_deps tutorial sleep 2015-01-01
Task instance dependencies not met:
Dagrun Running: Task instance's dagrun did not exist: Unknown reason
Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es).
"""
dag = get_dag(args)
task = dag.get_task(task_id=args.task_id)
ti = TaskInstance(task, args.execution_date)

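# SCHEDULER_DEPS covers the dependency checks the scheduler applies before a
# task instance can be queued (e.g. the "Dagrun Running" and "Trigger Rule"
# checks shown in the docstring above).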
dep_context = DepContext(deps=SCHEDULER_DEPS)
failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
if failed_deps:
print("Task instance dependencies not met:")
for dep in failed_deps:
print("{}: {}".format(dep.dep_name, dep.reason))
else:
print("Task instance dependencies are all met.")


def task_state(args):
"""
Returns the state of a TaskInstance at the command line.
@@ -505,7 +534,7 @@ def test(args, dag=None):
if args.dry_run:
ti.dry_run()
else:
ti.run(force=True, ignore_dependencies=True, test_mode=True)
ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)


def render(args):
@@ -1063,13 +1092,32 @@ class CLIFactory(object):
("-kt", "--keytab"), "keytab",
nargs='?', default=conf.get('kerberos', 'keytab')),
# run
# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
# all dependencies (not just past success), e.g. the ignore_depends_on_past
# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
# the "ignore_all_dependencies" command should be called the"force" command
# instead.
'force': Arg(
("-f", "--force"),
"Force a run regardless of previous success", "store_true"),
"Ignore previous task instance state, rerun regardless if task already "
"succeeded/failed",
"store_true"),
'raw': Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true"),
'ignore_all_dependencies': Arg(
("-A", "--ignore_all_dependencies"),
"Ignores all non-critical dependencies, including ignore_ti_state and "
"ignore_task_deps"
"store_true"),
# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
# vague (e.g. a task being in the appropriate state to be run is also a dependency
# but is not ignored by this flag), the name 'ignore_task_dependencies' is
# slightly better (as it ignores all dependencies that are specific to the task),
# so deprecate the old command name and use this instead.
'ignore_dependencies': Arg(
("-i", "--ignore_dependencies"),
"Ignore upstream and depends_on_past dependencies", "store_true"),
"Ignore task-specific dependencies, e.g. upstream, depends_on_past, and "
"retry delay dependencies",
"store_true"),
'ignore_depends_on_past': Arg(
("-I", "--ignore_depends_on_past"),
"Ignore depends_on_past dependencies (but respect "
@@ -1229,7 +1277,7 @@ class CLIFactory(object):
'args': (
'dag_id', 'task_id', 'execution_date', 'subdir',
'mark_success', 'force', 'pool',
'local', 'raw', 'ignore_dependencies',
'local', 'raw', 'ignore_all_dependencies', 'ignore_dependencies',
'ignore_depends_on_past', 'ship_dag', 'pickle', 'job_id'),
}, {
'func': initdb,
@@ -1243,6 +1291,14 @@
'func': dag_state,
'help': "Get the status of a dag run",
'args': ('dag_id', 'execution_date', 'subdir'),
}, {
'func': task_failed_deps,
'help': (
"Returns the unmet dependencies for a task instance from the perspective "
"of the scheduler. In other words, why a task instance doesn't get "
"scheduled and then queued by the scheduler, and then run by an "
"executor)."),
'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
}, {
'func': task_state,
'help': "Get the status of a task instance",
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial.py
@@ -52,7 +52,7 @@
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Details page.
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""

10 changes: 6 additions & 4 deletions airflow/executors/base_executor.py
@@ -55,17 +55,19 @@ def queue_task_instance(
task_instance,
mark_success=False,
pickle_id=None,
force=False,
ignore_dependencies=False,
ignore_all_deps=False,
ignore_depends_on_past=False,
ignore_task_deps=False,
ignore_ti_state=False,
pool=None):
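# Fall back to the task's own default pool when no pool is passed explicitly.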
pool = pool or task_instance.pool
command = task_instance.command(
local=True,
mark_success=mark_success,
force=force,
ignore_dependencies=ignore_dependencies,
ignore_all_deps=ignore_all_deps,
ignore_depends_on_past=ignore_depends_on_past,
ignore_task_deps=ignore_task_deps,
ignore_ti_state=ignore_ti_state,
pool=pool,
pickle_id=pickle_id)
self.queue_command(
64 changes: 41 additions & 23 deletions airflow/jobs.py
@@ -44,6 +44,7 @@
from airflow.exceptions import AirflowException
from airflow.models import DagRun
from airflow.settings import Stats
from airflow.ti_deps.dep_context import RUN_DEPS, DepContext
from airflow.utils.state import State
from airflow.utils.db import provide_session, pessimistic_connection_handling
from airflow.utils.dag_processing import (AbstractDagFileProcessor,
@@ -592,8 +593,6 @@ def manage_slas(self, dag, session=None):
session.delete(ti)
session.commit()

blocking_tis = ([ti for ti in blocking_tis
if ti.are_dependencies_met(session=session)])
task_list = "\n".join([
sla.task_id + ' on ' + sla.execution_date.isoformat()
for sla in slas])
@@ -781,15 +780,14 @@ def _process_task_instances(self, dag, queue):
active DAG runs and adding task instances that should run to the
queue.
"""
DagModel = models.DagModel
session = settings.Session()

# update the state of the previously active dag runs
dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
active_dag_runs = []
for run in dag_runs:
self.logger.info("Examining DAG run {}".format(run))
# do not consider runs that are executed in the future
# don't consider runs that are executed in the future
if run.execution_date > datetime.now():
self.logger.error("Execution date is in future: {}"
.format(run.execution_date))
@@ -827,7 +825,9 @@ def _process_task_instances(self, dag, queue):
if task.adhoc:
continue

if ti.is_runnable(flag_upstream_failed=True):
if ti.are_dependencies_met(
dep_context=DepContext(flag_upstream_failed=True),
session=session):
self.logger.debug('Queuing task: {}'.format(ti))
queue.append(ti.key)

@@ -1015,9 +1015,10 @@ def _execute_task_instances(self,
task_instance.execution_date,
local=True,
mark_success=False,
force=False,
ignore_dependencies=False,
ignore_all_deps=False,
ignore_depends_on_past=False,
ignore_task_deps=False,
ignore_ti_state=False,
pool=task_instance.pool,
file_path=simple_dag_bag.get_dag(task_instance.dag_id).full_filepath,
pickle_id=simple_dag_bag.get_dag(task_instance.dag_id).pickle_id)
@@ -1564,11 +1565,14 @@ class BackfillJob(BaseJob):

def __init__(
self,
dag, start_date=None, end_date=None, mark_success=False,
dag,
start_date=None,
end_date=None,
mark_success=False,
include_adhoc=False,
donot_pickle=False,
ignore_dependencies=False,
ignore_first_depends_on_past=False,
ignore_task_deps=False,
pool=None,
*args, **kwargs):
self.dag = dag
@@ -1578,8 +1582,8 @@ def __init__(
self.mark_success = mark_success
self.include_adhoc = include_adhoc
self.donot_pickle = donot_pickle
self.ignore_dependencies = ignore_dependencies
self.ignore_first_depends_on_past = ignore_first_depends_on_past
self.ignore_task_deps = ignore_task_deps
self.pool = pool
super(BackfillJob, self).__init__(*args, **kwargs)

@@ -1705,11 +1709,16 @@ def _execute(self):
session.commit()
continue

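# Dependency context for this backfill: apply the standard run-time deps, but
# honor the backfill's flags for ignoring depends_on_past and task-level deps.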
backfill_context = DepContext(
deps=RUN_DEPS,
ignore_depends_on_past=ignore_depends_on_past,
ignore_task_deps=self.ignore_task_deps,
flag_upstream_failed=True)
# Is the task runnable? -- then run it
if ti.is_queueable(
include_queued=True,
ignore_depends_on_past=ignore_depends_on_past,
flag_upstream_failed=True):
if ti.are_dependencies_met(
dep_context=backfill_context,
session=session,
verbose=True):
self.logger.debug('Sending {} to executor'.format(ti))
if ti.state == State.NONE:
ti.state = State.SCHEDULED
@@ -1719,7 +1728,7 @@
ti,
mark_success=self.mark_success,
pickle_id=pickle_id,
ignore_dependencies=self.ignore_dependencies,
ignore_task_deps=self.ignore_task_deps,
ignore_depends_on_past=ignore_depends_on_past,
pool=self.pool)
started.add(key)
@@ -1864,8 +1873,14 @@ def _execute(self):
'---------------------------------------------------\n'
'BackfillJob is deadlocked.')
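# If ignoring depends_on_past flips the dependency-check result for any
# deadlocked task instance, depends_on_past is the likely cause of the deadlock.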
deadlocked_depends_on_past = any(
t.are_dependencies_met() != t.are_dependencies_met(
ignore_depends_on_past=True)
t.are_dependencies_met(
dep_context=DepContext(ignore_depends_on_past=False),
session=session,
verbose=True) !=
t.are_dependencies_met(
dep_context=DepContext(ignore_depends_on_past=True),
session=session,
verbose=True)
for t in deadlocked)
if deadlocked_depends_on_past:
err += (
@@ -1890,17 +1905,19 @@ class LocalTaskJob(BaseJob):
def __init__(
self,
task_instance,
ignore_dependencies=False,
ignore_all_deps=False,
ignore_depends_on_past=False,
force=False,
ignore_task_deps=False,
ignore_ti_state=False,
mark_success=False,
pickle_id=None,
pool=None,
*args, **kwargs):
self.task_instance = task_instance
self.ignore_dependencies = ignore_dependencies
self.ignore_all_deps = ignore_all_deps
self.ignore_depends_on_past = ignore_depends_on_past
self.force = force
self.ignore_task_deps = ignore_task_deps
self.ignore_ti_state = ignore_ti_state
self.pool = pool
self.pickle_id = pickle_id
self.mark_success = mark_success
@@ -1918,9 +1935,10 @@ def __init__(
def _execute(self):
command = self.task_instance.command(
raw=True,
ignore_dependencies=self.ignore_dependencies,
ignore_all_deps=self.ignore_all_deps,
ignore_depends_on_past=self.ignore_depends_on_past,
force=self.force,
ignore_task_deps=self.ignore_task_deps,
ignore_ti_state=self.ignore_ti_state,
pickle_id=self.pickle_id,
mark_success=self.mark_success,
job_id=self.id,