Skip to content

Commit

Permalink
Handle OverflowError on exponential backof in next_run_calculation (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
potiuk authored Dec 8, 2022
1 parent e948b55 commit 2cbe596
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 3 deletions.
9 changes: 8 additions & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@
type: integer
example: ~
default: "300"
- name: max_task_retry_delay
description: |
The maximum delay (in seconds) each task is going to wait by default between retries.
This is a global setting and cannot be overridden at task or DAG level.
version_added: 2.6.0
type: integer
default: "86400"
example: ~
- name: default_task_weight_rule
description: |
The weighting method used for the effective total priority weight of the task
Expand Down Expand Up @@ -431,7 +439,6 @@
type: string
default: ~
example: 'http://localhost:8080'

- name: database
description: ~
options:
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ default_task_retries = 0
# dag or task level.
default_task_retry_delay = 300

# The maximum delay (in seconds) each task is going to wait by default between retries.
# This is a global setting and cannot be overridden at task or DAG level.
max_task_retry_delay = 86400

# The weighting method used for the effective total priority weight of the task
default_task_weight_rule = downstream

Expand Down
2 changes: 2 additions & 0 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
DEFAULT_RETRY_DELAY: datetime.timedelta = datetime.timedelta(
seconds=conf.getint("core", "default_task_retry_delay", fallback=300)
)
MAX_RETRY_DELAY: int = conf.getint("core", "max_task_retry_delay", fallback=24 * 60 * 60)

DEFAULT_WEIGHT_RULE: WeightRule = WeightRule(
conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM)
)
Expand Down
6 changes: 4 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,8 @@ def next_retry_datetime(self):
Get datetime of the next retry if the task instance fails. For exponential
backoff, retry_delay is used as base and will be converted to seconds.
"""
from airflow.models.abstractoperator import MAX_RETRY_DELAY

delay = self.task.retry_delay
if self.task.retry_exponential_backoff:
# If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,
Expand Down Expand Up @@ -1139,8 +1141,8 @@ def next_retry_datetime(self):
# here means this value can be exceeded after a certain number
# of tries (around 50 if the initial delay is 1s, even fewer if
# the delay is larger). Cap the value here before creating a
# timedelta object so the operation doesn't fail.
delay_backoff_in_seconds = min(modded_hash, timedelta.max.total_seconds() - 1)
# timedelta object so the operation doesn't fail with "OverflowError".
delay_backoff_in_seconds = min(modded_hash, MAX_RETRY_DELAY)
delay = timedelta(seconds=delay_backoff_in_seconds)
if self.task.max_retry_delay:
delay = min(self.task.max_retry_delay, delay)
Expand Down
1 change: 1 addition & 0 deletions newsfragments/28172.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Maximum retry task delay is set to be 24h (86400s) by default. You can change it globally via ``core.max_task_retry_delay`` parameter.

0 comments on commit 2cbe596

Please sign in to comment.