Merge pull request apache#1290 from jlowin/subdag-backfill-status
Make sure backfill deadlocks raise errors
bolkedebruin committed Apr 5, 2016
2 parents 58abca2 + b2844af commit fd9388c
Showing 14 changed files with 315 additions and 192 deletions.
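The user-visible change: a backfill that cannot make progress now fails loudly with an `AirflowException` instead of finishing silently. A minimal, hedged sketch of what that looks like from a caller's side; the DAG and dates are placeholders, and the `BackfillJob` argument names are assumed from the 1.7-era CLI usage rather than taken from this diff:

```python
# Illustrative sketch only: run a backfill over a DAG that deadlocks
# (for example, depends_on_past=True tasks with no prior runs).
from datetime import datetime
from airflow.jobs import BackfillJob

job = BackfillJob(
    dag=my_deadlocking_dag,          # placeholder DAG object
    start_date=datetime(2016, 1, 1),
    end_date=datetime(2016, 1, 3))
try:
    job.run()
except Exception as err:  # AirflowException after this commit
    # The message now lists the deadlocked task instances and, when
    # depends_on_past is the cause, suggests
    # ignore_first_depends_on_past=True (or "-I" on the command line).
    print(err)
```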
3 changes: 2 additions & 1 deletion airflow/bin/cli.py
@@ -39,7 +39,8 @@ def process_subdir(subdir):
 def get_dag(args):
     dagbag = DagBag(process_subdir(args.subdir))
     if args.dag_id not in dagbag.dags:
-        raise AirflowException('dag_id could not be found')
+        raise AirflowException(
+            'dag_id could not be found: {}'.format(args.dag_id))
     return dagbag.dags[args.dag_id]


13 changes: 12 additions & 1 deletion airflow/configuration.py
@@ -356,7 +356,7 @@ def run_command(command):
 TEST_CONFIG = """\
 [core]
 airflow_home = {AIRFLOW_HOME}
-dags_folder = {AIRFLOW_HOME}/dags
+dags_folder = {TEST_DAGS_FOLDER}
 base_log_folder = {AIRFLOW_HOME}/logs
 executor = SequentialExecutor
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
@@ -583,6 +583,17 @@ def mkdir_p(path):
 else:
     AIRFLOW_CONFIG = expand_env_var(os.environ['AIRFLOW_CONFIG'])

+# Set up dags folder for unit tests
+# this directory won't exist if users install via pip
+_TEST_DAGS_FOLDER = os.path.join(
+    os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
+    'tests',
+    'dags')
+if os.path.exists(_TEST_DAGS_FOLDER):
+    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
+else:
+    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')
+

 def parameterized_config(template):
     """
167 changes: 99 additions & 68 deletions airflow/jobs.py
@@ -499,6 +499,7 @@ def process_dag(self, dag, executor):
         skip_tis = {(ti[0], ti[1]) for ti in qry.all()}

         descartes = [obj for obj in product(dag.tasks, active_runs)]
+        could_not_run = set()
         self.logger.info('Checking dependencies on {} tasks instances, minus {} '
                          'skippable ones'.format(len(descartes), len(skip_tis)))
         for task, dttm in descartes:
@@ -513,6 +514,23 @@ def process_dag(self, dag, executor):
             elif ti.is_runnable(flag_upstream_failed=True):
                 self.logger.debug('Firing task: {}'.format(ti))
                 executor.queue_task_instance(ti, pickle_id=pickle_id)
+            else:
+                could_not_run.add(ti)
+
+        # this type of deadlock happens when dagruns can't even start and so
+        # the TI's haven't been persisted to the database.
+        if len(could_not_run) == len(descartes):
+            self.logger.error(
+                'Dag runs are deadlocked for DAG: {}'.format(dag.dag_id))
+            (session
+                .query(models.DagRun)
+                .filter(
+                    models.DagRun.dag_id == dag.dag_id,
+                    models.DagRun.state == State.RUNNING,
+                    models.DagRun.execution_date.in_(active_runs))
+                .update(
+                    {models.DagRun.state: State.FAILED},
+                    synchronize_session='fetch'))

         # Releasing the lock
         self.logger.debug("Unlocking DAG (scheduler_lock)")
@@ -553,8 +571,6 @@ def process_events(self, executor, dagbag):
             # collect queued tasks for prioritiztion
             if ti.state == State.QUEUED:
                 self.queued_tis.add(ti)
-            elif ti in self.queued_tis:
-                self.queued_tis.remove(ti)
             else:
                 # special instructions for failed executions could go here
                 pass
@@ -583,6 +599,8 @@ def prioritize_queued(self, session, executor, dagbag):
             else:
                 d[ti.pool].append(ti)

+        self.queued_tis.clear()
+
         dag_blacklist = set(dagbag.paused_dags())
         for pool, tis in list(d.items()):
             if not pool:
@@ -781,11 +799,12 @@ def _execute(self):

         # Build a list of all instances to run
         tasks_to_run = {}
-        failed = []
-        succeeded = []
-        started = []
-        wont_run = []
-        not_ready_to_run = set()
+        failed = set()
+        succeeded = set()
+        started = set()
+        skipped = set()
+        not_ready = set()
+        deadlocked = set()

         for task in self.dag.tasks:
             if (not self.include_adhoc) and task.adhoc:
@@ -800,67 +819,56 @@ def _execute(self):
         session.commit()

         # Triggering what is ready to get triggered
-        deadlocked = False
         while tasks_to_run and not deadlocked:

+            not_ready.clear()
             for key, ti in list(tasks_to_run.items()):

                 ti.refresh_from_db()
                 ignore_depends_on_past = (
                     self.ignore_first_depends_on_past and
                     ti.execution_date == (start_date or ti.start_date))

-                # Did the task finish without failing? -- then we're done
-                if (
-                        ti.state in (State.SUCCESS, State.SKIPPED) and
-                        key in tasks_to_run):
-                    succeeded.append(key)
-                    tasks_to_run.pop(key)
+                # The task was already marked successful or skipped by a
+                # different Job. Don't rerun it.
+                if key not in started:
+                    if ti.state == State.SUCCESS:
+                        succeeded.add(key)
+                        tasks_to_run.pop(key)
+                        continue
+                    elif ti.state == State.SKIPPED:
+                        skipped.add(key)
+                        tasks_to_run.pop(key)
+                        continue

-                # Is the task runnable? -- the run it
-                elif ti.is_queueable(
+                # Is the task runnable? -- then run it
+                if ti.is_queueable(
                         include_queued=True,
                         ignore_depends_on_past=ignore_depends_on_past,
                         flag_upstream_failed=True):
                     self.logger.debug('Sending {} to executor'.format(ti))
                     executor.queue_task_instance(
                         ti,
                         mark_success=self.mark_success,
                         pickle_id=pickle_id,
                         ignore_dependencies=self.ignore_dependencies,
                         ignore_depends_on_past=ignore_depends_on_past,
                         pool=self.pool)
                     ti.state = State.RUNNING
                     if key not in started:
-                        started.append(key)
-                    if ti in not_ready_to_run:
-                        not_ready_to_run.remove(ti)
-
-                # Mark the task as not ready to run. If the set of tasks
-                # that aren't ready ever equals the set of tasks to run,
-                # then the backfill is deadlocked
+                        started.add(key)
+
+                # Mark the task as not ready to run
                 elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
-                    not_ready_to_run.add(ti)
-                    if not_ready_to_run == set(tasks_to_run.values()):
-                        msg = 'BackfillJob is deadlocked: no tasks can be run.'
-                        if any(
-                                t.are_dependencies_met() !=
-                                t.are_dependencies_met(
-                                    ignore_depends_on_past=True)
-                                for t in tasks_to_run.values()):
-                            msg += (
-                                ' Some of the tasks that were unable to '
-                                'run have "depends_on_past=True". Try running '
-                                'the backfill with the option '
-                                '"ignore_first_depends_on_past=True" '
-                                ' or passing "-I" at the command line.')
-                        self.logger.error(msg)
-                        deadlocked = True
-                        wont_run.extend(not_ready_to_run)
-                        tasks_to_run.clear()
+                    self.logger.debug('Added {} to not_ready'.format(ti))
+                    not_ready.add(key)

             self.heartbeat()
             executor.heartbeat()

+            # If the set of tasks that aren't ready ever equals the set of
+            # tasks to run, then the backfill is deadlocked
+            if not_ready and not_ready == set(tasks_to_run):
+                deadlocked.update(tasks_to_run.values())
+                tasks_to_run.clear()
+
             # Reacting to events
             for key, state in list(executor.get_event_buffer().items()):
                 dag_id, task_id, execution_date = key
@@ -882,12 +890,12 @@ def _execute(self):

                     # task reports skipped
                     elif ti.state == State.SKIPPED:
-                        wont_run.append(key)
+                        skipped.add(key)
                         self.logger.error("Skipping {} ".format(key))

                     # anything else is a failure
                     else:
-                        failed.append(key)
+                        failed.add(key)
                         self.logger.error("Task instance {} failed".format(key))

                     tasks_to_run.pop(key)
@@ -899,18 +907,19 @@ def _execute(self):
                         if ti.state == State.SUCCESS:
                             self.logger.info(
                                 'Task instance {} succeeded'.format(key))
-                            succeeded.append(key)
+                            succeeded.add(key)
                             tasks_to_run.pop(key)

                         # task reports failure
                         elif ti.state == State.FAILED:
                             self.logger.error("Task instance {} failed".format(key))
-                            failed.append(key)
+                            failed.add(key)
                             tasks_to_run.pop(key)

                         # this probably won't ever be triggered
-                        elif key in not_ready_to_run:
-                            continue
+                        elif ti in not_ready:
+                            self.logger.info(
+                                "{} wasn't expected to run, but it did".format(ti))

                         # executor reports success but task does not - this is weird
                         elif ti.state not in (
@@ -939,29 +948,51 @@ def _execute(self):
                                 ti.handle_failure(msg)
                                 tasks_to_run.pop(key)

-            msg = (
-                "[backfill progress] "
-                "waiting: {0} | "
-                "succeeded: {1} | "
-                "kicked_off: {2} | "
-                "failed: {3} | "
-                "wont_run: {4} ").format(
-                    len(tasks_to_run),
-                    len(succeeded),
-                    len(started),
-                    len(failed),
-                    len(wont_run))
+            msg = ' | '.join([
+                "[backfill progress]",
+                "waiting: {0}",
+                "succeeded: {1}",
+                "kicked_off: {2}",
+                "failed: {3}",
+                "skipped: {4}",
+                "deadlocked: {5}"
+            ]).format(
+                len(tasks_to_run),
+                len(succeeded),
+                len(started),
+                len(failed),
+                len(skipped),
+                len(deadlocked))
             self.logger.info(msg)

         executor.end()
         session.close()
+
+        err = ''
         if failed:
-            msg = (
-                "------------------------------------------\n"
-                "Some tasks instances failed, "
-                "here's the list:\n{}".format(failed))
-            raise AirflowException(msg)
-        self.logger.info("All done. Exiting.")
+            err += (
+                "---------------------------------------------------\n"
+                "Some task instances failed:\n{}\n".format(failed))
+        if deadlocked:
+            err += (
+                '---------------------------------------------------\n'
+                'BackfillJob is deadlocked.')
+            deadlocked_depends_on_past = any(
+                t.are_dependencies_met() != t.are_dependencies_met(
+                    ignore_depends_on_past=True)
+                for t in deadlocked)
+            if deadlocked_depends_on_past:
+                err += (
+                    'Some of the deadlocked tasks were unable to run because '
+                    'of "depends_on_past" relationships. Try running the '
+                    'backfill with the option '
+                    '"ignore_first_depends_on_past=True" or passing "-I" at '
+                    'the command line.')
+            err += ' These tasks were unable to run:\n{}\n'.format(deadlocked)
+        if err:
+            raise AirflowException(err)
+
+        self.logger.info("Backfill done. Exiting.")


 class LocalTaskJob(BaseJob):
3 changes: 2 additions & 1 deletion airflow/models.py
@@ -2398,7 +2398,8 @@ def get_active_runs(self):
                 # AND there are unfinished tasks...
                 any(ti.state in State.unfinished() for ti in task_instances) and
                 # AND none of them have dependencies met...
-                all(not ti.are_dependencies_met() for ti in task_instances
+                all(not ti.are_dependencies_met(session=session)
+                    for ti in task_instances
                     if ti.state in State.unfinished()))

         for run in active_runs:
18 changes: 18 additions & 0 deletions airflow/utils/state.py
@@ -69,8 +69,26 @@ def runnable(cls):
             cls.QUEUED
         ]

+    @classmethod
+    def finished(cls):
+        """
+        A list of states indicating that a task started and completed a
+        run attempt. Note that the attempt could have resulted in failure or
+        have been interrupted; in any case, it is no longer running.
+        """
+        return [
+            cls.SUCCESS,
+            cls.SHUTDOWN,
+            cls.FAILED,
+            cls.SKIPPED,
+        ]
+
     @classmethod
     def unfinished(cls):
+        """
+        A list of states indicating that a task either has not completed
+        a run or has not even started.
+        """
         return [
             cls.NONE,
             cls.QUEUED,
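The `unfinished()` helper above is what the deadlock check in `models.py` leans on. A hedged sketch of how the two pieces combine, with `task_instances` and `session` assumed to come from the surrounding `get_active_runs` code:

```python
from airflow.utils.state import State

# Sketch of the deadlock test mirrored from the models.py hunk above:
# a run is deadlocked if it still has unfinished task instances and
# none of them can have their dependencies met.
unfinished = [ti for ti in task_instances
              if ti.state in State.unfinished()]
deadlocked = bool(unfinished) and all(
    not ti.are_dependencies_met(session=session)
    for ti in unfinished)
```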
22 changes: 0 additions & 22 deletions airflow/utils/tests.py

This file was deleted.

3 changes: 3 additions & 0 deletions scripts/ci/run_tests.sh
@@ -16,6 +16,9 @@ if [ "${TRAVIS}" ]; then
     echo "Using travis airflow.cfg"
     DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
     cp -f ${DIR}/airflow_travis.cfg ~/airflow/unittests.cfg
+
+    ROOTDIR="$(dirname $(dirname $DIR))"
+    export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags"
 fi

 echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN
2 changes: 1 addition & 1 deletion tests/core.py
@@ -43,7 +43,7 @@
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
 TEST_DAG_ID = 'unit_tests'
-configuration.test_mode()
+

 try:
     import cPickle as pickle
10 changes: 10 additions & 0 deletions tests/dags/README.md
@@ -0,0 +1,10 @@
# Unit Tests DAGs Folder

This folder contains DAGs for Airflow unit testing.

To access a DAG in this folder, use the following code inside a unit test. Note this only works when `test_mode` is on; otherwise the normal Airflow `DAGS_FOLDER` will take precedence.

```python
dagbag = DagBag()
dag = dagbag.get(dag_id)
```
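A slightly fuller, hedged sketch of the same idea, tying in the `TEST_DAGS_FOLDER` attribute added to `airflow/configuration.py` above; the `dag_id` and the `DagBag` keyword argument are assumptions about the 1.7-era API, not part of this commit:

```python
from airflow import configuration
from airflow.models import DagBag

# Point a DagBag directly at the test DAGs folder. Per the configuration.py
# change above, TEST_DAGS_FOLDER falls back to ~/airflow/dags when the
# repository checkout (tests/dags) is not present.
dagbag = DagBag(dag_folder=configuration.TEST_DAGS_FOLDER)
dag = dagbag.get_dag('example_test_dag')  # hypothetical dag_id
```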