Merge pull request apache#1220 from jlowin/fix_executor_failed
Fix case where Executors fail to report failure, creating infinite loop

Thanks for the great commit message
bolkedebruin committed Mar 28, 2016
2 parents a136b2f + f0eeb15 commit d7c95a8
Showing 5 changed files with 141 additions and 12 deletions.
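
For context, here is a minimal sketch (not Airflow's code) of the failure mode the diff below removes: the backfill loop only drops a task from its queue once the task instance itself reaches a terminal state, so an executor that reports success for a command that died before the task ever started leaves the task queued and resubmitted forever. The names echo the diff; the bound of five passes is added here only so the sketch terminates.

# Pre-fix behaviour, sketched: the command fails before the task instance
# records any state, yet the executor still reports "success".
tasks_to_run = {"example_task": "scheduled"}   # task-instance state, never updated
attempts = 0

while tasks_to_run and attempts < 5:           # the real backfill loop had no bound
    attempts += 1
    for key, ti_state in list(tasks_to_run.items()):
        executor_state = "success"             # pre-fix executors swallowed the failure
        if ti_state in ("success", "failed", "skipped"):
            tasks_to_run.pop(key)              # never reached: ti_state stays "scheduled"
        elif executor_state == "success":
            pass                               # looks fine to the backfill, so resubmit

print("attempts:", attempts, "| still queued:", list(tasks_to_run))
# -> attempts: 5 | still queued: ['example_task']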
6 changes: 3 additions & 3 deletions airflow/executors/local_executor.py
@@ -29,11 +29,11 @@ def run(self):
                 self.__class__.__name__, command))
             command = "exec bash -c '{0}'".format(command)
             try:
-                subprocess.Popen(command, shell=True).wait()
+                subprocess.check_call(command, shell=True)
                 state = State.SUCCESS
-            except Exception as e:
+            except subprocess.CalledProcessError as e:
                 state = State.FAILED
-                self.logger.error("failed to execute task {}:".format(str(e)))
+                self.logger.error("Failed to execute task {}:".format(str(e)))
                 # raise e
             self.result_queue.put((key, state))
             self.task_queue.task_done()
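
The crux of the change in both executors (the sequential executor below gets the same treatment): subprocess.Popen(...).wait() returns the command's exit code but never raises, so a failing command still fell through to state = State.SUCCESS, whereas subprocess.check_call raises CalledProcessError on any non-zero exit. A standalone sketch of the difference, using a deliberately failing shell command as a stand-in for the airflow run invocation:

import subprocess

failing_command = "exit 1"   # stand-in for an `airflow run ...` command that dies early

# Old pattern: the non-zero exit code is returned, not raised, so a broad
# `except Exception` never fires and SUCCESS gets recorded anyway.
returncode = subprocess.Popen(failing_command, shell=True).wait()
print(returncode)            # -> 1, silently

# New pattern: the same failure surfaces as an exception the executor can catch.
try:
    subprocess.check_call(failing_command, shell=True)
except subprocess.CalledProcessError as e:
    print("failed with exit code", e.returncode)   # -> failed with exit code 1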
11 changes: 6 additions & 5 deletions airflow/executors/sequential_executor.py
@@ -25,13 +25,14 @@ def execute_async(self, key, command, queue=None):
     def sync(self):
         for key, command in self.commands_to_run:
             self.logger.info("Executing command: {}".format(command))
+
             try:
-                sp = subprocess.Popen(command, shell=True)
-                sp.wait()
-            except Exception as e:
+                subprocess.check_call(command, shell=True)
+                self.change_state(key, State.SUCCESS)
+            except subprocess.CalledProcessError as e:
                 self.change_state(key, State.FAILED)
-                raise e
-            self.change_state(key, State.SUCCESS)
+                self.logger.error("Failed to execute task {}:".format(str(e)))
+
         self.commands_to_run = []
 
     def end(self):
34 changes: 30 additions & 4 deletions airflow/jobs.py
@@ -19,7 +19,7 @@
 
 from builtins import str
 from past.builtins import basestring
-from collections import defaultdict
+from collections import defaultdict, Counter
 from datetime import datetime
 from itertools import product
 import getpass
@@ -734,6 +734,7 @@ def _execute(self):
 
         executor = self.executor
         executor.start()
+        executor_fails = Counter()
 
         # Build a list of all instances to run
         tasks_to_run = {}
@@ -785,12 +786,21 @@
                 if (
                         ti.state in (State.FAILED, State.SKIPPED) or
                         state == State.FAILED):
-                    if ti.state == State.FAILED or state == State.FAILED:
+                    # executor reports failure; task reports running
+                    if ti.state == State.RUNNING and state == State.FAILED:
+                        msg = (
+                            'Executor reports that task instance {} failed '
+                            'although the task says it is running.'.format(key))
+                        self.logger.error(msg)
+                        ti.handle_failure(msg)
+                    # executor and task report failure
+                    elif ti.state == State.FAILED or state == State.FAILED:
                         failed.append(key)
-                        self.logger.error("Task instance " + str(key) + " failed")
+                        self.logger.error("Task instance {} failed".format(key))
+                    # task reports skipped
                     elif ti.state == State.SKIPPED:
                         wont_run.append(key)
-                        self.logger.error("Skipping " + str(key) + " failed")
+                        self.logger.error("Skipping {} ".format(key))
                     tasks_to_run.pop(key)
                     # Removing downstream tasks that also shouldn't run
                     for t in self.dag.get_task(task_id).get_flat_relatives(
@@ -799,9 +809,11 @@
                         if key in tasks_to_run:
                             wont_run.append(key)
                             tasks_to_run.pop(key)
+                # executor and task report success
                 elif ti.state == State.SUCCESS and state == State.SUCCESS:
                     succeeded.append(key)
                     tasks_to_run.pop(key)
+                # executor reports success but task does not -- this is weird
                 elif (
                         ti.state not in (State.SUCCESS, State.QUEUED) and
                         state == State.SUCCESS):
@@ -812,6 +824,20 @@
                         "reported state is '{}'. TI is {}"
                         "".format(ti.state, state, ti))
+
+                    # if the executor fails 3 or more times, stop trying to
+                    # run the task
+                    executor_fails[key] += 1
+                    if executor_fails[key] >= 3:
+                        msg = (
+                            'The airflow run command failed to report an '
+                            'error for task {} three or more times. The task '
+                            'is being marked as failed. This is very unusual '
+                            'and probably means that an error is taking place '
+                            'before the task even starts.'.format(key))
+                        self.logger.error(msg)
+                        ti.handle_failure(msg)
+                        tasks_to_run.pop(key)
 
             msg = (
                 "[backfill progress] "
                 "waiting: {0} | "
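
The Counter added above caps how long the backfill will tolerate the executor reporting success for a task that never reported any state itself. A minimal standalone sketch of the same pattern, outside Airflow (the threshold of three and the (dag_id, task_id, execution_date) key shape mirror the diff; ti.handle_failure is represented here by a plain print):

from collections import Counter

executor_fails = Counter()
key = ("example_dag", "example_task", "2016-03-28")
tasks_to_run = {key: "task instance placeholder"}

def process_executor_event(key, ti_state, executor_state):
    # Count events where the executor claims success but the task instance
    # never reported anything; give up after the third one.
    if ti_state not in ("success", "queued") and executor_state == "success":
        executor_fails[key] += 1
        if executor_fails[key] >= 3:
            print("giving up on {}: marking as failed".format(key))
            tasks_to_run.pop(key)

for _ in range(3):
    process_executor_event(key, ti_state=None, executor_state="success")
# the third event pops the task instead of letting the backfill retry it forever
print("still queued:", list(tasks_to_run))   # -> still queued: []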
53 changes: 53 additions & 0 deletions tests/core.py
@@ -12,6 +12,7 @@
 from email.mime.multipart import MIMEMultipart
 from email.mime.application import MIMEApplication
 import errno
+import signal
 from time import sleep
 
 from dateutil.relativedelta import relativedelta
@@ -36,6 +37,8 @@
 
 NUM_EXAMPLE_DAGS = 14
 DEV_NULL = '/dev/null'
+TEST_DAG_FOLDER = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)), 'dags')
 DEFAULT_DATE = datetime(2015, 1, 1)
 DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
@@ -49,6 +52,29 @@
 import pickle
 
 
+class timeout:
+    """
+    A context manager used to limit execution time.
+    Note -- won't work on Windows (based on signal, like Airflow timeouts)
+    Based on: http://stackoverflow.com/a/22348885
+    """
+    def __init__(self, seconds=1, error_message='Timeout'):
+        self.seconds = seconds
+        self.error_message = error_message
+
+    def handle_timeout(self, signum, frame):
+        raise ValueError(self.error_message)
+
+    def __enter__(self):
+        signal.signal(signal.SIGALRM, self.handle_timeout)
+        signal.alarm(self.seconds)
+
+    def __exit__(self, type, value, traceback):
+        signal.alarm(0)
+
+
 class FakeDatetime(datetime):
     "A fake replacement for datetime that can be mocked for testing."
 
@@ -245,6 +271,33 @@ def test_backfill_examples(self):
                 end_date=DEFAULT_DATE)
             job.run()
 
+    def test_trap_executor_error(self):
+        """
+        Test for https://github.com/airbnb/airflow/pull/1220
+
+        Test that errors setting up tasks (before tasks run) are properly
+        caught
+        """
+        self.dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
+        dags = [
+            dag for dag in self.dagbag.dags.values()
+            if dag.dag_id in ('test_raise_executor_error',)]
+        for dag in dags:
+            dag.clear(
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE)
+        for dag in dags:
+            job = jobs.BackfillJob(
+                dag=dag,
+                start_date=DEFAULT_DATE,
+                end_date=DEFAULT_DATE)
+            # run with timeout because this creates an infinite loop if not
+            # caught
+            def run_with_timeout():
+                with timeout(seconds=15):
+                    job.run()
+            self.assertRaises(AirflowException, run_with_timeout)
+
     def test_pickling(self):
         dp = self.dag.pickle()
         assert self.dag.dag_id == dp.pickle.dag_id
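
The timeout helper added to tests/core.py turns a hung job.run() into an exception via SIGALRM, which is what lets the new test fail fast instead of looping. A small usage sketch of the same context manager (trimmed to the essentials and, as its docstring notes, Unix-only); the sleep stands in for a job.run() that never returns:

import signal
import time

class timeout:
    # Raise ValueError if the wrapped block runs longer than `seconds`.
    def __init__(self, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        raise ValueError(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, type, value, traceback):
        signal.alarm(0)   # cancel the pending alarm on normal exit

try:
    with timeout(seconds=1, error_message='job hung'):
        time.sleep(5)     # stand-in for a job.run() that never returns
except ValueError as e:
    print(e)              # -> job hung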
49 changes: 49 additions & 0 deletions tests/dags/test_raise_executor_error.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+DAG designed to test what happens when running a DAG fails before
+a task runs -- prior to a fix, this could actually cause an Executor to report
+SUCCESS. Since the task never reports any status, this can lead to an infinite
+rescheduling loop.
+"""
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators import SubDagOperator
+from airflow.example_dags.subdags.subdag import subdag
+
+args = {
+    'owner': 'airflow',
+    'start_date': datetime(2016, 1, 1),
+}
+
+dag = DAG(
+    dag_id='test_raise_executor_error',
+    default_args=args,
+    schedule_interval="@daily",
+)
+
+section_1 = SubDagOperator(
+    task_id='subdag_op',
+    subdag=subdag('test_raise_executor_error', 'subdag_op', args),
+    default_args=args,
+    dag=dag,
+)
+
+# change the subdag name -- this creates an error because the subdag
+# won't be found, but it'll do it in a way that causes the executor to report
+# success
+section_1.subdag.dag_id = 'bad_id'
