Simplify logic to resolve tasks stuck in queued despite stalled_task_timeout (apache#30375)

* simplify and consolidate logic for tasks stuck in queued

* simplify and consolidate logic for tasks stuck in queued

* simplify and consolidate logic for tasks stuck in queued

* fixed tests; updated fail stuck tasks to use run_with_db_retries

* mypy; fixed tests

* fix task_adoption_timeout in celery integration test

* addressing comments

* remove useless print

* fix typo

* move failure logic to executor

* fix scheduler job test

* adjustments for new scheduler job

* appeasing static checks

* fix test for new scheduler job paradigm

* Updating docs for deprecations

* news & small changes

* news & small changes

* Update newsfragments/30375.significant.rst

Co-authored-by: Jed Cunningham <[email protected]>

* Update newsfragments/30375.significant.rst

Co-authored-by: Jed Cunningham <[email protected]>

* added cleanup stuck task functionality to base executor

* fix sloppy mistakes & mypy

* removing self.fail from base_executor

* Update airflow/jobs/scheduler_job_runner.py

Co-authored-by: Jed Cunningham <[email protected]>

* Update airflow/jobs/scheduler_job_runner.py

Co-authored-by: Jed Cunningham <[email protected]>

* Fix job_id filter

* Don't even run query if executor doesn't support timing out queued tasks

* Add support for LocalKubernetesExecutor and CeleryKubernetesExecutor

* Add config option to control how often it runs - we want it quicker than
the timeout

* Fixup newsfragment

* mark old KE pending pod check interval as deprecated by new check interval

* Fixup deprecation warnings

This more closely mirrors how deprecations are raised for "normal"
deprecations.

I've removed the depth, as moving up the stack doesn't really help the
user at all in this situation.

* Another deprecation cleanup

* Remove db retries

* Fix test

---------

Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Ephraim Anierobi <[email protected]>
4 people authored Apr 14, 2023
1 parent 1ebeb19 commit 6f2277c
Showing 17 changed files with 301 additions and 544 deletions.
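
In short, this change retires the executor-specific stalled-task timers in favor of a single scheduler-side sweep governed by `[scheduler] task_queued_timeout`. A rough sketch of the resulting flow, with simplified, illustrative names (not the commit's exact code):

import logging
from datetime import timedelta

from sqlalchemy.orm import Session

from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState

log = logging.getLogger(__name__)


def fail_tasks_stuck_in_queued(executor, job_id, session: Session) -> None:
    # Find task instances that have sat in QUEUED longer than the timeout.
    timeout = conf.getfloat("scheduler", "task_queued_timeout")
    cutoff = timezone.utcnow() - timedelta(seconds=timeout)
    stuck = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == TaskInstanceState.QUEUED)
        .filter(TaskInstance.queued_dttm < cutoff)
        .filter(TaskInstance.queued_by_job_id == job_id)
        .all()
    )
    if not stuck:
        return
    # Each executor cleans up its own remnants (revoke the Celery message,
    # delete the pending pod, ...) and returns readable names for logging.
    for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck):
        log.warning("Task stuck in queued; will be retried or failed: %s", ti_repr)

The scheduler runs this sweep every `[scheduler] task_queued_timeout_check_interval` seconds, which is why that interval defaults to a fraction of the timeout itself.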
57 changes: 15 additions & 42 deletions airflow/config_templates/config.yml
@@ -2047,26 +2047,6 @@ celery:
type: boolean
example: ~
default: "True"
task_adoption_timeout:
description: |
Time in seconds after which adopted tasks which are queued in celery are assumed to be stalled,
and are automatically rescheduled. This setting does the same thing as ``stalled_task_timeout`` but
applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting
also applies to adopted tasks. To calculate adoption time, subtract the
:ref:`task duration<ui:task-duration>` from the task's :ref:`landing time<ui:landing-times>`.
version_added: 2.0.0
type: integer
example: ~
default: "600"
stalled_task_timeout:
description: |
Time in seconds after which tasks queued in celery are assumed to be stalled, and are automatically
rescheduled. Adopted tasks will instead use the ``task_adoption_timeout`` setting if specified.
When set to 0, automatic clearing of stalled tasks is disabled.
version_added: 2.3.1
type: integer
example: ~
default: "0"
task_publish_max_retries:
description: |
The Maximum number of retries for publishing task messages to the broker when failing
@@ -2415,6 +2395,21 @@ scheduler:
type: string
example: ~
default: "15"
task_queued_timeout:
description: |
Amount of time a task can be in the queued state before being retried or set to failed.
version_added: 2.6.0
type: float
example: ~
default: "600.0"
task_queued_timeout_check_interval:
description: |
How often to check for tasks that have been in the queued state for
longer than `[scheduler] task_queued_timeout`.
version_added: 2.6.0
type: float
example: ~
default: "120.0"
triggerer:
description: ~
options:
@@ -2731,35 +2726,13 @@ kubernetes_executor:
type: boolean
example: ~
default: "True"
worker_pods_pending_timeout:
description: |
How long in seconds a worker can be in Pending before it is considered a failure
version_added: 2.1.0
type: integer
example: ~
default: "300"
worker_pods_pending_timeout_check_interval:
description: |
How often in seconds to check if Pending workers have exceeded their timeouts
version_added: 2.1.0
type: integer
example: ~
default: "120"
worker_pods_queued_check_interval:
description: |
How often in seconds to check for task instances stuck in "queued" status without a pod
version_added: 2.2.0
type: integer
example: ~
default: "60"
worker_pods_pending_timeout_batch_size:
description: |
How many pending pods to check for timeout violations in each check interval.
You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``.
version_added: 2.1.0
type: integer
example: ~
default: "100"
ssl_ca_cert:
description: |
Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate.
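For reference, both new options read like any other Airflow config value, and the usual AIRFLOW__SECTION__KEY environment-variable override applies. A quick sketch:

from airflow.configuration import conf

# Defaults per the config.yml entries above; override in airflow.cfg or via
# AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT and
# AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT_CHECK_INTERVAL.
timeout = conf.getfloat("scheduler", "task_queued_timeout")  # 600.0
interval = conf.getfloat("scheduler", "task_queued_timeout_check_interval")  # 120.0

# Keep the check interval shorter than the timeout so stuck tasks are
# noticed soon after they cross the threshold.
assert interval < timeout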
29 changes: 7 additions & 22 deletions airflow/config_templates/default_airflow.cfg
@@ -1033,18 +1033,6 @@ operation_timeout = 1.0
# or run in HA mode, it can adopt the orphan tasks launched by previous SchedulerJob.
task_track_started = True

# Time in seconds after which adopted tasks which are queued in celery are assumed to be stalled,
# and are automatically rescheduled. This setting does the same thing as ``stalled_task_timeout`` but
# applies specifically to adopted tasks only. When set to 0, the ``stalled_task_timeout`` setting
# also applies to adopted tasks. To calculate adoption time, subtract the
# :ref:`task duration<ui:task-duration>` from the task's :ref:`landing time<ui:landing-times>`.
task_adoption_timeout = 600

# Time in seconds after which tasks queued in celery are assumed to be stalled, and are automatically
# rescheduled. Adopted tasks will instead use the ``task_adoption_timeout`` setting if specified.
# When set to 0, automatic clearing of stalled tasks is disabled.
stalled_task_timeout = 0

# The Maximum number of retries for publishing task messages to the broker when failing
# due to ``AirflowTaskTimeout`` error before giving up and marking Task as failed.
task_publish_max_retries = 3
@@ -1228,6 +1216,13 @@ allow_trigger_in_future = False
# How often to check for expired trigger requests that have not run yet.
trigger_timeout_check_interval = 15

# Amount of time a task can be in the queued state before being retried or set to failed.
task_queued_timeout = 600.0

# How often to check for tasks that have been in the queued state for
# longer than `[scheduler] task_queued_timeout`.
task_queued_timeout_check_interval = 120.0

[triggerer]
# How many triggers a single Triggerer will run at once, by default.
default_capacity = 1000
@@ -1372,19 +1367,9 @@ tcp_keep_cnt = 6
# Set this to false to skip verifying SSL certificate of Kubernetes python client.
verify_ssl = True

# How long in seconds a worker can be in Pending before it is considered a failure
worker_pods_pending_timeout = 300

# How often in seconds to check if Pending workers have exceeded their timeouts
worker_pods_pending_timeout_check_interval = 120

# How often in seconds to check for task instances stuck in "queued" status without a pod
worker_pods_queued_check_interval = 60

# How many pending pods to check for timeout violations in each check interval.
# You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``.
worker_pods_pending_timeout_batch_size = 100

# Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate.
ssl_ca_cert =

30 changes: 22 additions & 8 deletions airflow/configuration.py
@@ -234,6 +234,22 @@ class AirflowConfigParser(ConfigParser):
("database", "load_default_connections"): ("core", "load_default_connections", "2.3.0"),
("database", "max_db_retries"): ("core", "max_db_retries", "2.3.0"),
("scheduler", "parsing_cleanup_interval"): ("scheduler", "deactivate_stale_dags_interval", "2.5.0"),
("scheduler", "task_queued_timeout_check_interval"): (
"kubernetes_executor",
"worker_pods_pending_timeout_check_interval",
"2.6.0",
),
}

# A mapping of new configurations to a list of old configurations for when one configuration
# deprecates more than one other deprecation. The deprecation logic for these configurations
# is defined in SchedulerJobRunner.
many_to_one_deprecated_options: dict[tuple[str, str], list[tuple[str, str, str]]] = {
("scheduler", "task_queued_timeout"): [
("celery", "stalled_task_timeout", "2.6.0"),
("celery", "task_adoption_timeout", "2.6.0"),
("kubernetes_executor", "worker_pods_pending_timeout", "2.6.0"),
]
}

# A mapping of new section -> (old section, since_version).
@@ -548,12 +564,10 @@ def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:

@overload # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: # type: ignore[override]

...

@overload # type: ignore[override]
def get(self, section: str, key: str, **kwargs) -> str | None: # type: ignore[override]

...

def get( # type: ignore[override, misc]
@@ -1070,7 +1084,7 @@ def as_dict(
# This ensures the ones from config file is hidden too
# if they are not provided through env, cmd and secret
hidden = "< hidden >"
for (section, key) in self.sensitive_config_values:
for section, key in self.sensitive_config_values:
if not config_sources.get(section):
continue
if config_sources[section].get(key, None):
@@ -1089,7 +1103,7 @@ def _include_secrets(
display_source: bool,
raw: bool,
):
for (section, key) in self.sensitive_config_values:
for section, key in self.sensitive_config_values:
value: str | None = self._get_secret_option_from_config_sources(config_sources, section, key)
if value:
if not display_sensitive:
@@ -1110,7 +1124,7 @@ def _include_commands(
display_source: bool,
raw: bool,
):
for (section, key) in self.sensitive_config_values:
for section, key in self.sensitive_config_values:
opt = self._get_cmd_option_from_config_sources(config_sources, section, key)
if not opt:
continue
@@ -1188,7 +1202,7 @@ def _filter_by_source(
:return: None, the given config_sources is filtered if necessary,
otherwise untouched.
"""
for (section, key) in self.sensitive_config_values:
for section, key in self.sensitive_config_values:
# Don't bother if we don't have section / key
if section not in config_sources or key not in config_sources[section]:
continue
@@ -1222,7 +1236,7 @@ def _replace_config_with_display_sources(
include_cmds: bool,
include_secret: bool,
):
for (source_name, config) in configs:
for source_name, config in configs:
for section in config.sections():
AirflowConfigParser._replace_section_config_with_display_sources(
config,
@@ -1249,7 +1263,7 @@ def _deprecated_value_is_set_in_config(
continue
try:
deprecated_section_array = config.items(section=deprecated_section, raw=True)
for (key_candidate, _) in deprecated_section_array:
for key_candidate, _ in deprecated_section_array:
if key_candidate == deprecated_key:
return True
except NoSectionError:
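The `many_to_one_deprecated_options` hunk above collapses three old options into `[scheduler] task_queued_timeout`; per its comment, the actual resolution happens in SchedulerJobRunner. A hypothetical sketch of such a many-to-one fallback (illustrative only, not the commit's code):

import warnings

from airflow.configuration import conf

_OLD_OPTIONS = [
    ("celery", "stalled_task_timeout"),
    ("celery", "task_adoption_timeout"),
    ("kubernetes_executor", "worker_pods_pending_timeout"),
]


def resolve_task_queued_timeout() -> float:
    # Honor a deprecated option if a user still sets one, warning about the
    # rename; otherwise fall through to the new consolidated option.
    for section, key in _OLD_OPTIONS:
        if conf.has_option(section, key):
            warnings.warn(
                f"[{section}] {key} is deprecated; use [scheduler] task_queued_timeout instead.",
                DeprecationWarning,
            )
            return conf.getfloat(section, key)
    return conf.getfloat("scheduler", "task_queued_timeout")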
12 changes: 12 additions & 0 deletions airflow/executors/base_executor.py
@@ -376,6 +376,18 @@ def terminate(self):
"""This method is called when the daemon receives a SIGTERM."""
raise NotImplementedError()

def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
"""
Handle remnants of tasks that were failed because they were stuck in queued.

Tasks can get stuck in queued. If such a task is detected, it will be marked
as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
if it doesn't.

:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
raise NotImplementedError()

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
"""
Try to adopt running task instances that have been abandoned by a SchedulerJob dying.
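Each concrete executor overrides this hook with backend-specific cleanup. A minimal sketch of what an implementation could look like (simplified; not the Celery or Kubernetes executors' actual code):

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance


class MyExecutor(BaseExecutor):
    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        readable = []
        for ti in tis:
            # Backend-specific teardown would go here: revoke the Celery
            # message, delete the pending pod, etc.
            self.fail(ti.key)  # report failure; retry handling follows the TI's remaining retries
            readable.append(repr(ti))
        return readable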
