From f23170c9dd23556a40bd07b5d24f06220eec15c4 Mon Sep 17 00:00:00 2001 From: "D. Ferruzzi" Date: Mon, 16 Oct 2023 00:49:12 -0700 Subject: [PATCH] D401 Support - A thru Common (Inclusive) (#34934) --- .../providers/amazon/aws/utils/suppress.py | 2 +- airflow/providers/asana/hooks/asana.py | 2 +- .../celery/executors/celery_executor.py | 9 ++-- .../celery/executors/celery_executor_utils.py | 6 +-- .../executors/celery_kubernetes_executor.py | 7 +-- airflow/providers/cloudant/hooks/cloudant.py | 4 +- .../backcompat/backwards_compat_converters.py | 25 +++++---- .../executors/kubernetes_executor.py | 9 ++-- .../executors/kubernetes_executor_utils.py | 8 +-- .../executors/local_kubernetes_executor.py | 7 +-- .../cncf/kubernetes/hooks/kubernetes.py | 32 ++++++------ .../providers/cncf/kubernetes/kube_client.py | 4 +- .../kubernetes/kubernetes_helper_functions.py | 2 +- .../cncf/kubernetes/operators/pod.py | 14 ++--- .../cncf/kubernetes/pod_generator.py | 8 +-- .../kubernetes/pod_generator_deprecated.py | 6 +-- .../kubernetes/pod_launcher_deprecated.py | 20 +++---- .../kubernetes/python_kubernetes_script.py | 4 +- airflow/providers/cncf/kubernetes/secret.py | 8 +-- .../providers/cncf/kubernetes/triggers/pod.py | 4 +- .../cncf/kubernetes/utils/pod_manager.py | 52 +++++++++---------- .../cncf/kubernetes/utils/xcom_sidecar.py | 2 +- airflow/providers/common/sql/hooks/sql.py | 51 +++++++++--------- airflow/providers/common/sql/operators/sql.py | 6 ++- 24 files changed, 152 insertions(+), 140 deletions(-) diff --git a/airflow/providers/amazon/aws/utils/suppress.py b/airflow/providers/amazon/aws/utils/suppress.py index 6ef282c10448a..908106f0c3165 100644 --- a/airflow/providers/amazon/aws/utils/suppress.py +++ b/airflow/providers/amazon/aws/utils/suppress.py @@ -41,7 +41,7 @@ def return_on_error(return_value: RT): """ - Helper decorator which suppress any ``Exception`` raised in decorator function. + Suppress any ``Exception`` raised in decorator function. Main use-case when functional is optional, however any error on functions/methods might raise any error which are subclass of ``Exception``. diff --git a/airflow/providers/asana/hooks/asana.py b/airflow/providers/asana/hooks/asana.py index 3bbb40eeb0893..ec9360b27e01d 100644 --- a/airflow/providers/asana/hooks/asana.py +++ b/airflow/providers/asana/hooks/asana.py @@ -96,7 +96,7 @@ def client(self) -> Client: def create_task(self, task_name: str, params: dict | None) -> dict: """ - Creates an Asana task. + Create an Asana task. :param task_name: Name of the new task :param params: Other task attributes, such as due_on, parent, and notes. For a complete list diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py index ef0789107913d..f4aa2a9b75df0 100644 --- a/airflow/providers/celery/executors/celery_executor.py +++ b/airflow/providers/celery/executors/celery_executor.py @@ -340,14 +340,14 @@ def sync(self) -> None: self.update_all_task_states() def debug_dump(self) -> None: - """Called in response to SIGUSR2 by the scheduler.""" + """Debug dump; called in response to SIGUSR2 by the scheduler.""" super().debug_dump() self.log.info( "executor.tasks (%d)\n\t%s", len(self.tasks), "\n\t".join(map(repr, self.tasks.items())) ) def update_all_task_states(self) -> None: - """Updates states of the tasks.""" + """Update states of the tasks.""" self.log.debug("Inquiring about %s celery task(s)", len(self.tasks)) state_and_info_by_celery_task_id = self.bulk_state_fetcher.get_many(self.tasks.values()) @@ -362,7 +362,7 @@ def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None self.tasks.pop(key, None) def update_task_state(self, key: TaskInstanceKey, state: str, info: Any) -> None: - """Updates state of a single task.""" + """Update state of a single task.""" try: if state == celery_states.SUCCESS: self.success(key, info) @@ -483,7 +483,8 @@ def get_cli_commands() -> list[GroupCommand]: def _get_parser() -> argparse.ArgumentParser: - """This method is used by Sphinx to generate documentation. + """ + Generate documentation; used by Sphinx. :meta private: """ diff --git a/airflow/providers/celery/executors/celery_executor_utils.py b/airflow/providers/celery/executors/celery_executor_utils.py index 90cbc2f57c54d..01be860b92288 100644 --- a/airflow/providers/celery/executors/celery_executor_utils.py +++ b/airflow/providers/celery/executors/celery_executor_utils.py @@ -109,7 +109,7 @@ def on_celery_import_modules(*args, **kwargs): @app.task def execute_command(command_to_exec: CommandType) -> None: - """Executes command.""" + """Execute command.""" dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command_to_exec) celery_task_id = app.current_task.request.id log.info("[%s] Executing command in Celery: %s", celery_task_id, command_to_exec) @@ -192,7 +192,7 @@ def __init__(self, exception: Exception, exception_traceback: str): def send_task_to_executor( task_tuple: TaskInstanceInCelery, ) -> tuple[TaskInstanceKey, CommandType, AsyncResult | ExceptionWithTraceback]: - """Sends task to executor.""" + """Send task to executor.""" key, command, queue, task_to_run = task_tuple try: with timeout(seconds=OPERATION_TIMEOUT): @@ -243,7 +243,7 @@ def _tasks_list_to_task_ids(self, async_tasks) -> set[str]: return {a.task_id for a in async_tasks} def get_many(self, async_results) -> Mapping[str, EventBufferValueType]: - """Gets status for many Celery tasks using the best method available.""" + """Get status for many Celery tasks using the best method available.""" if isinstance(app.backend, BaseKeyValueStoreBackend): result = self._get_many_from_kv_backend(async_results) elif isinstance(app.backend, DatabaseBackend): diff --git a/airflow/providers/celery/executors/celery_kubernetes_executor.py b/airflow/providers/celery/executors/celery_kubernetes_executor.py index 0dff33e957b93..e981e75fa15e5 100644 --- a/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -169,7 +169,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li def has_task(self, task_instance: TaskInstance) -> bool: """ - Checks if a task is either queued or running in either celery or kubernetes executor. + Check if a task is either queued or running in either celery or kubernetes executor. :param task_instance: TaskInstance :return: True if the task is known to this executor @@ -243,14 +243,15 @@ def _router(self, simple_task_instance: SimpleTaskInstance) -> CeleryExecutor | return self.celery_executor def debug_dump(self) -> None: - """Called in response to SIGUSR2 by the scheduler.""" + """Debug dump; called in response to SIGUSR2 by the scheduler.""" self.log.info("Dumping CeleryExecutor state") self.celery_executor.debug_dump() self.log.info("Dumping KubernetesExecutor state") self.kubernetes_executor.debug_dump() def send_callback(self, request: CallbackRequest) -> None: - """Sends callback for execution. + """ + Send callback for execution. :param request: Callback request to be executed. """ diff --git a/airflow/providers/cloudant/hooks/cloudant.py b/airflow/providers/cloudant/hooks/cloudant.py index 86a918f7a5e09..f73b573ae1a84 100644 --- a/airflow/providers/cloudant/hooks/cloudant.py +++ b/airflow/providers/cloudant/hooks/cloudant.py @@ -42,7 +42,7 @@ class CloudantHook(BaseHook): @staticmethod def get_ui_field_behaviour() -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["port", "extra"], "relabeling": {"host": "Account", "login": "Username (or API Key)", "schema": "Database"}, @@ -54,7 +54,7 @@ def __init__(self, cloudant_conn_id: str = default_conn_name) -> None: def get_conn(self) -> cloudant: """ - Opens a connection to the cloudant service and closes it automatically if used as context manager. + Open a connection to the cloudant service and close it automatically if used as context manager. .. note:: In the connection form: diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py index b639bfccfa59f..bb1e49ecbb7f1 100644 --- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py +++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py @@ -43,7 +43,8 @@ def _convert_from_dict(obj, new_class): def convert_volume(volume) -> k8s.V1Volume: - """Converts an airflow Volume object into a k8s.V1Volume. + """ + Convert an airflow Volume object into a k8s.V1Volume. :param volume: """ @@ -51,7 +52,8 @@ def convert_volume(volume) -> k8s.V1Volume: def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount: - """Converts an airflow VolumeMount object into a k8s.V1VolumeMount. + """ + Convert an airflow VolumeMount object into a k8s.V1VolumeMount. :param volume_mount: """ @@ -59,7 +61,8 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount: def convert_port(port) -> k8s.V1ContainerPort: - """Converts an airflow Port object into a k8s.V1ContainerPort. + """ + Convert an airflow Port object into a k8s.V1ContainerPort. :param port: """ @@ -67,7 +70,8 @@ def convert_port(port) -> k8s.V1ContainerPort: def convert_env_vars(env_vars) -> list[k8s.V1EnvVar]: - """Converts a dictionary into a list of env_vars. + """ + Convert a dictionary into a list of env_vars. :param env_vars: """ @@ -83,7 +87,8 @@ def convert_env_vars(env_vars) -> list[k8s.V1EnvVar]: def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar: - """Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar. + """ + Convert a PodRuntimeInfoEnv into an k8s.V1EnvVar. :param pod_runtime_info_envs: """ @@ -91,7 +96,8 @@ def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar: def convert_image_pull_secrets(image_pull_secrets) -> list[k8s.V1LocalObjectReference]: - """Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar. + """ + Convert a PodRuntimeInfoEnv into an k8s.V1EnvVar. :param image_pull_secrets: """ @@ -103,7 +109,8 @@ def convert_image_pull_secrets(image_pull_secrets) -> list[k8s.V1LocalObjectRefe def convert_configmap(configmaps) -> k8s.V1EnvFromSource: - """Converts a str into an k8s.V1EnvFromSource. + """ + Convert a str into an k8s.V1EnvFromSource. :param configmaps: """ @@ -111,10 +118,10 @@ def convert_configmap(configmaps) -> k8s.V1EnvFromSource: def convert_affinity(affinity) -> k8s.V1Affinity: - """Converts a dict into an k8s.V1Affinity.""" + """Convert a dict into an k8s.V1Affinity.""" return _convert_from_dict(affinity, k8s.V1Affinity) def convert_toleration(toleration) -> k8s.V1Toleration: - """Converts a dict into an k8s.V1Toleration.""" + """Convert a dict into an k8s.V1Toleration.""" return _convert_from_dict(toleration, k8s.V1Toleration) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 40ac4a2231038..c6787910d2008 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -269,7 +269,7 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non ) def start(self) -> None: - """Starts the executor.""" + """Start the executor.""" self.log.info("Start Kubernetes executor") self.scheduler_job_id = str(self.job_id) self.log.debug("Start with scheduler_job_id: %s", self.scheduler_job_id) @@ -302,7 +302,7 @@ def execute_async( queue: str | None = None, executor_config: Any | None = None, ) -> None: - """Executes task asynchronously.""" + """Execute task asynchronously.""" if TYPE_CHECKING: assert self.task_queue @@ -689,7 +689,7 @@ def _flush_result_queue(self) -> None: self.result_queue.task_done() def end(self) -> None: - """Called when the executor shuts down.""" + """Shut down the executor.""" if TYPE_CHECKING: assert self.task_queue assert self.result_queue @@ -725,7 +725,8 @@ def get_cli_commands() -> list[GroupCommand]: def _get_parser() -> argparse.ArgumentParser: - """This method is used by Sphinx to generate documentation. + """ + Generate documentation; used by Sphinx. :meta private: """ diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 3277d3e60fcf4..074b65d198f6c 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -101,7 +101,7 @@ def __init__( self.kube_config = kube_config def run(self) -> None: - """Performs watching.""" + """Perform watching.""" if TYPE_CHECKING: assert self.scheduler_job_id @@ -302,7 +302,7 @@ def __init__( self.kube_watchers = self._make_kube_watchers() def run_pod_async(self, pod: k8s.V1Pod, **kwargs): - """Runs POD asynchronously.""" + """Run POD asynchronously.""" sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod) json_pod = json.dumps(sanitized_pod, indent=2) @@ -407,7 +407,7 @@ def run_next(self, next_job: KubernetesJobType) -> None: self.log.debug("Kubernetes Job created!") def delete_pod(self, pod_name: str, namespace: str) -> None: - """Deletes Pod from a namespace. Does not raise if it does not exist.""" + """Delete Pod from a namespace; does not raise if it does not exist.""" try: self.log.debug("Deleting pod %s in namespace %s", pod_name, namespace) self.kube_client.delete_namespaced_pod( @@ -435,7 +435,7 @@ def patch_pod_executor_done(self, *, pod_name: str, namespace: str): def sync(self) -> None: """ - Checks the status of all currently running kubernetes jobs. + Check the status of all currently running kubernetes jobs. If a job is completed, its status is placed in the result queue to be sent back to the scheduler. """ diff --git a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index fb268a94471d4..8c948b0d64969 100644 --- a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -156,7 +156,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li def has_task(self, task_instance: TaskInstance) -> bool: """ - Checks if a task is either queued or running in either local or kubernetes executor. + Check if a task is either queued or running in either local or kubernetes executor. :param task_instance: TaskInstance :return: True if the task is known to this executor @@ -226,14 +226,15 @@ def _router(self, simple_task_instance: SimpleTaskInstance) -> LocalExecutor | K return self.local_executor def debug_dump(self) -> None: - """Called in response to SIGUSR2 by the scheduler.""" + """Debug dump; called in response to SIGUSR2 by the scheduler.""" self.log.info("Dumping LocalExecutor state") self.local_executor.debug_dump() self.log.info("Dumping KubernetesExecutor state") self.kubernetes_executor.debug_dump() def send_callback(self, request: CallbackRequest) -> None: - """Sends callback for execution. + """ + Send callback for execution. :param request: Callback request to be executed. """ diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 8e2989da067c2..d64e9d680ff93 100644 --- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -87,7 +87,7 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol): @staticmethod def get_connection_form_widgets() -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import BooleanField, StringField @@ -112,7 +112,7 @@ def get_connection_form_widgets() -> dict[str, Any]: @staticmethod def get_ui_field_behaviour() -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["host", "schema", "login", "password", "port", "extra"], "relabeling": {}, @@ -171,7 +171,7 @@ def conn_extras(self): def _get_field(self, field_name): """ - Handles backcompat for extra fields. + Handle backcompat for extra fields. Prior to Airflow 2.3, in order to make use of UI customizations for extra fields, we needed to store them with the prefix ``extra__kubernetes__``. This method @@ -188,7 +188,7 @@ def _get_field(self, field_name): return self.conn_extras.get(prefixed_name) or None def get_conn(self) -> client.ApiClient: - """Returns kubernetes api session for use with requests.""" + """Return kubernetes api session for use with requests.""" in_cluster = self._coalesce_param(self.in_cluster, self._get_field("in_cluster")) cluster_context = self._coalesce_param(self.cluster_context, self._get_field("cluster_context")) kubeconfig_path = self._coalesce_param(self.config_file, self._get_field("kube_config_path")) @@ -289,7 +289,7 @@ def create_custom_object( self, group: str, version: str, plural: str, body: str | dict, namespace: str | None = None ): """ - Creates custom resource definition object in Kubernetes. + Create custom resource definition object in Kubernetes. :param group: api group :param version: api version @@ -360,17 +360,17 @@ def delete_custom_object( ) def get_namespace(self) -> str | None: - """Returns the namespace that defined in the connection.""" + """Return the namespace that defined in the connection.""" if self.conn_id: return self._get_field("namespace") return None def get_xcom_sidecar_container_image(self): - """Returns the xcom sidecar image that defined in the connection.""" + """Return the xcom sidecar image that defined in the connection.""" return self._get_field("xcom_sidecar_container_image") def get_xcom_sidecar_container_resources(self): - """Returns the xcom sidecar resources that defined in the connection.""" + """Return the xcom sidecar resources that defined in the connection.""" field = self._get_field("xcom_sidecar_container_resources") if not field: return None @@ -383,7 +383,7 @@ def get_pod_log_stream( namespace: str | None = None, ) -> tuple[watch.Watch, Generator[str, None, None]]: """ - Retrieves a log stream for a container in a kubernetes pod. + Retrieve a log stream for a container in a kubernetes pod. :param pod_name: pod name :param container: container name @@ -407,7 +407,7 @@ def get_pod_logs( namespace: str | None = None, ): """ - Retrieves a container's log from the specified pod. + Retrieve a container's log from the specified pod. :param pod_name: pod name :param container: container name @@ -435,7 +435,7 @@ def get_namespaced_pod_list( **kwargs, ): """ - Retrieves a list of Kind pod which belong default kubernetes namespace. + Retrieve a list of Kind pod which belong default kubernetes namespace. :param label_selector: A selector to restrict the list of returned objects by their labels :param namespace: kubernetes namespace @@ -451,7 +451,7 @@ def get_namespaced_pod_list( def _get_bool(val) -> bool | None: - """Converts val to bool if can be done with certainty; if we cannot infer intention we return None.""" + """Convert val to bool if can be done with certainty; if we cannot infer intention we return None.""" if isinstance(val, bool): return val elif isinstance(val, str): @@ -470,7 +470,7 @@ def __init__(self, *args, **kwargs): self._extras: dict | None = None async def _load_config(self): - """Returns Kubernetes API session for use with requests.""" + """Return Kubernetes API session for use with requests.""" in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster")) cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context")) kubeconfig_path = self._coalesce_param(self.config_file, await self._get_field("kube_config_path")) @@ -555,7 +555,7 @@ async def get_conn(self) -> async_client.ApiClient: async def get_pod(self, name: str, namespace: str) -> V1Pod: """ - Gets pod's object. + Get pod's object. :param name: Name of the pod. :param namespace: Name of the pod's namespace. @@ -570,7 +570,7 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod: async def delete_pod(self, name: str, namespace: str): """ - Deletes pod's object. + Delete pod's object. :param name: Name of the pod. :param namespace: Name of the pod's namespace. @@ -588,7 +588,7 @@ async def delete_pod(self, name: str, namespace: str): async def read_logs(self, name: str, namespace: str): """ - Reads logs inside the pod while starting containers inside. + Read logs inside the pod while starting containers inside. All the logs will be outputted with its timestamp to track the logs after the execution of the pod is completed. The diff --git a/airflow/providers/cncf/kubernetes/kube_client.py b/airflow/providers/cncf/kubernetes/kube_client.py index b9dec69402ef4..e6d79f39a694e 100644 --- a/airflow/providers/cncf/kubernetes/kube_client.py +++ b/airflow/providers/cncf/kubernetes/kube_client.py @@ -52,7 +52,7 @@ def _disable_verify_ssl() -> None: def _enable_tcp_keepalive() -> None: """ - This function enables TCP keepalive mechanism. + Enable TCP keepalive mechanism. This prevents urllib3 connection to hang indefinitely when idle connection is time-outed on services like cloud load balancers or firewalls. @@ -96,7 +96,7 @@ def get_kube_client( config_file: str | None = None, ) -> client.CoreV1Api: """ - Retrieves Kubernetes client. + Retrieve Kubernetes client. :param in_cluster: whether we are in cluster :param cluster_context: context of the cluster diff --git a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py index b54519fe27339..0a0bb9b582622 100644 --- a/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py +++ b/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py @@ -63,7 +63,7 @@ def create_pod_id( unique: bool = True, ) -> str: """ - Generates unique pod ID given a dag_id and / or task_id. + Generate unique pod ID given a dag_id and / or task_id. The default of 80 for max length is somewhat arbitrary, mainly a balance between content and not overwhelming terminal windows of reasonable width. The true diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index efb51366fb3f6..d8b01b155b84b 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -110,7 +110,7 @@ def _create_pod_id( unique: bool = True, ) -> str: """ - Generates unique pod ID given a dag_id and / or task_id. + Generate unique pod ID given a dag_id and / or task_id. TODO: when min airflow version >= 2.5, delete this function and import from kubernetes_helper_functions. @@ -525,7 +525,7 @@ def client(self) -> CoreV1Api: return self.hook.core_v1_client def find_pod(self, namespace: str, context: Context, *, exclude_checked: bool = True) -> k8s.V1Pod | None: - """Returns an already-running pod for this task instance if one exists.""" + """Return an already-running pod for this task instance if one exists.""" label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked) pod_list = self.client.list_namespaced_pod( namespace=namespace, @@ -564,7 +564,7 @@ def await_pod_start(self, pod: k8s.V1Pod): raise def extract_xcom(self, pod: k8s.V1Pod): - """Retrieves xcom value and kills xcom sidecar container.""" + """Retrieve xcom value and kill xcom sidecar container.""" result = self.pod_manager.extract_xcom(pod) if isinstance(result, str) and result.rstrip() == "__airflow_xcom_result_empty__": self.log.info("xcom result file is empty.") @@ -637,7 +637,7 @@ def execute_async(self, context: Context): self.invoke_defer_method() def invoke_defer_method(self): - """Method to easily redefine triggers which are being used in child classes.""" + """Redefine triggers which are being used in child classes.""" trigger_start_time = utcnow() self.defer( trigger=KubernetesPodTrigger( @@ -771,7 +771,7 @@ def _read_pod_events(self, pod, *, reraise=True): self.log.error("Pod Event: %s - %s", event.reason, event.message) def is_istio_enabled(self, pod: V1Pod) -> bool: - """Checks if istio is enabled for the namespace of the pod by inspecting the namespace labels.""" + """Check if istio is enabled for the namespace of the pod by inspecting the namespace labels.""" if not pod: return False @@ -867,7 +867,7 @@ def on_kill(self) -> None: def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: """ - Returns V1Pod object based on pod template file, full pod spec, and other operator parameters. + Return V1Pod object based on pod template file, full pod spec, and other operator parameters. The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod template file. @@ -973,7 +973,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: def dry_run(self) -> None: """ - Prints out the pod definition that would be created by this operator. + Print out the pod definition that would be created by this operator. Does not include labels specific to the task instance (since there isn't one in a dry_run) and excludes all empty elements. diff --git a/airflow/providers/cncf/kubernetes/pod_generator.py b/airflow/providers/cncf/kubernetes/pod_generator.py index e6d5e316b6401..581aa61d16e22 100644 --- a/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/airflow/providers/cncf/kubernetes/pod_generator.py @@ -152,7 +152,7 @@ def __init__( self.extract_xcom = extract_xcom def gen_pod(self) -> k8s.V1Pod: - """Generates pod.""" + """Generate pod.""" warnings.warn("This function is deprecated. ", RemovedInAirflow3Warning) result = self.ud_pod @@ -165,7 +165,7 @@ def gen_pod(self) -> k8s.V1Pod: @staticmethod def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: - """Adds sidecar.""" + """Add sidecar.""" warnings.warn( "This function is deprecated. " "Please use airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead" @@ -181,7 +181,7 @@ def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: @staticmethod def from_obj(obj) -> dict | k8s.V1Pod | None: - """Converts to pod from obj.""" + """Convert to pod from obj.""" if obj is None: return None @@ -215,7 +215,7 @@ def from_obj(obj) -> dict | k8s.V1Pod | None: @staticmethod def from_legacy_obj(obj) -> k8s.V1Pod | None: - """Converts to pod from obj.""" + """Convert to pod from obj.""" if obj is None: return None diff --git a/airflow/providers/cncf/kubernetes/pod_generator_deprecated.py b/airflow/providers/cncf/kubernetes/pod_generator_deprecated.py index 8d64e96d6c400..1b528d644b219 100644 --- a/airflow/providers/cncf/kubernetes/pod_generator_deprecated.py +++ b/airflow/providers/cncf/kubernetes/pod_generator_deprecated.py @@ -217,7 +217,7 @@ def __init__( self.extract_xcom = extract_xcom def gen_pod(self) -> k8s.V1Pod: - """Generates pod.""" + """Generate pod.""" result = None if result is None: @@ -235,7 +235,7 @@ def gen_pod(self) -> k8s.V1Pod: @staticmethod def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: - """Adds sidecar.""" + """Add sidecar.""" pod_cp = copy.deepcopy(pod) pod_cp.spec.volumes = pod.spec.volumes or [] pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) @@ -247,7 +247,7 @@ def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: @staticmethod def from_obj(obj) -> k8s.V1Pod | None: - """Converts to pod from obj.""" + """Convert to pod from obj.""" if obj is None: return None diff --git a/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py b/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py index 4ddd523e3474d..18799ed920e71 100644 --- a/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py +++ b/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py @@ -78,7 +78,7 @@ def __init__( extract_xcom: bool = False, ): """ - Deprecated class for launching pods. + Launch pods; DEPRECATED. Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager instead to create the launcher. @@ -94,7 +94,7 @@ def __init__( self.extract_xcom = extract_xcom def run_pod_async(self, pod: V1Pod, **kwargs): - """Runs pod asynchronously.""" + """Run pod asynchronously.""" pod_mutation_hook(pod) sanitized_pod = self._client.api_client.sanitize_for_serialization(pod) @@ -112,7 +112,7 @@ def run_pod_async(self, pod: V1Pod, **kwargs): return resp def delete_pod(self, pod: V1Pod): - """Deletes pod.""" + """Delete pod.""" try: self._client.delete_namespaced_pod( pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions() @@ -124,7 +124,7 @@ def delete_pod(self, pod: V1Pod): def start_pod(self, pod: V1Pod, startup_timeout: int = 120): """ - Launches the pod synchronously and waits for completion. + Launch the pod synchronously and wait for completion. :param pod: :param startup_timeout: Timeout for startup of the pod (if pod is pending for too long, fails task) @@ -141,7 +141,7 @@ def start_pod(self, pod: V1Pod, startup_timeout: int = 120): def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | None]: """ - Monitors a pod and returns the final state. + Monitor a pod and return the final state. :param pod: pod spec that will be monitored :param get_logs: whether to read the logs locally @@ -202,17 +202,17 @@ def _task_status(self, event): return status def pod_not_started(self, pod: V1Pod): - """Tests if pod has not started.""" + """Test if pod has not started.""" state = self._task_status(self.read_pod(pod)) return state == State.QUEUED def pod_is_running(self, pod: V1Pod): - """Tests if pod is running.""" + """Test if pod is running.""" state = self._task_status(self.read_pod(pod)) return state not in (State.SUCCESS, State.FAILED) def base_container_is_running(self, pod: V1Pod): - """Tests if base container is running.""" + """Test if base container is running.""" event = self.read_pod(pod) status = next((s for s in event.status.container_statuses if s.name == "base"), None) if not status: @@ -227,7 +227,7 @@ def read_pod_logs( timestamps: bool = False, since_seconds: int | None = None, ): - """Reads log from the pod.""" + """Read log from the pod.""" additional_kwargs = {} if since_seconds: additional_kwargs["since_seconds"] = since_seconds @@ -250,7 +250,7 @@ def read_pod_logs( @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def read_pod_events(self, pod): - """Reads events from the pod.""" + """Read events from the pod.""" try: return self._client.list_namespaced_event( namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}" diff --git a/airflow/providers/cncf/kubernetes/python_kubernetes_script.py b/airflow/providers/cncf/kubernetes/python_kubernetes_script.py index 01fbfd7561b09..75937287d582e 100644 --- a/airflow/providers/cncf/kubernetes/python_kubernetes_script.py +++ b/airflow/providers/cncf/kubernetes/python_kubernetes_script.py @@ -39,7 +39,7 @@ def _balance_parens(after_decorator): def remove_task_decorator(python_source: str, task_decorator_name: str) -> str: """ - Removes @task.kubernetes or similar as well as @setup and @teardown. + Remove @task.kubernetes or similar as well as @setup and @teardown. :param python_source: python source code :param task_decorator_name: the task decorator name @@ -68,7 +68,7 @@ def write_python_script( render_template_as_native_obj: bool = False, ): """ - Renders the python script to a file to execute in the virtual environment. + Render the python script to a file to execute in the virtual environment. :param jinja_context: The jinja context variables to unpack and replace with its placeholders in the template file. diff --git a/airflow/providers/cncf/kubernetes/secret.py b/airflow/providers/cncf/kubernetes/secret.py index d4fba36f16b50..4e0412743c3e0 100644 --- a/airflow/providers/cncf/kubernetes/secret.py +++ b/airflow/providers/cncf/kubernetes/secret.py @@ -65,7 +65,7 @@ def __init__(self, deploy_type, deploy_target, secret, key=None, items=None): self.key = key def to_env_secret(self) -> k8s.V1EnvVar: - """Stores es environment secret.""" + """Store es environment secret.""" return k8s.V1EnvVar( name=self.deploy_target, value_from=k8s.V1EnvVarSource( @@ -74,11 +74,11 @@ def to_env_secret(self) -> k8s.V1EnvVar: ) def to_env_from_secret(self) -> k8s.V1EnvFromSource: - """Reads from environment to secret.""" + """Read from environment to secret.""" return k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=self.secret)) def to_volume_secret(self) -> tuple[k8s.V1Volume, k8s.V1VolumeMount]: - """Converts to volume secret.""" + """Convert to volume secret.""" vol_id = f"secretvol{uuid.uuid4()}" volume = k8s.V1Volume(name=vol_id, secret=k8s.V1SecretVolumeSource(secret_name=self.secret)) if self.items: @@ -86,7 +86,7 @@ def to_volume_secret(self) -> tuple[k8s.V1Volume, k8s.V1VolumeMount]: return (volume, k8s.V1VolumeMount(mount_path=self.deploy_target, name=vol_id, read_only=True)) def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: - """Attaches to pod.""" + """Attach to pod.""" cp_pod = copy.deepcopy(pod) if self.deploy_type == "volume": diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index a4e7528ee6d2a..d098525c06036 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -116,7 +116,7 @@ def __init__( self._since_time = None def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes KubernetesCreatePodTrigger arguments and classpath.""" + """Serialize KubernetesCreatePodTrigger arguments and classpath.""" return ( "airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger", { @@ -137,7 +137,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current pod status and yields a TriggerEvent.""" + """Get current pod status and yield a TriggerEvent.""" hook = self._get_async_hook() self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace) try: diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 65a264eeeeacb..71ba391e53c42 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -100,17 +100,17 @@ def get_pod(self, name: str, namespace: str) -> V1Pod: """Read pod object from kubernetes API.""" def get_namespace(self) -> str | None: - """Returns the namespace that defined in the connection.""" + """Return the namespace that defined in the connection.""" def get_xcom_sidecar_container_image(self) -> str | None: - """Returns the xcom sidecar image that defined in the connection.""" + """Return the xcom sidecar image that defined in the connection.""" def get_xcom_sidecar_container_resources(self) -> str | None: - """Returns the xcom sidecar resources that defined in the connection.""" + """Return the xcom sidecar resources that defined in the connection.""" def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None: - """Retrieves container status.""" + """Retrieve container status.""" container_statuses = pod.status.container_statuses if pod and pod.status else None if container_statuses: # In general the variable container_statuses can store multiple items matching different containers. @@ -123,7 +123,7 @@ def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | def container_is_running(pod: V1Pod, container_name: str) -> bool: """ - Examines V1Pod ``pod`` to determine whether ``container_name`` is running. + Examine V1Pod ``pod`` to determine whether ``container_name`` is running. If that container is present and running, returns True. Returns False otherwise. """ @@ -135,7 +135,7 @@ def container_is_running(pod: V1Pod, container_name: str) -> bool: def container_is_completed(pod: V1Pod, container_name: str) -> bool: """ - Examines V1Pod ``pod`` to determine whether ``container_name`` is completed. + Examine V1Pod ``pod`` to determine whether ``container_name`` is completed. If that container is present and completed, returns True. Returns False otherwise. """ @@ -147,7 +147,7 @@ def container_is_completed(pod: V1Pod, container_name: str) -> bool: def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: """ - Examines V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. + Examine V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. If that container is present and completed and succeeded, returns True. Returns False otherwise. """ @@ -162,7 +162,7 @@ def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: def container_is_terminated(pod: V1Pod, container_name: str) -> bool: """ - Examines V1Pod ``pod`` to determine whether ``container_name`` is terminated. + Examine V1Pod ``pod`` to determine whether ``container_name`` is terminated. If that container is present and terminated, returns True. Returns False otherwise. """ @@ -219,7 +219,7 @@ def __init__( self.read_pod_cache_timeout = read_pod_cache_timeout def __iter__(self) -> Generator[bytes, None, None]: - r"""The generator yields log items divided by the '\n' symbol.""" + r"""Yield log items divided by the '\n' symbol.""" incomplete_log_item: list[bytes] = [] if self.logs_available(): for data_chunk in self.response.stream(amt=None, decode_content=True): @@ -270,7 +270,7 @@ def read_pod(self): @dataclass class PodLoggingStatus: - """Used for returning the status of the pod and last log time when exiting from `fetch_container_logs`.""" + """Return the status of the pod and last log time when exiting from `fetch_container_logs`.""" running: bool last_log_time: DateTime | None @@ -285,7 +285,7 @@ def __init__( progress_callback: Callable[[str], None] | None = None, ): """ - Creates the launcher. + Create the launcher. :param kube_client: kubernetes client :param progress_callback: Callback function invoked when fetching container log. @@ -296,7 +296,7 @@ def __init__( self._watch = watch.Watch() def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod: - """Runs POD asynchronously.""" + """Run POD asynchronously.""" sanitized_pod = self._client.api_client.sanitize_for_serialization(pod) json_pod = json.dumps(sanitized_pod, indent=2) @@ -314,7 +314,7 @@ def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod: return resp def delete_pod(self, pod: V1Pod) -> None: - """Deletes POD.""" + """Delete POD.""" try: self._client.delete_namespaced_pod( pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions() @@ -331,12 +331,12 @@ def delete_pod(self, pod: V1Pod) -> None: retry=tenacity.retry_if_exception(should_retry_start_pod), ) def create_pod(self, pod: V1Pod) -> V1Pod: - """Launches the pod asynchronously.""" + """Launch the pod asynchronously.""" return self.run_pod_async(pod) def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None: """ - Waits for the pod to reach phase other than ``Pending``. + Wait for the pod to reach phase other than ``Pending``. :param pod: :param startup_timeout: Timeout (in seconds) for startup of the pod @@ -376,7 +376,7 @@ def fetch_container_logs( post_termination_timeout: int = 120, ) -> PodLoggingStatus: """ - Follows the logs of container and streams to airflow logging. + Follow the logs of container and stream to airflow logging. Returns when container exits. @@ -399,7 +399,7 @@ def consume_logs( logs: PodLogsConsumer | None, ) -> tuple[DateTime | None, PodLogsConsumer | None]: """ - Tries to follow container logs until container completes. + Try to follow container logs until container completes. For a long-running container, sometimes the log read may be interrupted Such errors of this kind are suppressed. @@ -549,7 +549,7 @@ def fetch_requested_container_logs( def await_container_completion(self, pod: V1Pod, container_name: str) -> None: """ - Waits for the given container in the given pod to be completed. + Wait for the given container in the given pod to be completed. :param pod: pod spec that will be monitored :param container_name: name of the container within the pod to monitor @@ -566,7 +566,7 @@ def await_pod_completion( self, pod: V1Pod, istio_enabled: bool = False, container_name: str = "base" ) -> V1Pod: """ - Monitors a pod and returns the final state. + Monitor a pod and return the final state. :param istio_enabled: whether istio is enabled in the namespace :param pod: pod spec that will be monitored @@ -600,12 +600,12 @@ def parse_log_line(self, line: str) -> tuple[DateTime | None, str]: return last_log_time, message def container_is_running(self, pod: V1Pod, container_name: str) -> bool: - """Reads pod and checks if container is running.""" + """Read pod and checks if container is running.""" remote_pod = self.read_pod(pod) return container_is_running(pod=remote_pod, container_name=container_name) def container_is_terminated(self, pod: V1Pod, container_name: str) -> bool: - """Reads pod and checks if container is terminated.""" + """Read pod and checks if container is terminated.""" remote_pod = self.read_pod(pod) return container_is_terminated(pod=remote_pod, container_name=container_name) @@ -620,7 +620,7 @@ def read_pod_logs( follow=True, post_termination_timeout: int = 120, ) -> PodLogsConsumer: - """Reads log from the POD.""" + """Read log from the POD.""" additional_kwargs = {} if since_seconds: additional_kwargs["since_seconds"] = since_seconds @@ -662,7 +662,7 @@ def get_container_names(self, pod: V1Pod) -> list[str]: @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) def read_pod_events(self, pod: V1Pod) -> CoreV1EventList: - """Reads events from the POD.""" + """Read events from the POD.""" try: return self._client.list_namespaced_event( namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}" @@ -689,7 +689,7 @@ def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None: time.sleep(1) def extract_xcom(self, pod: V1Pod) -> str: - """Retrieves XCom value and kills xcom sidecar container.""" + """Retrieve XCom value and kill xcom sidecar container.""" try: result = self.extract_xcom_json(pod) return result @@ -702,7 +702,7 @@ def extract_xcom(self, pod: V1Pod) -> str: reraise=True, ) def extract_xcom_json(self, pod: V1Pod) -> str: - """Retrieves XCom value and also checks if xcom json is valid.""" + """Retrieve XCom value and also check if xcom json is valid.""" with closing( kubernetes_stream( self._client.connect_get_namespaced_pod_exec, @@ -736,7 +736,7 @@ def extract_xcom_json(self, pod: V1Pod) -> str: reraise=True, ) def extract_xcom_kill(self, pod: V1Pod): - """Kills xcom sidecar container.""" + """Kill xcom sidecar container.""" with closing( kubernetes_stream( self._client.connect_get_namespaced_pod_exec, diff --git a/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index 11a62f9c5d9cb..0fa3b61dd3f98 100644 --- a/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -50,7 +50,7 @@ def add_xcom_sidecar( sidecar_container_image: str | None = None, sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None, ) -> k8s.V1Pod: - """Adds sidecar.""" + """Add sidecar.""" pod_cp = copy.deepcopy(pod) pod_cp.spec.volumes = pod.spec.volumes or [] pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index c3907041928fd..adbb95d784d64 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -50,7 +50,7 @@ def return_single_query_results(sql: str | Iterable[str], return_last: bool, split_statements: bool): """ - Determines when results of single query only should be returned. + Determine when results of single query only should be returned. For compatibility reasons, the behaviour of the DBAPIHook is somewhat confusing. In some cases, when multiple queries are run, the return value will be an iterable (list) of results @@ -77,7 +77,7 @@ def return_single_query_results(sql: str | Iterable[str], return_last: bool, spl def fetch_all_handler(cursor) -> list[tuple] | None: - """Handler for DbApiHook.run() to return results.""" + """Return results for DbApiHook.run().""" if not hasattr(cursor, "description"): raise RuntimeError( "The database we interact with does not support DBAPI 2.0. Use operator and " @@ -90,7 +90,7 @@ def fetch_all_handler(cursor) -> list[tuple] | None: def fetch_one_handler(cursor) -> list[tuple] | None: - """Handler for DbApiHook.run() to return first result.""" + """Return first result for DbApiHook.run().""" if not hasattr(cursor, "description"): raise RuntimeError( "The database we interact with does not support DBAPI 2.0. Use operator and " @@ -103,7 +103,7 @@ def fetch_one_handler(cursor) -> list[tuple] | None: class ConnectorProtocol(Protocol): - """A protocol where you can connect to a database.""" + """Database connection protocol.""" def connect(self, host: str, port: int, username: str, schema: str) -> Any: """ @@ -173,7 +173,7 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa self.descriptions: list[Sequence[Sequence] | None] = [] def get_conn(self): - """Returns a connection object.""" + """Return a connection object.""" db = self.get_connection(getattr(self, cast(str, self.conn_name_attr))) return self.connector.connect(host=db.host, port=db.port, username=db.login, schema=db.schema) @@ -200,10 +200,9 @@ def get_sqlalchemy_engine(self, engine_kwargs=None): def get_pandas_df(self, sql, parameters: Iterable | Mapping[str, Any] | None = None, **kwargs): """ - Executes the sql and returns a pandas dataframe. + Execute the sql and returns a pandas dataframe. - :param sql: the sql statement to be executed (str) or a list of - sql statements to execute + :param sql: the sql statement to be executed (str) or a list of sql statements to execute :param parameters: The parameters to render the SQL query with. :param kwargs: (optional) passed into pandas.io.sql.read_sql method """ @@ -222,10 +221,9 @@ def get_pandas_df_by_chunks( self, sql, parameters: Iterable | Mapping[str, Any] | None = None, *, chunksize: int | None, **kwargs ): """ - Executes the sql and returns a generator. + Execute the sql and return a generator. - :param sql: the sql statement to be executed (str) or a list of - sql statements to execute + :param sql: the sql statement to be executed (str) or a list of sql statements to execute :param parameters: The parameters to render the SQL query with :param chunksize: number of rows to include in each chunk :param kwargs: (optional) passed into pandas.io.sql.read_sql method @@ -247,7 +245,7 @@ def get_records( parameters: Iterable | Mapping[str, Any] | None = None, ) -> Any: """ - Executes the sql and returns a set of records. + Execute the sql and return a set of records. :param sql: the sql statement to be executed (str) or a list of sql statements to execute :param parameters: The parameters to render the SQL query with. @@ -256,7 +254,7 @@ def get_records( def get_first(self, sql: str | list[str], parameters: Iterable | Mapping[str, Any] | None = None) -> Any: """ - Executes the sql and returns the first resulting row. + Execute the sql and return the first resulting row. :param sql: the sql statement to be executed (str) or a list of sql statements to execute :param parameters: The parameters to render the SQL query with. @@ -270,7 +268,7 @@ def strip_sql_string(sql: str) -> str: @staticmethod def split_sql_string(sql: str) -> list[str]: """ - Splits string into multiple SQL expressions. + Split string into multiple SQL expressions. :param sql: SQL string potentially consisting of multiple expressions :return: list of individual expressions @@ -413,7 +411,7 @@ def run( return results def _run_command(self, cur, sql_statement, parameters): - """Runs a statement using an already open cursor.""" + """Run a statement using an already open cursor.""" if self.log_sql: self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters) @@ -427,7 +425,7 @@ def _run_command(self, cur, sql_statement, parameters): self.log.info("Rows affected: %s", cur.rowcount) def set_autocommit(self, conn, autocommit): - """Sets the autocommit flag on the connection.""" + """Set the autocommit flag on the connection.""" if not self.supports_autocommit and autocommit: self.log.warning( "%s connection doesn't support autocommit but autocommit activated.", @@ -446,12 +444,13 @@ def get_autocommit(self, conn) -> bool: return getattr(conn, "autocommit", False) and self.supports_autocommit def get_cursor(self): - """Returns a cursor.""" + """Return a cursor.""" return self.get_conn().cursor() @classmethod def _generate_insert_sql(cls, table, values, target_fields, replace, **kwargs) -> str: - """Helper class method that generates the INSERT SQL statement. + """ + Generate the INSERT SQL statement. The REPLACE variant is specific to MySQL syntax. @@ -517,7 +516,7 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replac @staticmethod def _serialize_cell(cell, conn=None) -> str | None: """ - Returns the SQL literal of the cell as a string. + Return the SQL literal of the cell as a string. :param cell: The cell to insert into the table :param conn: The database connection @@ -531,7 +530,7 @@ def _serialize_cell(cell, conn=None) -> str | None: def bulk_dump(self, table, tmp_file): """ - Dumps a database table into a tab-delimited file. + Dump a database table into a tab-delimited file. :param table: The name of the source table :param tmp_file: The path of the target file @@ -540,7 +539,7 @@ def bulk_dump(self, table, tmp_file): def bulk_load(self, table, tmp_file): """ - Loads a tab-delimited file into a database table. + Load a tab-delimited file into a database table. :param table: The name of the target table :param tmp_file: The path of the file to load into the table @@ -562,7 +561,7 @@ def test_connection(self): def get_openlineage_database_info(self, connection) -> DatabaseInfo | None: """ - Returns database specific information needed to generate and parse lineage metadata. + Return database specific information needed to generate and parse lineage metadata. This includes information helpful for constructing information schema query and creating correct namespace. @@ -572,7 +571,7 @@ def get_openlineage_database_info(self, connection) -> DatabaseInfo | None: def get_openlineage_database_dialect(self, connection) -> str: """ - Returns database dialect used for SQL parsing. + Return database dialect used for SQL parsing. For a list of supported dialects check: https://openlineage.io/docs/development/sql#sql-dialects """ @@ -580,7 +579,7 @@ def get_openlineage_database_dialect(self, connection) -> str: def get_openlineage_default_schema(self) -> str | None: """ - Returns default schema specific to database. + Return default schema specific to database. .. seealso:: - :class:`airflow.providers.openlineage.sqlparser.SQLParser` @@ -589,7 +588,7 @@ def get_openlineage_default_schema(self) -> str | None: def get_openlineage_database_specific_lineage(self, task_instance) -> OperatorLineage | None: """ - Returns additional database specific lineage, e.g. query execution information. + Return additional database specific lineage, e.g. query execution information. This method is called only on completion of the task. @@ -600,7 +599,7 @@ def get_openlineage_database_specific_lineage(self, task_instance) -> OperatorLi @staticmethod def get_openlineage_authority_part(connection, default_port: int | None = None) -> str: """ - This method serves as common method for several hooks to get authority part from Airflow Connection. + Get authority part from Airflow Connection. The authority represents the hostname and port of the connection and conforms OpenLineage naming convention for a number of databases (e.g. MySQL, Postgres, Trino). diff --git a/airflow/providers/common/sql/operators/sql.py b/airflow/providers/common/sql/operators/sql.py index bfccd9c1fc056..bfd27e8ad381c 100644 --- a/airflow/providers/common/sql/operators/sql.py +++ b/airflow/providers/common/sql/operators/sql.py @@ -55,6 +55,8 @@ def _parse_boolean(val: str) -> str | bool: def _get_failed_checks(checks, col=None): """ + Get failed checks. + IMPORTANT!!! Keep it for compatibility with released 8.4.0 version of google provider. Unfortunately the provider used _get_failed_checks and parse_boolean as imports and we should @@ -248,7 +250,7 @@ def __init__( def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequence] | None]) -> list[Any]: """ - Processes output before it is returned by the operator. + Process output before it is returned by the operator. It can be overridden by the subclass in case some extra processing is needed. Note that unlike DBApiHook return values returned - the results passed and returned by ``_process_output`` should @@ -1100,7 +1102,7 @@ def execute(self, context: Context): def push(self, meta_data): """ - Optional: Send data check info and metadata to an external database. + Send data check info and metadata to an external database. Default functionality will log metadata. """