From 791f3cfc5cc7f2798ee0fcd18ff72906c48ebd01 Mon Sep 17 00:00:00 2001 From: Maksim Date: Tue, 21 May 2024 13:27:09 -0700 Subject: [PATCH] Fix acknowledged functionality in deferrable mode for PubSubPullSensor (#39711) --- .../providers/google/cloud/triggers/pubsub.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/airflow/providers/google/cloud/triggers/pubsub.py b/airflow/providers/google/cloud/triggers/pubsub.py index 6b95e283adddb..535bfe2ba1c68 100644 --- a/airflow/providers/google/cloud/triggers/pubsub.py +++ b/airflow/providers/google/cloud/triggers/pubsub.py @@ -97,22 +97,19 @@ def serialize(self) -> tuple[str, dict[str, Any]]: async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] try: - pulled_messages = None while True: - if pulled_messages: + if pulled_messages := await self.hook.pull( + project_id=self.project_id, + subscription=self.subscription, + max_messages=self.max_messages, + return_immediately=True, + ): if self.ack_messages: await self.message_acknowledgement(pulled_messages) yield TriggerEvent({"status": "success", "message": pulled_messages}) return - else: - pulled_messages = await self.hook.pull( - project_id=self.project_id, - subscription=self.subscription, - max_messages=self.max_messages, - return_immediately=True, - ) - self.log.info("Sleeping for %s seconds.", self.poke_interval) - await asyncio.sleep(self.poke_interval) + self.log.info("Sleeping for %s seconds.", self.poke_interval) + await asyncio.sleep(self.poke_interval) except Exception as e: yield TriggerEvent({"status": "error", "message": str(e)}) return