Skip to content

Commit

Permalink
Completed D400 for multiple folders (apache#27969)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdsoha authored Dec 6, 2022
1 parent 484fa36 commit e6cd96b
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 168 deletions.
32 changes: 14 additions & 18 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@

class BaseExecutor(LoggingMixin):
"""
Class to derive in order to interface with executor-type systems
like Celery, Kubernetes, Local, Sequential and the likes.
Class to derive in order to implement concrete executors.
Such as Celery, Kubernetes, Local, Sequential and the like.
:param parallelism: how many jobs should run at one time. Set to
``0`` for infinity
``0`` for infinity.
"""

supports_ad_hoc_ti_run: bool = False
Expand Down Expand Up @@ -90,7 +90,7 @@ def queue_command(
priority: int = 1,
queue: str | None = None,
):
"""Queues command to task"""
"""Queues command to task."""
if task_instance.key not in self.queued_tasks:
self.log.info("Adding to queue: %s", command)
self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
Expand Down Expand Up @@ -266,9 +266,10 @@ def success(self, key: TaskInstanceKey, info=None) -> None:

def get_event_buffer(self, dag_ids=None) -> dict[TaskInstanceKey, EventBufferValueType]:
"""
Returns and flush the event buffer. In case dag_ids is specified
it will only return and flush events for the given dag_ids. Otherwise
it returns and flushes all events.
Return and flush the event buffer.
In case dag_ids is specified it will only return and flush events
for the given dag_ids. Otherwise, it returns and flushes all events.
:param dag_ids: the dag_ids to return events for; returns all if given ``None``.
:return: a dict of events
Expand Down Expand Up @@ -302,15 +303,11 @@ def execute_async(
raise NotImplementedError()

def end(self) -> None: # pragma: no cover
"""
This method is called when the caller is done submitting job and
wants to wait synchronously for the job submitted previously to be
all done.
"""
"""Wait synchronously for the previously submitted job to complete."""
raise NotImplementedError()

def terminate(self):
"""This method is called when the daemon receives a SIGTERM"""
"""This method is called when the daemon receives a SIGTERM."""
raise NotImplementedError()

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
Expand All @@ -328,7 +325,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task

@property
def slots_available(self):
"""Number of new tasks this executor instance can accept"""
"""Number of new tasks this executor instance can accept."""
if self.parallelism:
return self.parallelism - len(self.running) - len(self.queued_tasks)
else:
Expand All @@ -337,10 +334,9 @@ def slots_available(self):
@staticmethod
def validate_command(command: list[str]) -> None:
"""
Back-compat method to Check if the command to execute is airflow command
Back-compat method to check if the command to execute is an airflow command.
:param command: command to check
:return: None
"""
warnings.warn(
"""
Expand All @@ -354,7 +350,7 @@ def validate_command(command: list[str]) -> None:
@staticmethod
def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None, str | None]:
"""
Check if the command to execute is airflow command
Check if the command to execute is an airflow command.
Returns tuple (dag_id,task_id) retrieved from the command (replaced with None values if missing)
"""
Expand All @@ -374,7 +370,7 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None,
return None, None

