Skip to content

Commit

Permalink
Make cleanup method in trigger an async one (apache#30152)
Browse files Browse the repository at this point in the history
Cleanup method is called in async run_trigger and it's easy to
imagine that users might want to do some async operations in it
(for example make an http call). Therefore the cleanup method should
be asynchronous.

Related apache#30141
  • Loading branch information
potiuk authored Mar 17, 2023
1 parent b749f7f commit b65dbaa
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
6 changes: 4 additions & 2 deletions airflow/jobs/triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import time
import warnings
from collections import deque
from contextlib import suppress
from copy import copy
from queue import SimpleQueue
from typing import TYPE_CHECKING, Deque
Expand Down Expand Up @@ -601,8 +602,9 @@ async def run_trigger(self, trigger_id, trigger):
# CancelledError will get injected when we're stopped - which is
# fine, the cleanup process will understand that, but we want to
# allow triggers a chance to cleanup, either in that case or if
# they exit cleanly.
trigger.cleanup()
# they exit cleanly. Exception from cleanup methods are ignored.
with suppress(Exception):
await trigger.cleanup()
if SEND_TRIGGER_END_MARKER:
self.mark_trigger_end(trigger)

Expand Down
8 changes: 7 additions & 1 deletion airflow/triggers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,18 @@ async def run(self) -> AsyncIterator["TriggerEvent"]:
raise NotImplementedError("Triggers must implement run()")
yield # To convince Mypy this is an async iterator.

def cleanup(self) -> None:
async def cleanup(self) -> None:
"""
Cleanup the trigger.
Called when the trigger is no longer needed, and it's being removed
from the active triggerer process.
This method follows the async/await pattern to allow to run the cleanup
in triggerer main event loop. Exceptions raised by the cleanup method
are ignored, so if you would like to be able to debug them and be notified
that cleanup method failed, you should wrap your code with try/except block
and handle it appropriately (in async-compatible way).
"""

def __repr__(self) -> str:
Expand Down
6 changes: 6 additions & 0 deletions newsfragments/30152.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
The ``cleanup()`` method in BaseTrigger is now defined as asynchronous (following async/await) pattern.

This is potentially a breaking change for any custom trigger implementations that override the ``cleanup()``
method and uses synchronous code, however using synchronous operations in cleanup was technically wrong,
because the method was executed in the main loop of the Triggerer and it was introducing unnecessary delays
impacting other triggers. The change is unlikely to affect any existing trigger implementations.

0 comments on commit b65dbaa

Please sign in to comment.