Skip to content

Commit

Permalink
Make sure Executors properly trap errors
Browse files Browse the repository at this point in the history
SequentialExecutor and LocalExecutor execute `airflow run` commands
with `subprocess.Popen().wait()` and try to catch errors with
`try/except`. However, `subprocess.Popen()` doesn't raise errors;
you have to check the `returncode`. As a result, these Executors
always report that their commands are successful. This is normally fine
because task status gets precedence over executor status, so as long
as the task reports on itself correctly the issue is avoided. But
if an error is raised BEFORE a task runs -- meaning the task is not
yet monitoring its own status -- then the executor will incorrectly
report success. Airflow will actually notice something went wrong,
but because the task doesn't say it failed, it gets rescheduled,
leading to an infinite loop.

To resolve this, replace the Executor's `Popen().wait()` with
`check_call()`, which is a blocking method that raises an error
if the returncode != 0. This way, errors are properly recognized.

Also, prevent infinite loops by limiting the number of times a
task is allowed to be rescheduled due to executor failure to 3.
(Note: this doesn't affect the number of times a task can be
rescheduled due to its own failure).

Last, check for an odd situation where the executor reports failure
but the task reports running.

Closes apache#1199
See apache#1220 for a test case
  • Loading branch information
jlowin committed Mar 26, 2016
1 parent 7a632e2 commit fc23b46
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
6 changes: 3 additions & 3 deletions airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def run(self):
self.__class__.__name__, command))
command = "exec bash -c '{0}'".format(command)
try:
subprocess.Popen(command, shell=True).wait()
subprocess.check_call(command, shell=True)
state = State.SUCCESS
except Exception as e:
except subprocess.CalledProcessError as e:
state = State.FAILED
self.logger.error("failed to execute task {}:".format(str(e)))
self.logger.error("Failed to execute task {}:".format(str(e)))
# raise e
self.result_queue.put((key, state))
self.task_queue.task_done()
Expand Down
11 changes: 6 additions & 5 deletions airflow/executors/sequential_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ def execute_async(self, key, command, queue=None):
def sync(self):
for key, command in self.commands_to_run:
self.logger.info("Executing command: {}".format(command))

try:
sp = subprocess.Popen(command, shell=True)
sp.wait()
except Exception as e:
subprocess.check_call(command, shell=True)
self.change_state(key, State.SUCCESS)
except subprocess.CalledProcessError as e:
self.change_state(key, State.FAILED)
raise e
self.change_state(key, State.SUCCESS)
self.logger.error("Failed to execute task {}:".format(str(e)))

self.commands_to_run = []

def end(self):
Expand Down
34 changes: 30 additions & 4 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from builtins import str
from past.builtins import basestring
from collections import defaultdict
from collections import defaultdict, Counter
from datetime import datetime
from itertools import product
import getpass
Expand Down Expand Up @@ -734,6 +734,7 @@ def _execute(self):

executor = self.executor
executor.start()
executor_fails = Counter()

# Build a list of all instances to run
tasks_to_run = {}
Expand Down Expand Up @@ -785,12 +786,21 @@ def _execute(self):
if (
ti.state in (State.FAILED, State.SKIPPED) or
state == State.FAILED):
if ti.state == State.FAILED or state == State.FAILED:
# executor reports failure; task reports running
if ti.state == State.RUNNING and state == State.FAILED:
msg = (
'Executor reports that task instance {} failed '
'although the task says it is running.'.format(key))
self.logger.error(msg)
ti.handle_failure(msg)
# executor and task report failure
elif ti.state == State.FAILED or state == State.FAILED:
failed.append(key)
self.logger.error("Task instance " + str(key) + " failed")
self.logger.error("Task instance {} failed".format(key))
# task reports skipped
elif ti.state == State.SKIPPED:
wont_run.append(key)
self.logger.error("Skipping " + str(key) + " failed")
self.logger.error("Skipping {} ".format(key))
tasks_to_run.pop(key)
# Removing downstream tasks that also shouldn't run
for t in self.dag.get_task(task_id).get_flat_relatives(
Expand All @@ -799,9 +809,11 @@ def _execute(self):
if key in tasks_to_run:
wont_run.append(key)
tasks_to_run.pop(key)
# executor and task report success
elif ti.state == State.SUCCESS and state == State.SUCCESS:
succeeded.append(key)
tasks_to_run.pop(key)
# executor reports success but task does not -- this is weird
elif (
ti.state not in (State.SUCCESS, State.QUEUED) and
state == State.SUCCESS):
Expand All @@ -812,6 +824,20 @@ def _execute(self):
"reported state is '{}'. TI is {}"
"".format(ti.state, state, ti))

# if the executor fails 3 or more times, stop trying to
# run the task
executor_fails[key] += 1
if executor_fails[key] >= 3:
msg = (
'The airflow run command failed to report an '
'error for task {} three or more times. The task '
'is being marked as failed. This is very unusual '
'and probably means that an error is taking place '
'before the task even starts.'.format(key))
self.logger.error(msg)
ti.handle_failure(msg)
tasks_to_run.pop(key)

msg = (
"[backfill progress] "
"waiting: {0} | "
Expand Down

0 comments on commit fc23b46

Please sign in to comment.