def debug_dump(self):
"""Called in response to SIGUSR2 by the scheduler"""
"""Called in response to SIGUSR2 by the scheduler."""
self.log.info(
"executor.queued (%d)\n\t%s",
len(self.queued_tasks),
Expand Down
30 changes: 17 additions & 13 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""CeleryExecutor
"""CeleryExecutor.
.. seealso::
For more information on how the CeleryExecutor works, take a look at the guide:
Expand Down Expand Up @@ -187,8 +187,7 @@ def send_task_to_executor(
@celery_import_modules.connect
def on_celery_import_modules(*args, **kwargs):
"""
Preload some "expensive" airflow modules so that every task process doesn't have to import it again and
again.
Preload some "expensive" airflow modules once, so other task processes won't have to import them again.
Loading these for each task adds 0.3-0.5s *per task* before the task can run. For long running tasks this
doesn't matter, but for short tasks this starts to be a noticeable impact.
Expand Down Expand Up @@ -219,8 +218,9 @@ class _CeleryPendingTaskTimeoutType(Enum):

class CeleryExecutor(BaseExecutor):
"""
CeleryExecutor is recommended for production use of Airflow. It allows
distributing the execution of task instances to multiple worker nodes.
CeleryExecutor is recommended for production use of Airflow.
It allows distributing the execution of task instances to multiple worker nodes.
Celery is a simple, flexible and reliable distributed system to process
vast amounts of messages, while providing operations with the tools
Expand Down Expand Up @@ -358,13 +358,14 @@ def _get_timedout_ti_keys(
self, task_timeouts: dict[TaskInstanceKey, datetime.datetime]
) -> list[TaskInstanceKey]:
"""
These timeouts exist to check to see if any of our tasks have not progressed
in the expected time. This can happen for few different reasons, usually related
to race conditions while shutting down schedulers and celery workers.
Evaluate whether other tasks have stalled during the expected time.
This can happen for a few different reasons,
usually related to race conditions while shutting down schedulers and celery workers.
It is, of course, always possible that these tasks are not actually
stalled - they could just be waiting in a long celery queue.
Unfortunately there's no way for us to know for sure, so we'll just
Unfortunately, there's no way for us to know for sure, so we'll just
reschedule them and let the normal scheduler loop requeue them.
"""
now = utcnow()
Expand Down Expand Up @@ -421,7 +422,7 @@ def _send_stalled_tis_back_to_scheduler(
self.log.error("Error revoking task instance %s from celery: %s", key, ex)

def debug_dump(self) -> None:
"""Called in response to SIGUSR2 by the scheduler"""
"""Called in response to SIGUSR2 by the scheduler."""
super().debug_dump()
self.log.info(
"executor.tasks (%d)\n\t%s", len(self.tasks), "\n\t".join(map(repr, self.tasks.items()))
Expand Down Expand Up @@ -540,6 +541,8 @@ def _set_celery_pending_task_timeout(
self, key: TaskInstanceKey, timeout_type: _CeleryPendingTaskTimeoutType | None
) -> None:
"""
Set pending task timeout.
We use the fact that dicts maintain insertion order, and the timeout for a
task is always "now + delta" to maintain the property that oldest item = first to
time out.
Expand All @@ -554,8 +557,9 @@ def _set_celery_pending_task_timeout(

def fetch_celery_task_state(async_result: AsyncResult) -> tuple[str, str | ExceptionWithTraceback, Any]:
"""
Fetch and return the state of the given celery task. The scope of this function is
global so that it can be called by subprocesses in the pool.
Fetch and return the state of the given celery task.
The scope of this function is global so that it can be called by subprocesses in the pool.
:param async_result: a tuple of the Celery task key and the async Celery object used
to fetch the task's state
Expand All @@ -575,7 +579,7 @@ def fetch_celery_task_state(async_result: AsyncResult) -> tuple[str, str | Excep

class BulkStateFetcher(LoggingMixin):
"""
Gets status for many Celery tasks using the best method available
Gets status for many Celery tasks using the best method available.
If BaseKeyValueStoreBackend is used as result backend, the mget method is used.
If DatabaseBackend is used as result backend, the SELECT ...WHERE task_id IN (...) query is used
Expand Down
32 changes: 17 additions & 15 deletions airflow/executors/celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,40 +52,42 @@ def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: Kuberne

@property
def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
"""Return queued tasks from celery and kubernetes executor"""
"""Return queued tasks from celery and kubernetes executor."""
queued_tasks = self.celery_executor.queued_tasks.copy()
queued_tasks.update(self.kubernetes_executor.queued_tasks)

return queued_tasks

@property
def running(self) -> set[TaskInstanceKey]:
"""Return running tasks from celery and kubernetes executor"""
"""Return running tasks from celery and kubernetes executor."""
return self.celery_executor.running.union(self.kubernetes_executor.running)

@property
def job_id(self) -> int | None:
"""
This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
of executors we implement as property so we can have custom setter.
Inherited attribute from BaseExecutor.
Since this is not really an executor, but a wrapper of executors
we implemented it as a property, so we can have a custom setter.
"""
return self._job_id

@job_id.setter
def job_id(self, value: int | None) -> None:
"""job_id is manipulated by SchedulerJob. We must propagate the job_id to wrapped executors."""
"""Expose job ID for SchedulerJob."""
self._job_id = value
self.kubernetes_executor.job_id = value
self.celery_executor.job_id = value

def start(self) -> None:
"""Start celery and kubernetes executor"""
"""Start celery and kubernetes executor."""
self.celery_executor.start()
self.kubernetes_executor.start()

@property
def slots_available(self) -> int:
"""Number of new tasks this executor instance can accept"""
"""Number of new tasks this executor instance can accept."""
return self.celery_executor.slots_available

def queue_command(
Expand All @@ -95,7 +97,7 @@ def queue_command(
priority: int = 1,
queue: str | None = None,
) -> None:
"""Queues command via celery or kubernetes executor"""
"""Queues command via celery or kubernetes executor."""
executor = self._router(task_instance)
self.log.debug("Using executor: %s for %s", executor.__class__.__name__, task_instance.key)
executor.queue_command(task_instance, command, priority, queue)
Expand All @@ -112,7 +114,7 @@ def queue_task_instance(
pool: str | None = None,
cfg_path: str | None = None,
) -> None:
"""Queues task instance via celery or kubernetes executor"""
"""Queues task instance via celery or kubernetes executor."""
executor = self._router(SimpleTaskInstance.from_ti(task_instance))
self.log.debug(
"Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
Expand Down Expand Up @@ -141,15 +143,15 @@ def has_task(self, task_instance: TaskInstance) -> bool:
)

def heartbeat(self) -> None:
"""Heartbeat sent to trigger new jobs in celery and kubernetes executor"""
"""Heartbeat sent to trigger new jobs in celery and kubernetes executor."""
self.celery_executor.heartbeat()
self.kubernetes_executor.heartbeat()

def get_event_buffer(
self, dag_ids: list[str] | None = None
) -> dict[TaskInstanceKey, EventBufferValueType]:
"""
Returns and flush the event buffer from celery and kubernetes executor
Return and flush the event buffer from celery and kubernetes executor.
:param dag_ids: dag_ids to return events for, if None returns all
:return: a dict of events
Expand All @@ -176,18 +178,18 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
]

def end(self) -> None:
"""End celery and kubernetes executor"""
"""End celery and kubernetes executor."""
self.celery_executor.end()
self.kubernetes_executor.end()

def terminate(self) -> None:
"""Terminate celery and kubernetes executor"""
"""Terminate celery and kubernetes executor."""
self.celery_executor.terminate()
self.kubernetes_executor.terminate()

def _router(self, simple_task_instance: SimpleTaskInstance) -> CeleryExecutor | KubernetesExecutor:
"""
Return either celery_executor or kubernetes_executor
Return either celery_executor or kubernetes_executor.
:param simple_task_instance: SimpleTaskInstance
:return: celery_executor or kubernetes_executor
Expand All @@ -197,7 +199,7 @@ def _router(self, simple_task_instance: SimpleTaskInstance) -> CeleryExecutor |
return self.celery_executor

def debug_dump(self) -> None:
"""Called in response to SIGUSR2 by the scheduler"""
"""Called in response to SIGUSR2 by the scheduler."""
self.log.info("Dumping CeleryExecutor state")
self.celery_executor.debug_dump()
self.log.info("Dumping KubernetesExecutor state")
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""
DaskExecutor
DaskExecutor.
.. seealso::
For more information on how the DaskExecutor works, take a look at the guide:
Expand Down
12 changes: 5 additions & 7 deletions airflow/executors/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""
DebugExecutor
DebugExecutor.
.. seealso::
For more information on how the DebugExecutor works, take a look at the guide:
Expand Down Expand Up @@ -112,8 +112,9 @@ def queue_task_instance(

def trigger_tasks(self, open_slots: int) -> None:
"""
Triggers tasks. Instead of calling exec_async we just
add task instance to tasks_to_run queue.
Triggers tasks.
Instead of calling exec_async we just add task instance to tasks_to_run queue.
:param open_slots: Number of open slots
"""
Expand All @@ -129,10 +130,7 @@ def trigger_tasks(self, open_slots: int) -> None:
self.tasks_to_run.append(ti) # type: ignore

def end(self) -> None:
"""
When the method is called we just set states of queued tasks
to UPSTREAM_FAILED marking them as not executed.
"""
"""Set states of queued tasks to UPSTREAM_FAILED marking them as not executed."""
for ti in self.tasks_to_run:
self.log.info("Setting %s to %s", ti.key, State.UPSTREAM_FAILED)
ti.set_state(State.UPSTREAM_FAILED)
Expand Down
Loading

0 comments on commit e6cd96b

Please sign in to comment.