Skip to content

Commit

Permalink
Log traceback in trigger excs (apache#21213)
Browse files Browse the repository at this point in the history
  • Loading branch information
malthe authored Feb 28, 2022
1 parent 2049998 commit 4ad21f5
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 13 deletions.
1 change: 1 addition & 0 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
isort:skip_file
"""


# flake8: noqa: F401

import sys
Expand Down
18 changes: 10 additions & 8 deletions airflow/jobs/triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ def handle_failed_triggers(self):
"""
while self.runner.failed_triggers:
# Tell the model to fail this trigger's deps
trigger_id = self.runner.failed_triggers.popleft()
Trigger.submit_failure(trigger_id=trigger_id)
trigger_id, saved_exc = self.runner.failed_triggers.popleft()
Trigger.submit_failure(trigger_id=trigger_id, exc=saved_exc)
# Emit stat event
Stats.incr('triggers.failed')

Expand Down Expand Up @@ -211,7 +211,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
events: Deque[Tuple[int, TriggerEvent]]

# Outbound queue of failed triggers
failed_triggers: Deque[int]
failed_triggers: Deque[Tuple[int, BaseException]]

# Should-we-stop flag
stop: bool = False
Expand Down Expand Up @@ -292,6 +292,7 @@ async def cleanup_finished_triggers(self):
for trigger_id, details in list(self.triggers.items()):
if details["task"].done():
# Check to see if it exited for good reasons
saved_exc = None
try:
result = details["task"].result()
except (asyncio.CancelledError, SystemExit, KeyboardInterrupt):
Expand All @@ -302,7 +303,8 @@ async def cleanup_finished_triggers(self):
continue
except BaseException as e:
# This is potentially bad, so log it.
self.log.error("Trigger %s exited with error %s", details["name"], e)
self.log.exception("Trigger %s exited with error %s", details["name"], e)
saved_exc = e
else:
# See if they foolishly returned a TriggerEvent
if isinstance(result, TriggerEvent):
Expand All @@ -316,7 +318,7 @@ async def cleanup_finished_triggers(self):
"Trigger %s exited without sending an event. Dependent tasks will be failed.",
details["name"],
)
self.failed_triggers.append(trigger_id)
self.failed_triggers.append((trigger_id, saved_exc))
del self.triggers[trigger_id]
await asyncio.sleep(0)

Expand Down Expand Up @@ -387,7 +389,7 @@ def update_triggers(self, requested_trigger_ids: Set[int]):
running_trigger_ids.union(x[0] for x in self.events)
.union(self.to_cancel)
.union(x[0] for x in self.to_create)
.union(self.failed_triggers)
.union(trigger[0] for trigger in self.failed_triggers)
)
# Work out the two difference sets
new_trigger_ids = requested_trigger_ids - known_trigger_ids
Expand All @@ -403,9 +405,9 @@ def update_triggers(self, requested_trigger_ids: Set[int]):
# Resolve trigger record into an actual class instance
try:
trigger_class = self.get_trigger_by_classpath(new_triggers[new_id].classpath)
except BaseException:
except BaseException as e:
# Either the trigger code or the path to it is bad. Fail the trigger.
self.failed_triggers.append(new_id)
self.failed_triggers.append((new_id, e))
continue
self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
# Enqueue orphaned triggers for cancellation
Expand Down
3 changes: 3 additions & 0 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,9 @@ def _execute_task(self, context, task_copy):
# this task was scheduled specifically to fail.
if self.next_method == "__fail__":
next_kwargs = self.next_kwargs or {}
traceback = self.next_kwargs.get("traceback")
if traceback is not None:
self.log.error("Trigger failed:\n%s", "\n".join(traceback))
raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
# Grab the callable off the Operator/Task and add in any kwargs
execute_callable = getattr(task_copy, self.next_method)
Expand Down
6 changes: 4 additions & 2 deletions airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import datetime
from traceback import format_exception
from typing import Any, Dict, Iterable, Optional

from sqlalchemy import Column, Integer, String, func, or_
Expand Down Expand Up @@ -124,7 +125,7 @@ def submit_event(cls, trigger_id, event, session=None):

@classmethod
@provide_session
def submit_failure(cls, trigger_id, session=None):
def submit_failure(cls, trigger_id, exc=None, session=None):
"""
Called when a trigger has failed unexpectedly, and we need to mark
everything that depended on it as failed. Notably, we have to actually
Expand All @@ -144,8 +145,9 @@ def submit_failure(cls, trigger_id, session=None):
TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
):
# Add the error and set the next_method to the fail state
traceback = format_exception(type(exc), exc, exc.__traceback__) if exc else None
task_instance.next_method = "__fail__"
task_instance.next_kwargs = {"error": "Trigger failure"}
task_instance.next_kwargs = {"error": "Trigger failure", "traceback": traceback}
# Remove ourselves as its trigger
task_instance.trigger_id = None
# Finally, mark it as scheduled so it gets re-queued
Expand Down
11 changes: 8 additions & 3 deletions tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,11 @@ def test_trigger_failing(session):
# Wait for up to 3 seconds for it to fire and appear in the event queue
for _ in range(30):
if job.runner.failed_triggers:
assert list(job.runner.failed_triggers) == [1]
assert len(job.runner.failed_triggers) == 1
trigger_id, exc = list(job.runner.failed_triggers)[0]
assert trigger_id == 1
assert isinstance(exc, ValueError)
assert exc.args[0] == "Deliberate trigger failure"
break
time.sleep(0.1)
else:
Expand Down Expand Up @@ -448,7 +452,7 @@ def test_invalid_trigger(session, dag_maker):
job.load_triggers()

# Make sure it turned up in the failed queue
assert list(job.runner.failed_triggers) == [1]
assert len(job.runner.failed_triggers) == 1

# Run the failed trigger handler
job.handle_failed_triggers()
Expand All @@ -458,4 +462,5 @@ def test_invalid_trigger(session, dag_maker):
task_instance.refresh_from_db()
assert task_instance.state == TaskInstanceState.SCHEDULED
assert task_instance.next_method == "__fail__"
assert task_instance.next_kwargs == {'error': 'Trigger failure'}
assert task_instance.next_kwargs['error'] == 'Trigger failure'
assert task_instance.next_kwargs['traceback'][-1] == "ModuleNotFoundError: No module named 'fake'\n"

0 comments on commit 4ad21f5

Please sign in to comment.