Skip to content

Commit

Permalink
D401 Support - A thru Common (Inclusive) (apache#34934)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi authored Oct 16, 2023
1 parent 91986b0 commit f23170c
Show file tree
Hide file tree
Showing 24 changed files with 152 additions and 140 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/utils/suppress.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

def return_on_error(return_value: RT):
"""
Helper decorator which suppress any ``Exception`` raised in decorator function.
Suppress any ``Exception`` raised in decorator function.
Main use-case when functional is optional, however any error on functions/methods might
raise any error which are subclass of ``Exception``.
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/asana/hooks/asana.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def client(self) -> Client:

def create_task(self, task_name: str, params: dict | None) -> dict:
"""
Creates an Asana task.
Create an Asana task.
:param task_name: Name of the new task
:param params: Other task attributes, such as due_on, parent, and notes. For a complete list
Expand Down
9 changes: 5 additions & 4 deletions airflow/providers/celery/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,14 @@ def sync(self) -> None:
self.update_all_task_states()

def debug_dump(self) -> None:
"""Called in response to SIGUSR2 by the scheduler."""
"""Debug dump; called in response to SIGUSR2 by the scheduler."""
super().debug_dump()
self.log.info(
"executor.tasks (%d)\n\t%s", len(self.tasks), "\n\t".join(map(repr, self.tasks.items()))
)

def update_all_task_states(self) -> None:
"""Updates states of the tasks."""
"""Update states of the tasks."""
self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
state_and_info_by_celery_task_id = self.bulk_state_fetcher.get_many(self.tasks.values())

Expand All @@ -362,7 +362,7 @@ def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None
self.tasks.pop(key, None)

def update_task_state(self, key: TaskInstanceKey, state: str, info: Any) -> None:
"""Updates state of a single task."""
"""Update state of a single task."""
try:
if state == celery_states.SUCCESS:
self.success(key, info)
Expand Down Expand Up @@ -483,7 +483,8 @@ def get_cli_commands() -> list[GroupCommand]:


def _get_parser() -> argparse.ArgumentParser:
"""This method is used by Sphinx to generate documentation.
"""
Generate documentation; used by Sphinx.
:meta private:
"""
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/celery/executors/celery_executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def on_celery_import_modules(*args, **kwargs):

@app.task
def execute_command(command_to_exec: CommandType) -> None:
"""Executes command."""
"""Execute command."""
dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)
celery_task_id = app.current_task.request.id
log.info("[%s] Executing command in Celery: %s", celery_task_id, command_to_exec)
Expand Down Expand Up @@ -192,7 +192,7 @@ def __init__(self, exception: Exception, exception_traceback: str):
def send_task_to_executor(
task_tuple: TaskInstanceInCelery,
) -> tuple[TaskInstanceKey, CommandType, AsyncResult | ExceptionWithTraceback]:
"""Sends task to executor."""
"""Send task to executor."""
key, command, queue, task_to_run = task_tuple
try:
with timeout(seconds=OPERATION_TIMEOUT):
Expand Down Expand Up @@ -243,7 +243,7 @@ def _tasks_list_to_task_ids(self, async_tasks) -> set[str]:
return {a.task_id for a in async_tasks}

def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
"""Gets status for many Celery tasks using the best method available."""
"""Get status for many Celery tasks using the best method available."""
if isinstance(app.backend, BaseKeyValueStoreBackend):
result = self._get_many_from_kv_backend(async_results)
elif isinstance(app.backend, DatabaseBackend):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li

def has_task(self, task_instance: TaskInstance) -> bool:
"""
Checks if a task is either queued or running in either celery or kubernetes executor.
Check if a task is either queued or running in either celery or kubernetes executor.
:param task_instance: TaskInstance
:return: True if the task is known to this executor
Expand Down Expand Up @@ -243,14 +243,15 @@ def _router(self, simple_task_instance: SimpleTaskInstance) -> CeleryExecutor |
return self.celery_executor

def debug_dump(self) -> None:
"""Called in response to SIGUSR2 by the scheduler."""
"""Debug dump; called in response to SIGUSR2 by the scheduler."""
self.log.info("Dumping CeleryExecutor state")
self.celery_executor.debug_dump()
self.log.info("Dumping KubernetesExecutor state")
self.kubernetes_executor.debug_dump()

def send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.
"""
Send callback for execution.
:param request: Callback request to be executed.
"""
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/cloudant/hooks/cloudant.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CloudantHook(BaseHook):

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour."""
"""Return custom field behaviour."""
return {
"hidden_fields": ["port", "extra"],
"relabeling": {"host": "Account", "login": "Username (or API Key)", "schema": "Database"},
Expand All @@ -54,7 +54,7 @@ def __init__(self, cloudant_conn_id: str = default_conn_name) -> None:

def get_conn(self) -> cloudant:
"""
Opens a connection to the cloudant service and closes it automatically if used as context manager.
Open a connection to the cloudant service and close it automatically if used as context manager.
.. note::
In the connection form:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,35 @@ def _convert_from_dict(obj, new_class):


def convert_volume(volume) -> k8s.V1Volume:
"""Converts an airflow Volume object into a k8s.V1Volume.
"""
Convert an airflow Volume object into a k8s.V1Volume.
:param volume:
"""
return _convert_kube_model_object(volume, k8s.V1Volume)


def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
"""Converts an airflow VolumeMount object into a k8s.V1VolumeMount.
"""
Convert an airflow VolumeMount object into a k8s.V1VolumeMount.
:param volume_mount:
"""
return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)


