Skip to content

Commit

Permalink
Fix KPO task hanging when pod fails to start within specified timeout (
Browse files Browse the repository at this point in the history
…apache#37514)

I am observing an issue wrt to the recent deferrable KPO changes in
PR apache#37279 and apache#37454,
where when the pod fails to start within a specified timeout value,
the KPO task is hanging forever whereas it is expected to fail after
the timeout. This PR fixes the issue by correcting a logical error
for detecting if elapsed timeout has occured for raising the timeout
trigger event.
  • Loading branch information
pankajkoti authored Feb 18, 2024
1 parent c49f857 commit 6412b06
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
10 changes: 8 additions & 2 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,10 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
)

if event["status"] in ("error", "failed", "timeout"):
# fetch some logs when pod is failed
if self.get_logs:
self.write_logs(self.pod)

if self.do_xcom_push:
_ = self.extract_xcom(pod=self.pod)

Expand All @@ -729,6 +733,10 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
self.invoke_defer_method()

elif event["status"] == "success":
# fetch some logs when pod is executed successfully
if self.get_logs:
self.write_logs(self.pod)

if self.do_xcom_push:
xcom_sidecar_output = self.extract_xcom(pod=self.pod)
return xcom_sidecar_output
Expand All @@ -741,8 +749,6 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
def _clean(self, event: dict[str, Any]):
if event["status"] == "running":
return
if self.get_logs:
self.write_logs(self.pod)
istio_enabled = self.is_istio_enabled(self.pod)
# Skip await_pod_completion when the event is 'timeout' due to the pod can hang
# on the ErrImagePull or ContainerCreating step and it will never complete
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/cncf/kubernetes/triggers/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"message": message,
}
)
return
except Exception as e:
yield TriggerEvent(
{
Expand Down Expand Up @@ -223,6 +224,7 @@ async def _wait_for_pod_start(self) -> ContainerState:
return self.define_container_state(pod)
self.log.info("Still waiting for pod to start. The pod state is %s", pod.status.phase)
await asyncio.sleep(self.poll_interval)
delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")

async def _wait_for_container_completion(self) -> TriggerEvent:
Expand Down

0 comments on commit 6412b06

Please sign in to comment.