Skip to content

Commit

Permalink
KPO Add follow log in termination step (apache#38081)
Browse files Browse the repository at this point in the history
* KPO Add follow log in termination step

It looks like in async KPO the termination step is producing some duplicate logs.
This PR fixes it by passing follow and last_log_time to the log read in the termination step
  • Loading branch information
pankajastro authored Mar 26, 2024
1 parent 1175ac4 commit a3f7ddd
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
21 changes: 15 additions & 6 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import datetime
import json
import logging
import math
import re
import shlex
import string
Expand Down Expand Up @@ -710,10 +711,13 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
pod=self.pod, event=event, client=self.client, mode=ExecutionMode.SYNC
)

follow = self.logging_interval is None
last_log_time = event.get("last_log_time")

if event["status"] in ("error", "failed", "timeout"):
# fetch some logs when pod is failed
if self.get_logs:
self.write_logs(self.pod)
self.write_logs(self.pod, follow=follow, since_time=last_log_time)

if self.do_xcom_push:
_ = self.extract_xcom(pod=self.pod)
Expand All @@ -723,13 +727,12 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:

elif event["status"] == "running":
if self.get_logs:
last_log_time = event.get("last_log_time")
self.log.info("Resuming logs read from time %r", last_log_time)

pod_log_status = self.pod_manager.fetch_container_logs(
pod=self.pod,
container_name=self.BASE_CONTAINER_NAME,
follow=self.logging_interval is None,
follow=follow,
since_time=last_log_time,
)

Expand All @@ -742,7 +745,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
elif event["status"] == "success":
# fetch some logs when pod is executed successfully
if self.get_logs:
self.write_logs(self.pod)
self.write_logs(self.pod, follow=follow, since_time=last_log_time)

if self.do_xcom_push:
xcom_sidecar_output = self.extract_xcom(pod=self.pod)
Expand Down Expand Up @@ -773,12 +776,18 @@ def _clean(self, event: dict[str, Any]):
def execute_complete(self, context: Context, event: dict, **kwargs):
return self.trigger_reentry(context=context, event=event)

def write_logs(self, pod: k8s.V1Pod):
def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime | None = None):
try:
since_seconds = (
math.ceil((datetime.datetime.now(tz=datetime.timezone.utc) - since_time).total_seconds())
if since_time
else None
)
logs = self.pod_manager.read_pod_logs(
pod=pod,
container_name=self.base_container_name,
follow=False,
follow=follow,
since_seconds=since_seconds,
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
Expand Down
8 changes: 7 additions & 1 deletion airflow/providers/cncf/kubernetes/triggers/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ async def _wait_for_container_completion(self) -> TriggerEvent:
container_state = self.define_container_state(pod)
if container_state == ContainerState.TERMINATED:
return TriggerEvent(
{"status": "success", "namespace": self.pod_namespace, "name": self.pod_name}
{
"status": "success",
"namespace": self.pod_namespace,
"name": self.pod_name,
"last_log_time": self.last_log_time,
}
)
elif container_state == ContainerState.FAILED:
return TriggerEvent(
Expand All @@ -254,6 +259,7 @@ async def _wait_for_container_completion(self) -> TriggerEvent:
"namespace": self.pod_namespace,
"name": self.pod_name,
"message": "Container state failed",
"last_log_time": self.last_log_time,
}
)
if time_get_more_logs and datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs:
Expand Down
1 change: 1 addition & 0 deletions tests/providers/cncf/kubernetes/triggers/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ async def test_run_loop_return_failed_event(self, mock_hook, mock_method, mock_w
"namespace": "default",
"name": "test-pod-name",
"message": "Container state failed",
"last_log_time": None,
}
)
actual_event = await trigger.run().asend(None)
Expand Down

0 comments on commit a3f7ddd

Please sign in to comment.