Skip to content

Commit

Permalink
Make live logs reading work for "other" k8s executors (apache#28213)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish authored Dec 8, 2022
1 parent 1eaedc8 commit cbfbf8b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 2 deletions.
21 changes: 19 additions & 2 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,24 @@ def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
def _read_grouped_logs(self):
return False

@staticmethod
def _should_check_k8s(queue):
"""
If the task is running through kubernetes executor, return True.
When logs aren't available locally, in this case we read from k8s pod logs.
"""
executor = conf.get("core", "executor")
if executor == "KubernetesExecutor":
return True
elif executor == "LocalKubernetesExecutor":
if queue == conf.get("local_kubernetes_executor", "kubernetes_queue"):
return True
elif executor == "CeleryKubernetesExecutor":
if queue == conf.get("celery_kubernetes_executor", "kubernetes_queue"):
return True
return False

def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
"""
Template method that contains custom logic of reading
Expand Down Expand Up @@ -163,7 +181,6 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
location = os.path.join(self.local_base, log_relative_path)

log = ""

if os.path.exists(location):
try:
with open(location, encoding="utf-8", errors="surrogateescape") as file:
Expand All @@ -173,7 +190,7 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
log = f"*** Failed to load local log file: {location}\n"
log += f"*** {str(e)}\n"
return log, {"end_of_log": True}
elif conf.get("core", "executor") == "KubernetesExecutor":
elif self._should_check_k8s(ti.queue):
try:
from airflow.kubernetes.kube_client import get_kube_client

Expand Down
36 changes: 36 additions & 0 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import logging.config
import os
import re
from unittest.mock import patch

import pytest

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.models import DAG, DagRun, TaskInstance
Expand Down Expand Up @@ -264,3 +267,36 @@ def test_log_retrieval_valid(self, create_task_instance):
log_url_ti.hostname = "hostname"
url = FileTaskHandler._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
assert url == "http://hostname:8793/log/DYNAMIC_PATH"


@pytest.mark.parametrize(
"config, queue, expected",
[
(dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), None, False),
(dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), "kubernetes", False),
(dict(AIRFLOW__CORE__EXECUTOR="KubernetesExecutor"), None, True),
(dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "any", False),
(dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "kubernetes", True),
(
dict(
AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor",
AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
),
"hithere",
True,
),
(dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "any", False),
(dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "kubernetes", True),
(
dict(
AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor",
AIRFLOW__LOCAL_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
),
"hithere",
True,
),
],
)
def test__should_check_k8s(config, queue, expected):
with patch.dict("os.environ", **config):
assert FileTaskHandler._should_check_k8s(queue) == expected

0 comments on commit cbfbf8b

Please sign in to comment.