def convert_port(port) -> k8s.V1ContainerPort:
"""Converts an airflow Port object into a k8s.V1ContainerPort.
"""
Convert an airflow Port object into a k8s.V1ContainerPort.
:param port:
"""
return _convert_kube_model_object(port, k8s.V1ContainerPort)


def convert_env_vars(env_vars) -> list[k8s.V1EnvVar]:
"""Converts a dictionary into a list of env_vars.
"""
Convert a dictionary into a list of env_vars.
:param env_vars:
"""
Expand All @@ -83,15 +87,17 @@ def convert_env_vars(env_vars) -> list[k8s.V1EnvVar]:


def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar:
"""Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar.
"""
Convert a PodRuntimeInfoEnv into an k8s.V1EnvVar.
:param pod_runtime_info_envs:
"""
return _convert_kube_model_object(pod_runtime_info_envs, k8s.V1EnvVar)


def convert_image_pull_secrets(image_pull_secrets) -> list[k8s.V1LocalObjectReference]:
"""Converts a PodRuntimeInfoEnv into an k8s.V1EnvVar.
"""
Convert a PodRuntimeInfoEnv into an k8s.V1EnvVar.
:param image_pull_secrets:
"""
Expand All @@ -103,18 +109,19 @@ def convert_image_pull_secrets(image_pull_secrets) -> list[k8s.V1LocalObjectRefe


def convert_configmap(configmaps) -> k8s.V1EnvFromSource:
"""Converts a str into an k8s.V1EnvFromSource.
"""
Convert a str into an k8s.V1EnvFromSource.
:param configmaps:
"""
return k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmaps))


def convert_affinity(affinity) -> k8s.V1Affinity:
"""Converts a dict into an k8s.V1Affinity."""
"""Convert a dict into an k8s.V1Affinity."""
return _convert_from_dict(affinity, k8s.V1Affinity)


def convert_toleration(toleration) -> k8s.V1Toleration:
"""Converts a dict into an k8s.V1Toleration."""
"""Convert a dict into an k8s.V1Toleration."""
return _convert_from_dict(toleration, k8s.V1Toleration)
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> Non
)

def start(self) -> None:
"""Starts the executor."""
"""Start the executor."""
self.log.info("Start Kubernetes executor")
self.scheduler_job_id = str(self.job_id)
self.log.debug("Start with scheduler_job_id: %s", self.scheduler_job_id)
Expand Down Expand Up @@ -302,7 +302,7 @@ def execute_async(
queue: str | None = None,
executor_config: Any | None = None,
) -> None:
"""Executes task asynchronously."""
"""Execute task asynchronously."""
if TYPE_CHECKING:
assert self.task_queue

Expand Down Expand Up @@ -689,7 +689,7 @@ def _flush_result_queue(self) -> None:
self.result_queue.task_done()

def end(self) -> None:
"""Called when the executor shuts down."""
"""Shut down the executor."""
if TYPE_CHECKING:
assert self.task_queue
assert self.result_queue
Expand Down Expand Up @@ -725,7 +725,8 @@ def get_cli_commands() -> list[GroupCommand]:


def _get_parser() -> argparse.ArgumentParser:
"""This method is used by Sphinx to generate documentation.
"""
Generate documentation; used by Sphinx.
:meta private:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __init__(
self.kube_config = kube_config

def run(self) -> None:
"""Performs watching."""
"""Perform watching."""
if TYPE_CHECKING:
assert self.scheduler_job_id

Expand Down Expand Up @@ -302,7 +302,7 @@ def __init__(
self.kube_watchers = self._make_kube_watchers()

def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
"""Runs POD asynchronously."""
"""Run POD asynchronously."""
sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod)
json_pod = json.dumps(sanitized_pod, indent=2)

Expand Down Expand Up @@ -407,7 +407,7 @@ def run_next(self, next_job: KubernetesJobType) -> None:
self.log.debug("Kubernetes Job created!")

def delete_pod(self, pod_name: str, namespace: str) -> None:
"""Deletes Pod from a namespace. Does not raise if it does not exist."""
"""Delete Pod from a namespace; does not raise if it does not exist."""
try:
self.log.debug("Deleting pod %s in namespace %s", pod_name, namespace)
self.kube_client.delete_namespaced_pod(
Expand Down Expand Up @@ -435,7 +435,7 @@ def patch_pod_executor_done(self, *, pod_name: str, namespace: str):

def sync(self) -> None:
"""
Checks the status of all currently running kubernetes jobs.
Check the status of all currently running kubernetes jobs.
If a job is completed, its status is placed in the result queue to be sent back to the scheduler.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li

def has_task(self, task_instance: TaskInstance) -> bool:
"""
Checks if a task is either queued or running in either local or kubernetes executor.
Check if a task is either queued or running in either local or kubernetes executor.
:param task_instance: TaskInstance
:return: True if the task is known to this executor
Expand Down Expand Up @@ -226,14 +226,15 @@ def _router(self, simple_task_instance: SimpleTaskInstance) -> LocalExecutor | K
return self.local_executor

def debug_dump(self) -> None:
"""Called in response to SIGUSR2 by the scheduler."""
"""Debug dump; called in response to SIGUSR2 by the scheduler."""
self.log.info("Dumping LocalExecutor state")
self.local_executor.debug_dump()
self.log.info("Dumping KubernetesExecutor state")
self.kubernetes_executor.debug_dump()

def send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.
"""
Send callback for execution.
:param request: Callback request to be executed.
"""
Expand Down
Loading

0 comments on commit f23170c

Please sign in to comment.