Skip to content

Commit

Permalink
Merge pull request apache#1547 from artwr/artwr_fix_sensor_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
artwr committed Jun 20, 2016
2 parents 45b735b + c38a5c2 commit 26c31d9
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
4 changes: 2 additions & 2 deletions airflow/operators/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ def poke(self, context):
def execute(self, context):
started_at = datetime.now()
while not self.poke(context):
sleep(self.poke_interval)
if (datetime.now() - started_at).seconds > self.timeout:
if (datetime.now() - started_at).total_seconds() > self.timeout:
if self.soft_fail:
raise AirflowSkipException('Snap. Time is OUT.')
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
sleep(self.poke_interval)
logging.info("Success criteria met. Exiting.")


Expand Down
77 changes: 75 additions & 2 deletions tests/operators/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,84 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
import os
import time
import unittest

from airflow.operators.sensors import HttpSensor
from airflow.exceptions import AirflowException
from datetime import datetime, timedelta

from airflow import DAG, configuration
from airflow.operators.sensors import HttpSensor, BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import (AirflowException,
AirflowSensorTimeout,
AirflowSkipException)
configuration.test_mode()

DEFAULT_DATE = datetime(2015, 1, 1)
TEST_DAG_ID = 'unit_test_dag'


class TimeoutTestSensor(BaseSensorOperator):
"""
Sensor that always returns the return_value provided
:param return_value: Set to true to mark the task as SKIPPED on failure
:type return_value: any
"""

@apply_defaults
def __init__(
self,
return_value=False,
*args,
**kwargs):
self.return_value = return_value
super(TimeoutTestSensor, self).__init__(*args, **kwargs)

def poke(self, context):
return self.return_value

def execute(self, context):
started_at = datetime.now()
time_jump = self.params.get('time_jump')
while not self.poke(context):
if time_jump:
started_at -= time_jump
if (datetime.now() - started_at).total_seconds() > self.timeout:
if self.soft_fail:
raise AirflowSkipException('Snap. Time is OUT.')
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
time.sleep(self.poke_interval)
logging.info("Success criteria met. Exiting.")


class SensorTimeoutTest(unittest.TestCase):
def setUp(self):
configuration.test_mode()
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE
}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag

def test_timeout(self):
t = TimeoutTestSensor(
task_id='test_timeout',
execution_timeout=timedelta(days=2),
return_value=False,
poke_interval=5,
params={'time_jump': timedelta(days=2, seconds=1)},
dag=self.dag
)
self.assertRaises(
AirflowSensorTimeout,
t.run,
start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)


class HttpSensorTests(unittest.TestCase):
Expand Down

0 comments on commit 26c31d9

Please sign in to comment.