Skip to content

Commit

Permalink
Keep pod name for k8s executor under 63 characters (apache#28237)
Browse files Browse the repository at this point in the history
Because of the way that the task log handler reads from running k8s executor pods, we must keep pod name <= 63 characters.  The handler gets pod name from ti.hostname. TI hostname is derived from the container hostname, which is truncated to 63 characters. We could lift this limit by using label selectors instead of pod name to find the pod. But for now, easy enough to keep limited to 63.

Since we limit to 63 in the code, we can remove the logic to find the matching pod when length is >= 63.
  • Loading branch information
dstandish authored Dec 8, 2022
1 parent cbfbf8b commit bdc3d2e
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 19 deletions.
7 changes: 6 additions & 1 deletion airflow/kubernetes/kubernetes_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,17 @@ def create_pod_id(
dag_id: str | None = None,
task_id: str | None = None,
*,
max_length: int = 80,
max_length: int = 63, # must be 63 for now, see below
unique: bool = True,
) -> str:
"""
Generates unique pod ID given a dag_id and / or task_id.
Because of the way that the task log handler reads from running k8s executor pods,
we must keep pod name <= 63 characters. The handler gets pod name from ti.hostname.
TI hostname is derived from the container hostname, which is truncated to 63 characters.
We could lift this limit by using label selectors instead of pod name to find the pod.
:param dag_id: DAG ID
:param task_id: Task ID
:param max_length: max number of characters
Expand Down
9 changes: 9 additions & 0 deletions airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,15 @@ def construct_pod(
"pod_id supplied is longer than 253 characters; truncating and adding unique suffix."
)
pod_id = add_pod_suffix(pod_name=pod_id, max_len=253)
if len(pod_id) > 63:
# because in task handler we get pod name from ti hostname (which truncates
# pod_id to 63 characters) we won't be able to find the pod unless it is <= 63 characters.
# our code creates pod names shorter than this so this warning should not normally be triggered.
warnings.warn(
"Supplied pod_id is longer than 63 characters. Due to implementation details, the webserver "
"may not be able to stream logs while task is running. Please choose a shorter pod name."
)

try:
image = pod_override_object.spec.containers[0].image # type: ignore
if not image:
Expand Down
6 changes: 4 additions & 2 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def _create_pod_id(
dag_id: str | None = None,
task_id: str | None = None,
*,
max_length: int = 80,
max_length: int = 63,
unique: bool = True,
) -> str:
"""
Expand Down Expand Up @@ -648,7 +648,9 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
pod = PodGenerator.reconcile_pods(pod_template, pod)

if not pod.metadata.name:
pod.metadata.name = _create_pod_id(task_id=self.task_id, unique=self.random_name_suffix)
pod.metadata.name = _create_pod_id(
task_id=self.task_id, unique=self.random_name_suffix, max_length=80
)
elif self.random_name_suffix:
# user has supplied pod name, we're just adding suffix
pod.metadata.name = _add_pod_suffix(pod_name=pod.metadata.name)
Expand Down
14 changes: 0 additions & 14 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,6 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No

kube_client = get_kube_client()

if len(ti.hostname) >= 63:
# Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
# is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
# on any label of a FQDN.
pod_list = kube_client.list_namespaced_pod(conf.get("kubernetes_executor", "namespace"))
matches = [
pod.metadata.name
for pod in pod_list.items
if pod.metadata.name.startswith(ti.hostname)
]
if len(matches) == 1:
if len(matches[0]) > len(ti.hostname):
ti.hostname = matches[0]

log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"

res = kube_client.read_namespaced_pod_log(
Expand Down
6 changes: 4 additions & 2 deletions tests/kubernetes/test_kubernetes_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ def test_create_pod_id_dag_and_task(self, dag_id, task_id, expected, create_pod_

def test_create_pod_id_dag_too_long_with_suffix(self, create_pod_id):
actual = create_pod_id("0" * 254)
assert re.match(r"0{71}-[a-z0-9]{8}", actual)
assert len(actual) == 63
assert re.match(r"0{54}-[a-z0-9]{8}", actual)
assert re.match(pod_name_regex, actual)

def test_create_pod_id_dag_too_long_non_unique(self, create_pod_id):
actual = create_pod_id("0" * 254, unique=False)
assert re.match(r"0{80}", actual)
assert len(actual) == 63
assert re.match(r"0{63}", actual)
assert re.match(pod_name_regex, actual)

@pytest.mark.parametrize("unique", [True, False])
Expand Down

0 comments on commit bdc3d2e

Please sign in to comment.