Properly measure number of task retry attempts
A task’s try_number is unbounded (incremented by 1 on every run), so
it must be adjusted both for logging and for deciding whether a task
has exhausted its retries. Rerunning a task (either because it failed
or with the `force` option) not only produced nonsensical log messages
(“Attempt 2 of 1”) but also meant a retry would never be kicked off
(because try_number > retries). The solution is to take the
`try_number` modulo `retries + 1` to keep everything sensible.

Fixed: use the correct attempt number when logging
Fixed: log when tasks are queued (the log message was being created but
never logged)
Fixed: tasks run again after their first attempt were never put up for
retry
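
To see why the modulo arithmetic keeps the numbers sensible, here is a
minimal standalone sketch of the new bookkeeping (the constants and the
loop are illustrative, not Airflow code):

    RETRIES = 1              # task.retries
    TOTAL = RETRIES + 1      # attempts per retry cycle

    for try_number in range(1, 6):
        # run() builds the log message before incrementing try_number,
        # so the attempt about to start is based on try_number - 1.
        attempt = (try_number - 1) % TOTAL + 1
        # handle_failure() sees the post-increment try_number; a try
        # landing on a multiple of TOTAL exhausts the current cycle.
        retry = bool(RETRIES) and try_number % TOTAL != 0
        print("try {}: starting attempt {} of {} -> {}".format(
            try_number, attempt, TOTAL,
            "UP_FOR_RETRY" if retry else "FAILED"))

Even on a forced rerun (try 3 and beyond) the attempt number cycles
back into the 1..TOTAL range and the task can be retried again, rather
than logging “Attempt 3 of 2” and failing permanently.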
jlowin committed Mar 28, 2016
1 parent 58029df commit 8051aa9
Showing 2 changed files with 130 additions and 22 deletions.
34 changes: 21 additions & 13 deletions airflow/models.py
@@ -1000,32 +1000,35 @@ def run(
             )
         elif force or self.state in State.runnable():
             HR = "\n" + ("-" * 80) + "\n"  # Line break
-            tot_tries = task.retries + 1
 
             # For reporting purposes, we report based on 1-indexed,
             # not 0-indexed lists (i.e. Attempt 1 instead of
-            # Attempt 0 for the first attempt)
-            msg = "Attempt {} out of {}".format(self.try_number+1,
-                                                tot_tries)
-            self.try_number += 1
-            msg = msg.format(**locals())
-            logging.info(HR + msg + HR)
+            # Attempt 0 for the first attempt).
+            msg = "Starting attempt {attempt} of {total}".format(
+                attempt=self.try_number % (task.retries + 1) + 1,
+                total=task.retries + 1)
             self.start_date = datetime.now()
 
             if not mark_success and self.state != State.QUEUED and (
                     self.pool or self.task.dag.concurrency_reached):
                 # If a pool is set for this task, marking the task instance
                 # as QUEUED
                 self.state = State.QUEUED
-                # Since we are just getting enqueued, we need to undo
-                # the try_number increment above and update the message as well
-                self.try_number -= 1
-                msg = "Queuing attempt {} out of {}".format(self.try_number+1,
-                                                            tot_tries)
+                msg = "Queuing attempt {attempt} of {total}".format(
+                    attempt=self.try_number % (task.retries + 1) + 1,
+                    total=task.retries + 1)
+                logging.info(HR + msg + HR)
+
                 self.queued_dttm = datetime.now()
                 session.merge(self)
                 session.commit()
                 logging.info("Queuing into pool {}".format(self.pool))
                 return
 
+            # print status message
+            logging.info(HR + msg + HR)
+            self.try_number += 1
+
             if not test_mode:
                 session.add(Log(State.RUNNING, self))
             self.state = State.RUNNING
@@ -1121,12 +1124,17 @@ def handle_failure(self, error, test_mode=False, context=None):
 
         # Let's go deeper
         try:
-            if self.try_number <= task.retries:
+            if task.retries and self.try_number % (task.retries + 1) != 0:
                 self.state = State.UP_FOR_RETRY
+                logging.info('Marking task as UP_FOR_RETRY')
                 if task.email_on_retry and task.email:
                     self.email_alert(error, is_retry=True)
             else:
                 self.state = State.FAILED
+                if task.retries:
+                    logging.info('All retries failed; marking task as FAILED')
+                else:
+                    logging.info('Marking task as FAILED.')
                 if task.email_on_failure and task.email:
                     self.email_alert(error, is_retry=False)
         except Exception as e2:
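
The key change in handle_failure is the retry condition. Here is a
minimal sketch contrasting the old and new checks (the helper names are
illustrative, not Airflow's API); try_number is the value after run()
has incremented it:

    retries = 1  # task.retries

    def old_check(try_number):
        # Old condition: once try_number exceeds retries it is False
        # forever, so a rerun could never be put up for retry.
        return try_number <= retries

    def new_check(try_number):
        # New condition: cycles every (retries + 1) tries, so a rerun
        # starts a fresh retry cycle.
        return bool(retries) and try_number % (retries + 1) != 0

    for try_number in (1, 2, 3, 4):
        print(try_number, old_check(try_number), new_check(try_number))
    # try 1: both True (UP_FOR_RETRY); try 2: both False (FAILED);
    # try 3: only new_check is True, matching test_retry_handling's
    # expectation that a rerun goes UP_FOR_RETRY again.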
118 changes: 109 additions & 9 deletions tests/models.py
@@ -1,14 +1,29 @@
 # -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-import unittest
 import datetime
+import unittest
+import time
 
 from airflow import models
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators import DummyOperator, BashOperator
+from airflow.utils import State, AirflowException
 
 
 class DagTest(unittest.TestCase):
@@ -95,14 +110,15 @@ class TaskInstanceTest(unittest.TestCase):

     def test_run_pooling_task(self):
         """
-        test that running task with mark_success param update task state as SUCCESS
-        without running task.
+        test that running a pooled task sets the task instance state to
+        QUEUED without actually running the task.
         """
         dag = models.DAG(dag_id='test_run_pooling_task')
         task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag,
                              pool='test_run_pooling_task_pool', owner='airflow',
                              start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
-        ti = models.TaskInstance(task=task, execution_date=datetime.datetime.now())
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
         ti.run()
         assert ti.state == models.State.QUEUED

@@ -112,9 +128,93 @@ def test_run_pooling_task_with_mark_success(self):
         without running task.
         """
         dag = models.DAG(dag_id='test_run_pooling_task_with_mark_success')
-        task = DummyOperator(task_id='test_run_pooling_task_with_mark_success_op', dag=dag,
-                             pool='test_run_pooling_task_with_mark_success_pool', owner='airflow',
-                             start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
-        ti = models.TaskInstance(task=task, execution_date=datetime.datetime.now())
+        task = DummyOperator(
+            task_id='test_run_pooling_task_with_mark_success_op',
+            dag=dag,
+            pool='test_run_pooling_task_with_mark_success_pool',
+            owner='airflow',
+            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
         ti.run(mark_success=True)
         assert ti.state == models.State.SUCCESS
+
+    def test_retry_delay(self):
+        """
+        Test that retry delays are respected
+        """
+        dag = models.DAG(dag_id='test_retry_handling')
+        task = BashOperator(
+            task_id='test_retry_handling_op',
+            bash_command='exit 1',
+            retries=1,
+            retry_delay=datetime.timedelta(seconds=3),
+            dag=dag,
+            owner='airflow',
+            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+
+        def run_with_error(ti):
+            try:
+                ti.run()
+            except AirflowException:
+                pass
+
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
+
+        # first run -- up for retry
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 1)
+
+        # second run -- still up for retry because retry_delay hasn't expired
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+
+        # third run -- failed
+        time.sleep(3)
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.FAILED)
+
+    def test_retry_handling(self):
+        """
+        Test that task retries are handled properly
+        """
+        dag = models.DAG(dag_id='test_retry_handling')
+        task = BashOperator(
+            task_id='test_retry_handling_op',
+            bash_command='exit 1',
+            retries=1,
+            retry_delay=datetime.timedelta(seconds=0),
+            dag=dag,
+            owner='airflow',
+            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+
+        def run_with_error(ti):
+            try:
+                ti.run()
+            except AirflowException:
+                pass
+
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
+
+        # first run -- up for retry
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 1)
+
+        # second run -- fail
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.FAILED)
+        self.assertEqual(ti.try_number, 2)
+
+        # third run -- up for retry
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 3)
+
+        # fourth run -- fail
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.FAILED)
+        self.assertEqual(ti.try_number, 4)
