Skip to content

Commit

Permalink
Add better debug logging to K8sexec and K8sPodOp (apache#11502)
Browse files Browse the repository at this point in the history
  • Loading branch information
dimberman authored Oct 15, 2020
1 parent 13959df commit eee4e30
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def queue_task_instance(
pool=pool,
pickle_id=pickle_id,
cfg_path=cfg_path)
self.log.debug("created command %s", command_list_to_run)
self.queue_command(
task_instance,
command_list_to_run,
Expand Down
8 changes: 7 additions & 1 deletion airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:

def _health_check_kube_watcher(self):
if self.kube_watcher.is_alive():
pass
self.log.debug("KubeJobWatcher alive, continuing")
else:
self.log.error(
'Error while health checking kube watcher process. '
Expand Down Expand Up @@ -368,6 +368,7 @@ def run_next(self, next_job: KubernetesJobType) -> None:
def delete_pod(self, pod_id: str, namespace: str) -> None:
"""Deletes POD"""
try:
self.log.debug("Deleting pod %s in namespace %s", pod_id, namespace)
self.kube_client.delete_namespaced_pod(
pod_id, namespace, body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
**self.kube_config.kube_client_request_args)
Expand All @@ -385,11 +386,13 @@ def sync(self) -> None:
:return:
"""
self.log.debug("Syncing KubernetesExecutor")
self._health_check_kube_watcher()
while True:
try:
task = self.watcher_queue.get_nowait()
try:
self.log.debug("Processing task %s", task)
self.process_watcher_task(task)
finally:
self.watcher_queue.task_done()
Expand All @@ -409,6 +412,7 @@ def process_watcher_task(self, task: KubernetesWatchType) -> None:
self.result_queue.put((key, state, pod_id, namespace, resource_version))

def _annotations_to_key(self, annotations: Dict[str, str]) -> Optional[TaskInstanceKey]:
self.log.debug("Creating task key for annotations %s", annotations)
dag_id = annotations['dag_id']
task_id = annotations['task_id']
try_number = int(annotations['try_number'])
Expand Down Expand Up @@ -519,6 +523,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
proper support
for State.LAUNCHED
"""
self.log.debug("Clearing tasks that have not been launched")
if not self.kube_client:
raise AirflowException(NOT_STARTED_MESSAGE)
queued_tasks = session \
Expand All @@ -531,6 +536,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:

for task in queued_tasks:
# pylint: disable=protected-access
self.log.debug("Checking task %s", task)
dict_string = (
"dag_id={},task_id={},execution_date={},airflow-worker={}".format(
pod_generator.make_safe_label_value(task.dag_id),
Expand Down
6 changes: 6 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
will supersede all other values.
"""
self.log.debug("Creating pod for K8sPodOperator task %s", self.task_id)
if self.pod_template_file:
self.log.debug("Pod template file found, will parse for base pod")
pod_template = pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file)
else:
pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="name"))
Expand Down Expand Up @@ -405,8 +407,10 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
pod = PodGenerator.reconcile_pods(pod_template, pod)

for secret in self.secrets:
self.log.debug("Adding secret to task %s", self.task_id)
pod = secret.attach_to_pod(pod)
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = PodGenerator.add_xcom_sidecar(pod)
return pod

Expand All @@ -421,6 +425,7 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
if not (self.full_pod_spec or self.pod_template_file):
# Add Airflow Version to the label
# And a label to identify that pod is launched by KubernetesPodOperator
self.log.debug("Adding k8spodoperator labels to pod before launch for task %s", self.task_id)
self.labels.update(
{
'airflow_version': airflow_version.replace('+', '-'),
Expand All @@ -442,6 +447,7 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
raise
finally:
if self.is_delete_operator_pod:
self.log.debug("Deleting pod for task %s", self.task_id)
launcher.delete_pod(self.pod)
return final_state, self.pod, result

Expand Down

0 comments on commit eee4e30

Please sign in to comment.