diff --git a/airflow/kubernetes/kubernetes_helper_functions.py b/airflow/kubernetes/kubernetes_helper_functions.py index 6422cb35b42fa..e264ce3589e7f 100644 --- a/airflow/kubernetes/kubernetes_helper_functions.py +++ b/airflow/kubernetes/kubernetes_helper_functions.py @@ -48,12 +48,17 @@ def create_pod_id( dag_id: str | None = None, task_id: str | None = None, *, - max_length: int = 80, + max_length: int = 63, # must be 63 for now, see below unique: bool = True, ) -> str: """ Generates unique pod ID given a dag_id and / or task_id. + Because of the way that the task log handler reads from running k8s executor pods, + we must keep pod name <= 63 characters. The handler gets pod name from ti.hostname. + TI hostname is derived from the container hostname, which is truncated to 63 characters. + We could lift this limit by using label selectors instead of pod name to find the pod. + :param dag_id: DAG ID :param task_id: Task ID :param max_length: max number of characters diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 7af163255950b..a31805f7cd3c7 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -346,6 +346,15 @@ def construct_pod( "pod_id supplied is longer than 253 characters; truncating and adding unique suffix." ) pod_id = add_pod_suffix(pod_name=pod_id, max_len=253) + if len(pod_id) > 63: + # because in task handler we get pod name from ti hostname (which truncates + # pod_id to 63 characters) we won't be able to find the pod unless it is <= 63 characters. + # our code creates pod names shorter than this so this warning should not normally be triggered. + warnings.warn( + "Supplied pod_id is longer than 63 characters. Due to implementation details, the webserver " + "may not be able to stream logs while task is running. Please choose a shorter pod name." + ) + try: image = pod_override_object.spec.containers[0].image # type: ignore if not image: diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 6aafce2f12b60..62d4262a84bdb 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -92,7 +92,7 @@ def _create_pod_id( dag_id: str | None = None, task_id: str | None = None, *, - max_length: int = 80, + max_length: int = 63, unique: bool = True, ) -> str: """ @@ -648,7 +648,9 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: pod = PodGenerator.reconcile_pods(pod_template, pod) if not pod.metadata.name: - pod.metadata.name = _create_pod_id(task_id=self.task_id, unique=self.random_name_suffix) + pod.metadata.name = _create_pod_id( + task_id=self.task_id, unique=self.random_name_suffix, max_length=80 + ) elif self.random_name_suffix: # user has supplied pod name, we're just adding suffix pod.metadata.name = _add_pod_suffix(pod_name=pod.metadata.name) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 5a5b55d8a41a3..6aee75ee335de 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -196,20 +196,6 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No kube_client = get_kube_client() - if len(ti.hostname) >= 63: - # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname - # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards - # on any label of a FQDN. - pod_list = kube_client.list_namespaced_pod(conf.get("kubernetes_executor", "namespace")) - matches = [ - pod.metadata.name - for pod in pod_list.items - if pod.metadata.name.startswith(ti.hostname) - ] - if len(matches) == 1: - if len(matches[0]) > len(ti.hostname): - ti.hostname = matches[0] - log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n" res = kube_client.read_namespaced_pod_log( diff --git a/tests/kubernetes/test_kubernetes_helper_functions.py b/tests/kubernetes/test_kubernetes_helper_functions.py index 43bdab47e1eb1..52a453e2e8f01 100644 --- a/tests/kubernetes/test_kubernetes_helper_functions.py +++ b/tests/kubernetes/test_kubernetes_helper_functions.py @@ -88,12 +88,14 @@ def test_create_pod_id_dag_and_task(self, dag_id, task_id, expected, create_pod_ def test_create_pod_id_dag_too_long_with_suffix(self, create_pod_id): actual = create_pod_id("0" * 254) - assert re.match(r"0{71}-[a-z0-9]{8}", actual) + assert len(actual) == 63 + assert re.match(r"0{54}-[a-z0-9]{8}", actual) assert re.match(pod_name_regex, actual) def test_create_pod_id_dag_too_long_non_unique(self, create_pod_id): actual = create_pod_id("0" * 254, unique=False) - assert re.match(r"0{80}", actual) + assert len(actual) == 63 + assert re.match(r"0{63}", actual) assert re.match(pod_name_regex, actual) @pytest.mark.parametrize("unique", [True, False])