Skip to content

Commit

Permalink
[AIRFLOW-6052] Add TypeHints to kubernetes_pod_operator (apache#6648)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored Nov 23, 2019
1 parent 3ac5270 commit a391d5f
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 59 deletions.
125 changes: 67 additions & 58 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
# under the License.
"""Executes task in a Kubernetes POD"""
import re
from typing import Dict, List, Optional

import kubernetes.client.models as k8s

from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator, pod_launcher
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes.pod import Resources
from airflow.kubernetes.pod import Port, Resources
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.helpers import validate_key
Expand All @@ -37,36 +44,26 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
simplifies the authorization process.
:param namespace: the namespace to run within kubernetes.
:type namespace: str
:param image: Docker image you wish to launch. Defaults to hub.docker.com,
but fully qualified URLS will point to custom repositories.
:type image: str
:param namespace: the namespace to run within kubernetes.
:type namespace: str
:param name: name of the pod in which the task will run, will be used to
generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
:type name: str
:param cmds: entrypoint of the container. (templated)
The docker images's entrypoint is used if this is not provided.
:type cmds: list[str]
:param arguments: arguments of the entrypoint. (templated)
The docker image's CMD is used if this is not provided.
:type arguments: list[str]
:param image_pull_policy: Specify a policy to cache or always pull an image.
:type image_pull_policy: str
:param image_pull_secrets: Any image pull secrets to be given to the pod.
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
:param ports: ports for launched pod.
:type ports: list[airflow.kubernetes.pod.Port]
:param volume_mounts: volumeMounts for launched pod.
:type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
:type volumes: list[airflow.kubernetes.volume.Volume]
:param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:type startup_timeout_seconds: int
:param name: name of the pod in which the task will run, will be used to
generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
:type name: str
:param env_vars: Environment variables initialized in the container. (templated)
:type env_vars: dict
:param secrets: Kubernetes secrets to inject in the container.
Expand All @@ -77,11 +74,17 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
:type cluster_context: str
:param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:type startup_timeout_seconds: int
:param get_logs: get the stdout of the container as logs of the tasks.
:type get_logs: bool
:param image_pull_policy: Specify a policy to cache or always pull an image.
:type image_pull_policy: str
:param annotations: non-identifying metadata you can attach to the Pod.
Can be a large range of data, and can include characters
that are not permitted by labels.
Can be a large range of data, and can include characters
that are not permitted by labels.
:type annotations: dict
:param resources: A dict containing resources requests and limits.
Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
Expand All @@ -90,15 +93,17 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:type resources: dict
:param affinity: A dict containing a group of affinity scheduling rules.
:type affinity: dict
:param node_selectors: A dict containing a group of scheduling rules.
:type node_selectors: dict
:param config_file: The path to the Kubernetes config file. (templated)
If not specified, default value is ``~/.kube/config``
:type config_file: str
:param do_xcom_push: If True, the content of the file
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
:type do_xcom_push: bool
:param node_selectors: A dict containing a group of scheduling rules.
:type node_selectors: dict
:param image_pull_secrets: Any image pull secrets to be given to the pod.
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
:param service_account_name: Name of the service account
:type service_account_name: str
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted.
If False (default): do nothing, If True: delete the pod
Expand All @@ -110,51 +115,55 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
:param configmaps: A list of configmap names objects that we
want mount as env variables.
:type configmaps: list[str]
:param pod_runtime_info_envs: environment variables about
pod runtime information (ip, namespace, nodeName, podName).
:type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
:param security_context: security options the pod should run with (PodSecurityContext).
:type security_context: dict
:param pod_runtime_info_envs: environment variables about
pod runtime information (ip, namespace, nodeName, podName).
:type pod_runtime_info_envs: list[airflow.kubernetes.pod_runtime_info_env.PodRuntimeInfoEnv]
:param dnspolicy: dnspolicy for the pod.
:type dnspolicy: str
:param full_pod_spec: The complete podSpec
:type full_pod_spec: kubernetes.client.models.V1Pod
:param do_xcom_push: If True, the content of the file
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
:type do_xcom_push: bool
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')

@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
image,
name,
cmds=None,
arguments=None,
ports=None,
volume_mounts=None,
volumes=None,
env_vars=None,
secrets=None,
in_cluster=True,
cluster_context=None,
labels=None,
startup_timeout_seconds=120,
get_logs=True,
image_pull_policy='IfNotPresent',
annotations=None,
resources=None,
affinity=None,
config_file=None,
node_selectors=None,
image_pull_secrets=None,
service_account_name='default',
is_delete_operator_pod=False,
hostnetwork=False,
tolerations=None,
configmaps=None,
security_context=None,
pod_runtime_info_envs=None,
dnspolicy=None,
full_pod_spec=None,
namespace: str,
image: str,
name: str,
cmds: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
ports: Optional[List[Port]] = None,
volume_mounts: Optional[List[VolumeMount]] = None,
volumes: Optional[List[Volume]] = None,
env_vars: Optional[Dict] = None,
secrets: Optional[List[Secret]] = None,
in_cluster: bool = True,
cluster_context: Optional[str] = None,
labels: Optional[Dict] = None,
startup_timeout_seconds: int = 120,
get_logs: bool = True,
image_pull_policy: str = 'IfNotPresent',
annotations: Optional[Dict] = None,
resources: Optional[Dict] = None,
affinity: Optional[Dict] = None,
config_file: Optional[str] = None,
node_selectors: Optional[Dict] = None,
image_pull_secrets: Optional[str] = None,
service_account_name: str = 'default',
is_delete_operator_pod: bool = False,
hostnetwork: bool = False,
tolerations: Optional[List] = None,
configmaps: Optional[List] = None,
security_context: Optional[Dict] = None,
pod_runtime_info_envs: Optional[List[PodRuntimeInfoEnv]] = None,
dnspolicy: Optional[str] = None,
full_pod_spec: Optional[k8s.V1Pod] = None,
*args,
**kwargs):
if kwargs.get('xcom_push') is not None:
Expand Down
2 changes: 1 addition & 1 deletion airflow/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class PodGenerator:
:param configmaps: Any configmap refs to envfrom.
If more than one configmap is required, provide a comma separated list
configmap_a,configmap_b
:type configmaps: str
:type configmaps: List[str]
:param dnspolicy: Specify a dnspolicy for the pod
:type dnspolicy: str
:param pod: The fully specified pod.
Expand Down

0 comments on commit a391d5f

Please sign in to comment.