From 053485ba564dd85b5dbb4fec093375e21d1cbd41 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Mon, 12 Feb 2024 13:28:55 +0530 Subject: [PATCH] Modify KPO to log container log periodically (#37279) The current state of the KubernetesPodOperator (KPO) only prints container logs at the end of task execution. While this is sufficient for short-running tasks, it becomes less user-friendly when the container runs for an extended period. This PR enhances the KPO by modifying the trigger and operator to fetch container logs periodically, making it possible to monitor the task's progress in the Airflow task UI. A new parameter, logging_interval, has been introduced to the operator. It specifies the maximum time, in seconds, that the task should remain deferred before resuming to fetch the latest logs. --- .../cncf/kubernetes/operators/pod.py | 92 ++++++- .../providers/cncf/kubernetes/triggers/pod.py | 157 ++++++------ .../cncf/kubernetes/utils/pod_manager.py | 8 + .../operators.rst | 8 + .../cncf/kubernetes/operators/test_pod.py | 60 ++++- .../cncf/kubernetes/triggers/test_pod.py | 237 +++++++----------- .../cloud/triggers/test_kubernetes_engine.py | 68 ++--- .../kubernetes/example_kubernetes_async.py | 28 +++ 8 files changed, 378 insertions(+), 280 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 0564afdcc70e8..73389f4038282 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -35,7 +35,12 @@ from urllib3.exceptions import HTTPError from airflow.configuration import conf -from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException +from airflow.exceptions import ( + AirflowException, + AirflowProviderDeprecationWarning, + AirflowSkipException, + TaskDeferred, +) from airflow.models import BaseOperator from 
airflow.providers.cncf.kubernetes import pod_generator from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import ( @@ -63,7 +68,9 @@ EMPTY_XCOM_RESULT, OnFinishAction, PodLaunchFailedException, + PodLaunchTimeoutException, PodManager, + PodNotFoundException, PodOperatorHookProtocol, PodPhase, container_is_succeeded, @@ -77,6 +84,7 @@ if TYPE_CHECKING: import jinja2 + from pendulum import DateTime from typing_extensions import Literal from airflow.providers.cncf.kubernetes.secret import Secret @@ -203,6 +211,9 @@ class KubernetesPodOperator(BaseOperator): of KubernetesPodOperator. :param progress_callback: Callback function for receiving k8s container logs. `progress_callback` is deprecated, please use :param `callbacks` instead. + :param logging_interval: max time in seconds that task should be in deferred state before + resuming to fetch the latest logs. If ``None``, then the task will remain in deferred state until pod + is done, and no logs will be visible until that time. """ # !!! Changes in KubernetesPodOperator's arguments should be also reflected in !!! 
@@ -297,6 +308,7 @@ def __init__( active_deadline_seconds: int | None = None, callbacks: type[KubernetesPodOperatorCallback] | None = None, progress_callback: Callable[[str], None] | None = None, + logging_interval: int | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -387,6 +399,7 @@ def __init__( self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD self.termination_message_policy = termination_message_policy self.active_deadline_seconds = active_deadline_seconds + self.logging_interval = logging_interval self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict self._progress_callback = progress_callback @@ -641,13 +654,13 @@ def execute_async(self, context: Context): self.invoke_defer_method() - def invoke_defer_method(self): + def invoke_defer_method(self, last_log_time: DateTime | None = None): """Redefine triggers which are being used in child classes.""" trigger_start_time = utcnow() self.defer( trigger=KubernetesPodTrigger( - pod_name=self.pod.metadata.name, - pod_namespace=self.pod.metadata.namespace, + pod_name=self.pod.metadata.name, # type: ignore[union-attr] + pod_namespace=self.pod.metadata.namespace, # type: ignore[union-attr] trigger_start_time=trigger_start_time, kubernetes_conn_id=self.kubernetes_conn_id, cluster_context=self.cluster_context, @@ -659,10 +672,79 @@ def invoke_defer_method(self): startup_check_interval=self.startup_check_interval_seconds, base_container_name=self.base_container_name, on_finish_action=self.on_finish_action.value, + last_log_time=last_log_time, + logging_interval=self.logging_interval, ), - method_name="execute_complete", + method_name="trigger_reentry", ) + @staticmethod + def raise_for_trigger_status(event: dict[str, Any]) -> None: + """Raise exception if pod is not in expected state.""" + if event["status"] == "error": + error_type = event["error_type"] + description = event["description"] + if error_type == 
"PodLaunchTimeoutException": + raise PodLaunchTimeoutException(description) + else: + raise AirflowException(description) + + def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: + """ + Point of re-entry from trigger. + + If ``logging_interval`` is None, then at this point the pod should be done and we'll just fetch + the logs and exit. + + If ``logging_interval`` is not None, it could be that the pod is still running and we'll just + grab the latest logs and defer back to the trigger again. + """ + remote_pod = None + try: + self.pod_request_obj = self.build_pod_request_obj(context) + self.pod = self.find_pod( + namespace=self.namespace or self.pod_request_obj.metadata.namespace, + context=context, + ) + + # we try to find pod before possibly raising so that on_kill will have `pod` attr + self.raise_for_trigger_status(event) + + if not self.pod: + raise PodNotFoundException("Could not find pod after resuming from deferral") + + if self.get_logs: + last_log_time = event and event.get("last_log_time") + if last_log_time: + self.log.info("Resuming logs read from time %r", last_log_time) + pod_log_status = self.pod_manager.fetch_container_logs( + pod=self.pod, + container_name=self.BASE_CONTAINER_NAME, + follow=self.logging_interval is None, + since_time=last_log_time, + ) + if pod_log_status.running: + self.log.info("Container still running; deferring again.") + self.invoke_defer_method(pod_log_status.last_log_time) + + if self.do_xcom_push: + result = self.extract_xcom(pod=self.pod) + remote_pod = self.pod_manager.await_pod_completion(self.pod) + except TaskDeferred: + raise + except Exception: + self.cleanup( + pod=self.pod or self.pod_request_obj, + remote_pod=remote_pod, + ) + raise + self.cleanup( + pod=self.pod or self.pod_request_obj, + remote_pod=remote_pod, + ) + if self.do_xcom_push: + return result + def execute_complete(self, context: Context, event: dict, **kwargs): self.log.debug("Triggered with event: %s", event) pod = None diff 
--git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index 3dd9eb173ca57..e34a73f146fe2 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -20,18 +20,24 @@ import datetime import traceback import warnings -from asyncio import CancelledError from enum import Enum from functools import cached_property from typing import TYPE_CHECKING, Any, AsyncIterator from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook -from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodPhase +from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + OnFinishAction, + PodLaunchTimeoutException, + PodPhase, + container_is_running, +) from airflow.triggers.base import BaseTrigger, TriggerEvent +from airflow.utils import timezone if TYPE_CHECKING: from kubernetes_asyncio.client.models import V1Pod + from pendulum import DateTime class ContainerState(str, Enum): @@ -70,6 +76,9 @@ class KubernetesPodTrigger(BaseTrigger): state, or the execution is interrupted. If True (default), delete the pod; if False, leave the pod. Deprecated - use `on_finish_action` instead. + :param logging_interval: number of seconds to wait before kicking it back to + the operator to print latest logs. If ``None`` will wait until container done. 
+ :param last_log_time: where to resume logs from """ def __init__( @@ -88,6 +97,8 @@ def __init__( startup_check_interval: int = 1, on_finish_action: str = "delete_pod", should_delete_pod: bool | None = None, + last_log_time: DateTime | None = None, + logging_interval: int | None = None, ): super().__init__() self.pod_name = pod_name @@ -102,6 +113,8 @@ def __init__( self.get_logs = get_logs self.startup_timeout = startup_timeout self.startup_check_interval = startup_check_interval + self.last_log_time = last_log_time + self.logging_interval = logging_interval if should_delete_pod is not None: warnings.warn( @@ -137,6 +150,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "trigger_start_time": self.trigger_start_time, "should_delete_pod": self.should_delete_pod, "on_finish_action": self.on_finish_action.value, + "last_log_time": self.last_log_time, + "logging_interval": self.logging_interval, }, ) @@ -144,97 +159,69 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] """Get current pod status and yield a TriggerEvent.""" self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace) try: - while True: - pod = await self.hook.get_pod( - name=self.pod_name, - namespace=self.pod_namespace, + state = await self._wait_for_pod_start() + if state in PodPhase.terminal_states: + event = TriggerEvent( + {"status": "done", "namespace": self.pod_namespace, "pod_name": self.pod_name} ) - - pod_status = pod.status.phase - self.log.debug("Pod %s status: %s", self.pod_name, pod_status) - - container_state = self.define_container_state(pod) - self.log.debug("Container %s status: %s", self.base_container_name, container_state) - - if container_state == ContainerState.TERMINATED: - yield TriggerEvent( - { - "name": self.pod_name, - "namespace": self.pod_namespace, - "status": "success", - "message": "All containers inside pod have started successfully.", - } - ) - return - elif self.should_wait(pod_phase=pod_status, 
container_state=container_state): - self.log.info("Container is not completed and still working.") - - if pod_status == PodPhase.PENDING and container_state != ContainerState.RUNNING: - delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time - if delta.total_seconds() >= self.startup_timeout: - message = ( - f"Pod took longer than {self.startup_timeout} seconds to start. " - "Check the pod events in kubernetes to determine why." - ) - yield TriggerEvent( - { - "name": self.pod_name, - "namespace": self.pod_namespace, - "status": "timeout", - "message": message, - } - ) - return - else: - self.log.info("Sleeping for %s seconds.", self.startup_check_interval) - await asyncio.sleep(self.startup_check_interval) - else: - self.log.info("Sleeping for %s seconds.", self.poll_interval) - await asyncio.sleep(self.poll_interval) - else: - yield TriggerEvent( - { - "name": self.pod_name, - "namespace": self.pod_namespace, - "status": "failed", - "message": pod.status.message, - } - ) - return - except CancelledError: - # That means that task was marked as failed - if self.get_logs: - self.log.info("Outputting container logs...") - await self.hook.read_logs( - name=self.pod_name, - namespace=self.pod_namespace, - ) - if self.on_finish_action == OnFinishAction.DELETE_POD: - self.log.info("Deleting pod...") - await self.hook.delete_pod( - name=self.pod_name, - namespace=self.pod_namespace, - ) - yield TriggerEvent( - { - "name": self.pod_name, - "namespace": self.pod_namespace, - "status": "cancelled", - "message": "Pod execution was cancelled", - } - ) + else: + event = await self._wait_for_container_completion() + yield event except Exception as e: - self.log.exception("Exception occurred while checking pod phase:") + description = self._format_exception_description(e) yield TriggerEvent( { - "name": self.pod_name, - "namespace": self.pod_namespace, "status": "error", - "message": str(e), - "stack_trace": traceback.format_exc(), + "error_type": 
e.__class__.__name__, + "description": description, } ) + def _format_exception_description(self, exc: Exception) -> Any: + if isinstance(exc, PodLaunchTimeoutException): + return exc.args[0] + + description = f"Trigger {self.__class__.__name__} failed with exception {exc.__class__.__name__}." + message = exc.args and exc.args[0] or "" + if message: + description += f"\ntrigger exception message: {message}" + curr_traceback = traceback.format_exc() + description += f"\ntrigger traceback:\n{curr_traceback}" + return description + + async def _wait_for_pod_start(self) -> Any: + """Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error.""" + start_time = timezone.utcnow() + timeout_end = start_time + datetime.timedelta(seconds=self.startup_timeout) + while timeout_end > timezone.utcnow(): + pod = await self.hook.get_pod(self.pod_name, self.pod_namespace) + if not pod.status.phase == "Pending": + return pod.status.phase + self.log.info("Still waiting for pod to start. The pod state is %s", pod.status.phase) + await asyncio.sleep(self.poll_interval) + raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout") + + async def _wait_for_container_completion(self) -> TriggerEvent: + """ + Wait for container completion. + + Waits until container is no longer in running state. If trigger is configured with a logging period, + then will emit an event to resume the task for the purpose of fetching more logs. 
+ """ + time_begin = timezone.utcnow() + time_get_more_logs = None + if self.logging_interval is not None: + time_get_more_logs = time_begin + datetime.timedelta(seconds=self.logging_interval) + while True: + pod = await self.hook.get_pod(self.pod_name, self.pod_namespace) + if not container_is_running(pod=pod, container_name=self.base_container_name): + return TriggerEvent( + {"status": "done", "namespace": self.pod_namespace, "pod_name": self.pod_name} + ) + if time_get_more_logs and timezone.utcnow() > time_get_more_logs: + return TriggerEvent({"status": "running", "last_log_time": self.last_log_time}) + await asyncio.sleep(self.poll_interval) + def _get_async_hook(self) -> AsyncKubernetesHook: # TODO: Remove this method when the min version of kubernetes provider is 7.12.0 in Google provider. return AsyncKubernetesHook( diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 58520ac42c89b..61eb56bf82b14 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -189,6 +189,14 @@ def get_container_termination_message(pod: V1Pod, container_name: str): return container_status.state.terminated.message if container_status else None +class PodLaunchTimeoutException(AirflowException): + """When pod does not leave the ``Pending`` phase within specified timeout.""" + + +class PodNotFoundException(AirflowException): + """Expected pod does not exist in kube-api.""" + + class PodLogsConsumer: """ Responsible for pulling pod logs from a stream with checking a container status before reading data. 
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst index 58fab491b8d9c..b2e5f15f393cd 100644 --- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst +++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst @@ -147,6 +147,14 @@ Also for this action you can use operator in the deferrable mode: :start-after: [START howto_operator_k8s_private_image_async] :end-before: [END howto_operator_k8s_private_image_async] +Example to fetch and display container log periodically + +.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes_async.py + :language: python + :start-after: [START howto_operator_async_log] + :end-before: [END howto_operator_async_log] + + How does XCom work? ^^^^^^^^^^^^^^^^^^^ The :class:`~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator` handles diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index fa617a39fd053..c27cd231465cb 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -34,7 +34,11 @@ from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator, _optionally_suppress from airflow.providers.cncf.kubernetes.secret import Secret from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger -from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase +from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + PodLaunchTimeoutException, + PodLoggingStatus, + PodPhase, +) from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.utils import timezone from airflow.utils.session import create_session @@ -1969,6 +1973,60 @@ def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_m with pytest.raises(AirflowException, match=expect_match): 
k.cleanup(pod, pod) + @mock.patch( + "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.raise_for_trigger_status" + ) + @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs") + def test_get_logs_running( + self, + fetch_container_logs, + await_pod_completion, + find_pod, + raise_for_trigger_status, + ): + """When logs fetch exits with status running, raise task deferred""" + pod = MagicMock() + find_pod.return_value = pod + op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) + await_pod_completion.return_value = None + fetch_container_logs.return_value = PodLoggingStatus(True, None) + with pytest.raises(TaskDeferred): + op.trigger_reentry(create_context(op), None) + fetch_container_logs.is_called_with(pod, "base") + + @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup") + @mock.patch( + "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.raise_for_trigger_status" + ) + @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs") + def test_get_logs_not_running(self, fetch_container_logs, find_pod, raise_for_trigger_status, cleanup): + pod = MagicMock() + find_pod.return_value = pod + op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) + fetch_container_logs.return_value = PodLoggingStatus(False, None) + op.trigger_reentry(create_context(op), None) + fetch_container_logs.is_called_with(pod, "base") + + @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup") + 
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod") + def test_trigger_error(self, find_pod, cleanup): + """Assert that trigger_reentry raise exception in case of error""" + find_pod.return_value = MagicMock() + op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) + with pytest.raises(PodLaunchTimeoutException): + context = create_context(op) + op.trigger_reentry( + context, + { + "status": "error", + "error_type": "PodLaunchTimeoutException", + "description": "any message", + }, + ) + @pytest.mark.parametrize("do_xcom_push", [True, False]) @patch(KUB_OP_PATH.format("extract_xcom")) diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py index 9c016ea8cfb9a..d12100e4e35c7 100644 --- a/tests/providers/cncf/kubernetes/triggers/test_pod.py +++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py @@ -20,11 +20,14 @@ import asyncio import datetime import logging -from asyncio import CancelledError, Future +from asyncio import Future from unittest import mock +from unittest.mock import MagicMock import pytest from kubernetes.client import models as k8s +from pendulum import DateTime +from pytest import param from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase @@ -65,6 +68,22 @@ def trigger(): ) +def get_read_pod_mock_containers(statuses_to_emit=None): + """ + Emit pods with given phases sequentially. + `statuses_to_emit` should be a list of bools indicating running or not. 
+ """ + + async def mock_read_namespaced_pod(*args, **kwargs): + container_mock = MagicMock() + container_mock.state.running = statuses_to_emit.pop(0) + event_mock = MagicMock() + event_mock.status.container_statuses = [container_mock] + return event_mock + + return mock_read_namespaced_pod + + class TestKubernetesPodTrigger: @staticmethod def _mock_pod_result(result_to_mock): @@ -90,6 +109,8 @@ def test_serialize(self, trigger): "trigger_start_time": TRIGGER_START_TIME, "on_finish_action": ON_FINISH_ACTION, "should_delete_pod": ON_FINISH_ACTION == "delete_pod", + "last_log_time": None, + "logging_interval": None, } @pytest.mark.asyncio @@ -101,10 +122,9 @@ async def test_run_loop_return_success_event(self, mock_hook, mock_method, trigg expected_event = TriggerEvent( { - "name": POD_NAME, + "pod_name": POD_NAME, "namespace": NAMESPACE, - "status": "success", - "message": "All containers inside pod have started successfully.", + "status": "done", } ) actual_event = await trigger.run().asend(None) @@ -112,36 +132,16 @@ async def test_run_loop_return_success_event(self, mock_hook, mock_method, trigg assert actual_event == expected_event @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_PATH}.define_container_state") + @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running") + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod") + @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_run_loop_return_failed_event(self, mock_hook, mock_method, trigger): - mock_hook.get_pod.return_value = self._mock_pod_result( - mock.MagicMock( - status=mock.MagicMock( - message=FAILED_RESULT_MSG, - ) - ) - ) - mock_method.return_value = ContainerState.FAILED - - expected_event = TriggerEvent( - { - "name": POD_NAME, - "namespace": NAMESPACE, - "status": "failed", - "message": FAILED_RESULT_MSG, - } - ) - actual_event = await trigger.run().asend(None) - - assert actual_event 
== expected_event - - @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_PATH}.define_container_state") - @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_run_loop_return_waiting_event(self, mock_hook, mock_method, trigger, caplog): + async def test_run_loop_return_waiting_event( + self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog + ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.WAITING + mock_container_is_running.return_value = True caplog.set_level(logging.INFO) @@ -153,11 +153,16 @@ async def test_run_loop_return_waiting_event(self, mock_hook, mock_method, trigg assert f"Sleeping for {POLL_INTERVAL} seconds." @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_PATH}.define_container_state") + @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running") + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod") + @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_run_loop_return_running_event(self, mock_hook, mock_method, trigger, caplog): + async def test_run_loop_return_running_event( + self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog + ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.RUNNING + mock_container_is_running.return_value = True caplog.set_level(logging.INFO) @@ -168,6 +173,30 @@ async def test_run_loop_return_running_event(self, mock_hook, mock_method, trigg assert "Container is not completed and still working." assert f"Sleeping for {POLL_INTERVAL} seconds." 
+ @pytest.mark.asyncio + @mock.patch(f"{TRIGGER_PATH}.define_container_state") + @mock.patch(f"{TRIGGER_PATH}.hook") + async def test_run_loop_return_failed_event(self, mock_hook, mock_method, trigger): + mock_hook.get_pod.return_value = self._mock_pod_result( + mock.MagicMock( + status=mock.MagicMock( + message=FAILED_RESULT_MSG, + ) + ) + ) + mock_method.return_value = ContainerState.FAILED + + expected_event = TriggerEvent( + { + "pod_name": POD_NAME, + "namespace": NAMESPACE, + "status": "done", + } + ) + actual_event = await trigger.run().asend(None) + + assert actual_event == expected_event + @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}.hook") async def test_logging_in_trigger_when_exception_should_execute_successfully( @@ -181,14 +210,8 @@ async def test_logging_in_trigger_when_exception_should_execute_successfully( generator = trigger.run() actual = await generator.asend(None) - actual_stack_trace = actual.payload.pop("stack_trace") - assert ( - TriggerEvent( - {"name": POD_NAME, "namespace": NAMESPACE, "status": "error", "message": "Test exception"} - ) - == actual - ) - assert actual_stack_trace.startswith("Traceback (most recent call last):") + actual_stack_trace = actual.payload.pop("description") + assert actual_stack_trace.startswith("Trigger KubernetesPodTrigger failed with exception Exception") @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}.define_container_state") @@ -209,96 +232,39 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully( assert "Container logs:" @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_logging_in_trigger_when_cancelled_should_execute_successfully_and_delete_pod( - self, - mock_hook, - caplog, - ): - """ - Test that KubernetesPodTrigger fires the correct event in case if the task was cancelled. 
- """ - - mock_hook.get_pod.side_effect = CancelledError() - mock_hook.read_logs.return_value = self._mock_pod_result(mock.MagicMock()) - mock_hook.delete_pod.return_value = self._mock_pod_result(mock.MagicMock()) - - trigger = KubernetesPodTrigger( - pod_name=POD_NAME, - pod_namespace=NAMESPACE, - base_container_name=BASE_CONTAINER_NAME, - kubernetes_conn_id=CONN_ID, - poll_interval=POLL_INTERVAL, - cluster_context=CLUSTER_CONTEXT, - config_file=CONFIG_FILE, - in_cluster=IN_CLUSTER, - get_logs=GET_LOGS, - startup_timeout=STARTUP_TIMEOUT_SECS, - trigger_start_time=TRIGGER_START_TIME, - on_finish_action="delete_pod", - ) - - generator = trigger.run() - actual = await generator.asend(None) - assert ( - TriggerEvent( - { - "name": POD_NAME, - "namespace": NAMESPACE, - "status": "cancelled", - "message": "Pod execution was cancelled", - } - ) - == actual - ) - assert "Outputting container logs..." in caplog.text - assert "Deleting pod..." in caplog.text - - @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_logging_in_trigger_when_cancelled_should_execute_successfully_without_delete_pod( - self, - mock_hook, - caplog, - ): + @pytest.mark.parametrize( + "logging_interval, exp_event", + [ + param(0, {"status": "running", "last_log_time": DateTime(2022, 1, 1)}, id="short_interval"), + param(None, {"status": "done", "namespace": mock.ANY, "pod_name": mock.ANY}, id="no_interval"), + ], + ) + @mock.patch( + "kubernetes_asyncio.client.CoreV1Api.read_namespaced_pod", + new=get_read_pod_mock_containers([1, 1, None, None]), + ) + @mock.patch("kubernetes_asyncio.config.load_kube_config") + async def test_running_log_interval(self, load_kube_config, logging_interval, exp_event): """ - Test that KubernetesPodTrigger fires the correct event if the task was cancelled. + If log interval given, should emit event with running status and last log time. + Otherwise, should make it to second loop and emit "done" event. 
+ For this test we emit container status "running, running not". + The first "running" status gets us out of wait_for_pod_start. + The second "running" will fire a "running" event when logging interval is non-None. When logging + interval is None, the second "running" status will just result in continuation of the loop. And + when in the next loop we get a non-running status, the trigger fires a "done" event. """ - - mock_hook.get_pod.side_effect = CancelledError() - mock_hook.read_logs.return_value = self._mock_pod_result(mock.MagicMock()) - mock_hook.delete_pod.return_value = self._mock_pod_result(mock.MagicMock()) - trigger = KubernetesPodTrigger( - pod_name=POD_NAME, - pod_namespace=NAMESPACE, - base_container_name=BASE_CONTAINER_NAME, - kubernetes_conn_id=CONN_ID, - poll_interval=POLL_INTERVAL, - cluster_context=CLUSTER_CONTEXT, - config_file=CONFIG_FILE, - in_cluster=IN_CLUSTER, - get_logs=GET_LOGS, - startup_timeout=STARTUP_TIMEOUT_SECS, - trigger_start_time=TRIGGER_START_TIME, - on_finish_action="delete_succeeded_pod", - ) - - generator = trigger.run() - actual = await generator.asend(None) - assert ( - TriggerEvent( - { - "name": POD_NAME, - "namespace": NAMESPACE, - "status": "cancelled", - "message": "Pod execution was cancelled", - } - ) - == actual + pod_name=mock.ANY, + pod_namespace=mock.ANY, + trigger_start_time=mock.ANY, + base_container_name=mock.ANY, + startup_timeout=5, + poll_interval=1, + logging_interval=logging_interval, + last_log_time=DateTime(2022, 1, 1), ) - assert "Outputting container logs..." in caplog.text - assert "Deleting pod..." 
not in caplog.text + assert await trigger.run().__anext__() == TriggerEvent(exp_event) @pytest.mark.parametrize( "container_state, expected_state", @@ -340,12 +306,12 @@ def test_define_container_state_should_execute_successfully( @pytest.mark.asyncio @pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED]) - @mock.patch(f"{TRIGGER_PATH}.define_container_state") + @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_timeout_event( self, mock_hook, mock_method, trigger, caplog, container_state ): - trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2) + trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(seconds=5) mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( status=mock.MagicMock( @@ -359,15 +325,4 @@ async def test_run_loop_return_timeout_event( generator = trigger.run() actual = await generator.asend(None) - assert ( - TriggerEvent( - { - "name": POD_NAME, - "namespace": NAMESPACE, - "status": "timeout", - "message": f"Pod took longer than {STARTUP_TIMEOUT_SECS} seconds to start." 
- " Check the pod events in kubernetes to determine why.", - } - ) - == actual - ) + assert actual == TriggerEvent({"status": "done", "namespace": NAMESPACE, "pod_name": POD_NAME}) diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py index ec31b2bcc47f0..ca7b7ba3588fd 100644 --- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py @@ -20,7 +20,7 @@ import asyncio import datetime import logging -from asyncio import CancelledError, Future +from asyncio import Future from unittest import mock import pytest @@ -118,10 +118,9 @@ async def test_run_loop_return_success_event_should_execute_successfully( expected_event = TriggerEvent( { - "name": POD_NAME, + "pod_name": POD_NAME, "namespace": NAMESPACE, - "status": "success", - "message": "All containers inside pod have started successfully.", + "status": "done", } ) actual_event = await trigger.run().asend(None) @@ -145,10 +144,9 @@ async def test_run_loop_return_failed_event_should_execute_successfully( expected_event = TriggerEvent( { - "name": POD_NAME, + "pod_name": POD_NAME, "namespace": NAMESPACE, - "status": "failed", - "message": FAILED_RESULT_MSG, + "status": "done", } ) actual_event = await trigger.run().asend(None) @@ -156,14 +154,18 @@ async def test_run_loop_return_failed_event_should_execute_successfully( assert actual_event == expected_event @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") + @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running") + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod") + @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_run_loop_return_waiting_event_should_execute_successfully( - self, mock_hook, mock_method, trigger, caplog + self, mock_hook, 
mock_method, mock_get_pod, mock_container_is_running, trigger, caplog ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) - mock_method.return_value = ContainerState.WAITING + mock_method.return_value = ContainerState.RUNNING + mock_container_is_running.return_value = True + trigger.logging_interval = 10 caplog.set_level(logging.INFO) task = asyncio.create_task(trigger.run().__anext__()) @@ -174,12 +176,15 @@ async def test_run_loop_return_waiting_event_should_execute_successfully( assert f"Sleeping for {POLL_INTERVAL} seconds." @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") + @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running") + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod") + @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_run_loop_return_running_event_should_execute_successfully( - self, mock_hook, mock_method, trigger, caplog + self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + mock_container_is_running.return_value = True mock_method.return_value = ContainerState.RUNNING caplog.set_level(logging.INFO) @@ -203,14 +208,9 @@ async def test_logging_in_trigger_when_exception_should_execute_successfully( generator = trigger.run() actual = await generator.asend(None) - actual_stack_trace = actual.payload.pop("stack_trace") - assert ( - TriggerEvent( - {"name": POD_NAME, "namespace": NAMESPACE, "status": "error", "message": "Test exception"} - ) - == actual - ) - assert actual_stack_trace.startswith("Traceback (most recent call last):") + + actual_stack_trace = actual.payload.pop("description") + assert actual_stack_trace.startswith("Trigger GKEStartPodTrigger failed with exception Exception") @pytest.mark.asyncio 
@mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") @@ -229,34 +229,6 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully( await generator.asend(None) assert "Container logs:" - @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_GKE_PATH}.hook") - async def test_logging_in_trigger_when_cancelled_should_execute_successfully( - self, mock_hook, trigger, caplog - ): - """ - Test that GKEStartPodTrigger fires the correct event in case if the task was cancelled. - """ - mock_hook.get_pod.side_effect = CancelledError() - mock_hook.read_logs.return_value = self._mock_pod_result(mock.MagicMock()) - mock_hook.delete_pod.return_value = self._mock_pod_result(mock.MagicMock()) - - generator = trigger.run() - actual = await generator.asend(None) - assert ( - TriggerEvent( - { - "name": POD_NAME, - "namespace": NAMESPACE, - "status": "cancelled", - "message": "Pod execution was cancelled", - } - ) - == actual - ) - assert "Outputting container logs..." in caplog.text - assert "Deleting pod..." 
in caplog.text - @pytest.mark.parametrize( "container_state, expected_state", [ diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py b/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py index 0b0220f822178..881bfd61f7c8d 100644 --- a/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py +++ b/tests/system/providers/cncf/kubernetes/example_kubernetes_async.py @@ -131,6 +131,34 @@ deferrable=True, ) + # [START howto_operator_async_log] + kubernetes_task_async_log = KubernetesPodOperator( + task_id="kubernetes_task_async_log", + namespace="kubernetes_task_async_log", + in_cluster=False, + name="astro_k8s_test_pod", + image="ubuntu", + cmds=[ + "bash", + "-cx", + ( + "i=0; " + "while [ $i -ne 100 ]; " + "do i=$(($i+1)); " + "echo $i; " + "sleep 1; " + "done; " + "mkdir -p /airflow/xcom/; " + 'echo \'{"message": "good afternoon!"}\' > /airflow/xcom/return.json' + ), + ], + do_xcom_push=True, + deferrable=True, + get_logs=True, + logging_interval=5, + ) + # [END howto_operator_async_log] + # [START howto_operator_k8s_private_image_async] quay_k8s_async = KubernetesPodOperator( task_id="kubernetes_private_img_task_async",