Skip to content

Commit

Permalink
[AIRFLOW-1024] Ignore celery executor errors (apache#49)
Browse files Browse the repository at this point in the history
Code defensively around the interactions with
celery so that
we just log errors instead of crashing the
scheduler.
It might makes sense to make the try catches one
level higher
(to catch errors from all executors), but this
needs some investigation.

Closes apache#2355 from aoen/ddavydov--
handle_celery_executor_errors_gracefully
  • Loading branch information
aoen committed Jun 9, 2017
1 parent 38cbf13 commit 7af20fe
Showing 1 changed file with 22 additions and 17 deletions.
39 changes: 22 additions & 17 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import subprocess
import ssl
import time
import traceback

from celery import Celery
from celery import states as celery_states
Expand Down Expand Up @@ -101,23 +102,27 @@ def sync(self):
self.logger.debug(
"Inquiring about {} celery task(s)".format(len(self.tasks)))
for key, async in list(self.tasks.items()):
state = async.state
if self.last_state[key] != state:
if state == celery_states.SUCCESS:
self.success(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.FAILURE:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.REVOKED:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
else:
self.logger.info("Unexpected state: " + async.state)
self.last_state[key] = async.state
try:
state = async.state
if self.last_state[key] != state:
if state == celery_states.SUCCESS:
self.success(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.FAILURE:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.REVOKED:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
else:
self.logger.info("Unexpected state: " + async.state)
self.last_state[key] = async.state
except Exception as e:
logging.error("Error syncing the celery executor, ignoring "
"it:\n{}\n".format(e, traceback.format_exc()))

def end(self, synchronous=False):
if synchronous:
Expand Down

0 comments on commit 7af20fe

Please sign in to comment.