From 8051aa988d315ab5c54bf5dd8f6e931356c07c2e Mon Sep 17 00:00:00 2001
From: jlowin
Date: Mon, 28 Mar 2016 13:47:13 -0400
Subject: [PATCH] Properly measure number of task retry attempts
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

A task’s try_number is unbounded (it is incremented by 1 on every run),
so it needs to be adjusted both for logging and for checking whether a
task has exhausted its retries. Rerunning a task (either because it
failed or with the `force` option) not only produces nonsensical log
messages (“Attempt 2 of 1”) but also means a retry would never be
kicked off (because try_number > retries). The solution is to take the
`try_number` modulo (`retries` + 1) to keep everything sensible.

Fixed: use the correct attempt number when logging
Fixed: log when tasks are queued (the log message was being created but
  never logged)
Fixed: situation where tasks run after the first time would never be
  put up for retry
---
 airflow/models.py |  34 ++++++++-----
 tests/models.py   | 118 ++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 130 insertions(+), 22 deletions(-)

diff --git a/airflow/models.py b/airflow/models.py
index 69c6d930fc78c..5421d7a42cd07 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1000,15 +1000,13 @@ def run(
             )
         elif force or self.state in State.runnable():
             HR = "\n" + ("-" * 80) + "\n"  # Line break
-            tot_tries = task.retries + 1
+
             # For reporting purposes, we report based on 1-indexed,
             # not 0-indexed lists (i.e. Attempt 1 instead of
-            # Attempt 0 for the first attempt)
-            msg = "Attempt {} out of {}".format(self.try_number+1,
-                                                tot_tries)
-            self.try_number += 1
-            msg = msg.format(**locals())
-            logging.info(HR + msg + HR)
+            # Attempt 0 for the first attempt).
+            msg = "Starting attempt {attempt} of {total}".format(
+                attempt=self.try_number % (task.retries + 1) + 1,
+                total=task.retries + 1)
             self.start_date = datetime.now()
 
             if not mark_success and self.state != State.QUEUED and (
@@ -1016,16 +1014,21 @@ def run(
                 # If a pool is set for this task, marking the task instance
                 # as QUEUED
                 self.state = State.QUEUED
-                # Since we are just getting enqueued, we need to undo
-                # the try_number increment above and update the message as well
-                self.try_number -= 1
-                msg = "Queuing attempt {} out of {}".format(self.try_number+1,
-                                                            tot_tries)
+                msg = "Queuing attempt {attempt} of {total}".format(
+                    attempt=self.try_number % (task.retries + 1) + 1,
+                    total=task.retries + 1)
+                logging.info(HR + msg + HR)
+
                 self.queued_dttm = datetime.now()
                 session.merge(self)
                 session.commit()
                 logging.info("Queuing into pool {}".format(self.pool))
                 return
+
+            # print status message
+            logging.info(HR + msg + HR)
+            self.try_number += 1
+
             if not test_mode:
                 session.add(Log(State.RUNNING, self))
             self.state = State.RUNNING
@@ -1121,12 +1124,17 @@ def handle_failure(self, error, test_mode=False, context=None):
 
         # Let's go deeper
         try:
-            if self.try_number <= task.retries:
+            if task.retries and self.try_number % (task.retries + 1) != 0:
                 self.state = State.UP_FOR_RETRY
+                logging.info('Marking task as UP_FOR_RETRY')
                 if task.email_on_retry and task.email:
                     self.email_alert(error, is_retry=True)
             else:
                 self.state = State.FAILED
+                if task.retries:
+                    logging.info('All retries failed; marking task as FAILED')
+                else:
+                    logging.info('Marking task as FAILED.')
                 if task.email_on_failure and task.email:
                     self.email_alert(error, is_retry=False)
         except Exception as e2:
diff --git a/tests/models.py b/tests/models.py
index fead9ea121681..635e90a961ed9 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -1,14 +1,29 @@
 # -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-import unittest
 import datetime
+import unittest
+import time
 
 from airflow import models
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators import DummyOperator, BashOperator
+from airflow.utils import State, AirflowException
 
 
 class DagTest(unittest.TestCase):
@@ -95,14 +110,15 @@ class TaskInstanceTest(unittest.TestCase):
 
     def test_run_pooling_task(self):
         """
-        test that running task with mark_success param update task state as SUCCESS
-        without running task.
+        test that running task with mark_success param update task state as
+        SUCCESS without running task.
         """
         dag = models.DAG(dag_id='test_run_pooling_task')
         task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag,
                              pool='test_run_pooling_task_pool', owner='airflow',
                              start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
-        ti = models.TaskInstance(task=task, execution_date=datetime.datetime.now())
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
         ti.run()
         assert ti.state == models.State.QUEUED
 
@@ -112,9 +128,93 @@ def test_run_pooling_task_with_mark_success(self):
         """
         test that running task with mark_success param update task state as
         SUCCESS without running task.
""" dag = models.DAG(dag_id='test_run_pooling_task_with_mark_success') - task = DummyOperator(task_id='test_run_pooling_task_with_mark_success_op', dag=dag, - pool='test_run_pooling_task_with_mark_success_pool', owner='airflow', - start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) - ti = models.TaskInstance(task=task, execution_date=datetime.datetime.now()) + task = DummyOperator( + task_id='test_run_pooling_task_with_mark_success_op', + dag=dag, + pool='test_run_pooling_task_with_mark_success_pool', + owner='airflow', + start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + ti = models.TaskInstance( + task=task, execution_date=datetime.datetime.now()) ti.run(mark_success=True) assert ti.state == models.State.SUCCESS + + def test_retry_delay(self): + """ + Test that retry delays are respected + """ + dag = models.DAG(dag_id='test_retry_handling') + task = BashOperator( + task_id='test_retry_handling_op', + bash_command='exit 1', + retries=1, + retry_delay=datetime.timedelta(seconds=3), + dag=dag, + owner='airflow', + start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + + def run_with_error(ti): + try: + ti.run() + except AirflowException: + pass + + ti = models.TaskInstance( + task=task, execution_date=datetime.datetime.now()) + + # first run -- up for retry + run_with_error(ti) + self.assertEqual(ti.state, State.UP_FOR_RETRY) + self.assertEqual(ti.try_number, 1) + + # second run -- still up for retry because retry_delay hasn't expired + run_with_error(ti) + self.assertEqual(ti.state, State.UP_FOR_RETRY) + + # third run -- failed + time.sleep(3) + run_with_error(ti) + self.assertEqual(ti.state, State.FAILED) + + def test_retry_handling(self): + """ + Test that task retries are handled properly + """ + dag = models.DAG(dag_id='test_retry_handling') + task = BashOperator( + task_id='test_retry_handling_op', + bash_command='exit 1', + retries=1, + retry_delay=datetime.timedelta(seconds=0), + dag=dag, + owner='airflow', + start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + + def run_with_error(ti): + try: + ti.run() + except AirflowException: + pass + + ti = models.TaskInstance( + task=task, execution_date=datetime.datetime.now()) + + # first run -- up for retry + run_with_error(ti) + self.assertEqual(ti.state, State.UP_FOR_RETRY) + self.assertEqual(ti.try_number, 1) + + # second run -- fail + run_with_error(ti) + self.assertEqual(ti.state, State.FAILED) + self.assertEqual(ti.try_number, 2) + + # third run -- up for retry + run_with_error(ti) + self.assertEqual(ti.state, State.UP_FOR_RETRY) + self.assertEqual(ti.try_number, 3) + + # fourth run -- fail + run_with_error(ti) + self.assertEqual(ti.state, State.FAILED) + self.assertEqual(ti.try_number, 4)