Skip to content

Commit

Permalink
[AIRFLOW-6055] Option for exponential backoff in Sensors (apache#6654)
Browse files Browse the repository at this point in the history
A new option "exponential_backoff" in Sensors, will increase the next poke or next reschedule time for sensors exponentially. Turned off by default.
  • Loading branch information
msumit authored Nov 27, 2019
1 parent 3f3b428 commit 5d08c54
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
40 changes: 37 additions & 3 deletions airflow/sensors/base_sensor_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.


import hashlib
from datetime import timedelta
from time import sleep
from typing import Dict, Iterable
Expand Down Expand Up @@ -58,6 +58,9 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
quite long. The poke interval should be more than one minute to
prevent too much load on the scheduler.
:type mode: str
:param exponential_backoff: allow progressive longer waits between
pokes by using exponential backoff algorithm
:type exponential_backoff: bool
"""
ui_color = '#e6f1f2' # type: str
valid_modes = ['poke', 'reschedule'] # type: Iterable[str]
Expand All @@ -68,13 +71,15 @@ def __init__(self,
timeout: float = 60 * 60 * 24 * 7,
soft_fail: bool = False,
mode: str = 'poke',
exponential_backoff: bool = False,
*args,
**kwargs) -> None:
super().__init__(*args, **kwargs)
self.poke_interval = poke_interval
self.soft_fail = soft_fail
self.timeout = timeout
self.mode = mode
self.exponential_backoff = exponential_backoff
self._validate_input_values()

def _validate_input_values(self) -> None:
Expand All @@ -101,11 +106,13 @@ def poke(self, context: Dict) -> bool:

def execute(self, context: Dict) -> None:
started_at = timezone.utcnow()
try_number = 1
if self.reschedule:
# If reschedule, use first start date of current try
task_reschedules = TaskReschedule.find_for_task_instance(context['ti'])
if task_reschedules:
started_at = task_reschedules[0].start_date
try_number = len(task_reschedules) + 1
while not self.poke(context):
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
# If sensor is in soft fail mode but will be retried then
Expand All @@ -118,10 +125,11 @@ def execute(self, context: Dict) -> None:
raise AirflowSensorTimeout('Snap. Time is OUT.')
if self.reschedule:
reschedule_date = timezone.utcnow() + timedelta(
seconds=self.poke_interval)
seconds=self._get_next_poke_interval(started_at, try_number))
raise AirflowRescheduleException(reschedule_date)
else:
sleep(self.poke_interval)
sleep(self._get_next_poke_interval(started_at, try_number))
try_number += 1
self.log.info("Success criteria met. Exiting.")

def _do_skip_downstream_tasks(self, context: Dict) -> None:
Expand All @@ -130,6 +138,32 @@ def _do_skip_downstream_tasks(self, context: Dict) -> None:
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

def _get_next_poke_interval(self, started_at, try_number):
"""
Using the similar logic which is used for exponential backoff retry delay for operators.
"""
if self.exponential_backoff:
min_backoff = int(self.poke_interval * (2 ** (try_number - 2)))
current_time = timezone.utcnow()

hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,
self.task_id,
started_at,
try_number)
.encode('utf-8')).hexdigest(), 16)
modded_hash = min_backoff + hash % min_backoff

delay_backoff_in_seconds = min(
modded_hash,
timedelta.max.total_seconds() - 1
)
new_interval = min(self.timeout - int((current_time - started_at).total_seconds()),
delay_backoff_in_seconds)
self.log.info("new {} interval is {}".format(self.mode, new_interval))
return new_interval
else:
return self.poke_interval

@property
def reschedule(self):
return self.mode == 'reschedule'
Expand Down
35 changes: 34 additions & 1 deletion tests/sensors/test_base_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import unittest
from datetime import timedelta
from time import sleep
from unittest.mock import Mock
from unittest.mock import Mock, patch

from freezegun import freeze_time

Expand Down Expand Up @@ -502,3 +502,36 @@ def test_sensor_with_invalid_timeout(self):
return_value=None,
poke_interval=10,
timeout=positive_timeout)

def test_sensor_with_exponential_backoff_off(self):
sensor = self._make_sensor(
return_value=None,
poke_interval=5,
timeout=60,
exponential_backoff=False)

started_at = timezone.utcnow() - timedelta(seconds=10)
self.assertEqual(sensor._get_next_poke_interval(started_at, 1), sensor.poke_interval)
self.assertEqual(sensor._get_next_poke_interval(started_at, 2), sensor.poke_interval)

def test_sensor_with_exponential_backoff_on(self):

sensor = self._make_sensor(
return_value=None,
poke_interval=5,
timeout=60,
exponential_backoff=True)

with patch('airflow.utils.timezone.utcnow') as mock_utctime:
mock_utctime.return_value = DEFAULT_DATE

started_at = timezone.utcnow() - timedelta(seconds=10)
print(started_at)

interval1 = sensor._get_next_poke_interval(started_at, 1)
interval2 = sensor._get_next_poke_interval(started_at, 2)

self.assertTrue(interval1 >= 0)
self.assertTrue(interval1 <= sensor.poke_interval)
self.assertTrue(interval2 >= sensor.poke_interval)
self.assertTrue(interval2 > interval1)

0 comments on commit 5d08c54

Please sign in to comment.