Skip to content

Commit

Permalink
[AIRFLOW-747] Fix retry_delay not honoured
Browse files Browse the repository at this point in the history
Dag runs were marked as deadlocked even though a task was
still up for retry and within its retry_delay period. In addition,
_execute_task_instances was picking up tasks in the
UP_FOR_RETRY state directly from the database, even though
tasks that pass the dependency check are set to SCHEDULED.
  • Loading branch information
bolkedebruin committed Jan 13, 2017
1 parent e0f5c0c commit 68f484c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 6 deletions.
11 changes: 9 additions & 2 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ def _change_state_for_tis_without_dagrun(self,
.query(models.TaskInstance)
.filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
.filter(models.TaskInstance.state.in_(old_states))
.with_for_update()
.all()
)
""":type: list[TaskInstance]"""
Expand Down Expand Up @@ -1050,6 +1051,13 @@ def _execute_task_instances(self,
.format(task_instance.key, priority, queue))

# Set the state to queued
task_instance.refresh_from_db(lock_for_update=True, session=session)
if task_instance.state not in states:
self.logger.info("Task {} was set to {} outside this scheduler."
.format(task_instance.key, task_instance.state))
session.commit()
continue

self.logger.info("Setting state of {} to {}".format(
task_instance.key, State.QUEUED))
task_instance.state = State.QUEUED
Expand Down Expand Up @@ -1393,8 +1401,7 @@ def _execute_helper(self, processor_manager):
State.NONE)

self._execute_task_instances(simple_dag_bag,
(State.SCHEDULED,
State.UP_FOR_RETRY))
(State.SCHEDULED,))

# Call heartbeats
self.logger.info("Heartbeating the executor")
Expand Down
10 changes: 8 additions & 2 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3693,6 +3693,7 @@ class DagRun(Base):

ID_PREFIX = 'scheduled__'
ID_FORMAT_PREFIX = ID_PREFIX + '{0}'
DEADLOCK_CHECK_DEP_CONTEXT = DepContext(ignore_in_retry_period=True)

id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
Expand Down Expand Up @@ -3879,8 +3880,13 @@ def update_state(self, session=None):
# small speed up
if unfinished_tasks and none_depends_on_past:
# todo: this can actually get pretty slow: one task costs between 0.01-0.15s
no_dependencies_met = all(not t.are_dependencies_met(session=session)
for t in unfinished_tasks)
no_dependencies_met = all(
# Use a special dependency context that ignores the task's retry-period
# dependency, since a task that is up for retry is not necessarily
# deadlocked.
not t.are_dependencies_met(dep_context=self.DEADLOCK_CHECK_DEP_CONTEXT,
session=session)
for t in unfinished_tasks)

duration = (datetime.now() - start_dttm).total_seconds() * 1000
Stats.timing("dagrun.dependency-check.{}.{}".
Expand Down
4 changes: 4 additions & 0 deletions airflow/ti_deps/dep_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class DepContext(object):
:param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs (e.g. for
Backfills)
:type ignore_depends_on_past: boolean
:param ignore_in_retry_period: Ignore the retry period for task instances
:type ignore_in_retry_period: boolean
:param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past and
trigger rule
:type ignore_task_deps: boolean
Expand All @@ -61,12 +63,14 @@ def __init__(
flag_upstream_failed=False,
ignore_all_deps=False,
ignore_depends_on_past=False,
ignore_in_retry_period=False,
ignore_task_deps=False,
ignore_ti_state=False):
self.deps = deps or set()
self.flag_upstream_failed = flag_upstream_failed
self.ignore_all_deps = ignore_all_deps
self.ignore_depends_on_past = ignore_depends_on_past
self.ignore_in_retry_period = ignore_in_retry_period
self.ignore_task_deps = ignore_task_deps
self.ignore_ti_state = ignore_ti_state

Expand Down
6 changes: 6 additions & 0 deletions airflow/ti_deps/deps/not_in_retry_period_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ class NotInRetryPeriodDep(BaseTIDep):

@provide_session
def _get_dep_statuses(self, ti, session, dep_context):
if dep_context.ignore_in_retry_period:
yield self._passing_status(
reason="The context specified that being in a retry period was "
"permitted.")
raise StopIteration

if ti.state != State.UP_FOR_RETRY:
yield self._passing_status(
reason="The task instance was not marked for retrying.")
Expand Down
10 changes: 8 additions & 2 deletions tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,8 @@ def test_retry_still_in_executor(self):

dag = DAG(
dag_id='test_retry_still_in_executor',
start_date=DEFAULT_DATE)
start_date=DEFAULT_DATE,
schedule_interval="@once")
dag_task1 = BashOperator(
task_id='test_retry_handling_op',
bash_command='exit 1',
Expand Down Expand Up @@ -963,11 +964,16 @@ def run_with_error(task):
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti.try_number, 1)

ti.refresh_from_db(lock_for_update=True, session=session)
ti.state = State.SCHEDULED
session.merge(ti)
session.commit()

# do not schedule
do_schedule()
self.assertTrue(executor.has_task(ti))
ti.refresh_from_db()
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti.state, State.SCHEDULED)

# now the executor has cleared and the task should be allowed to re-queue
executor.queued_tasks.clear()
Expand Down

0 comments on commit 68f484c

Please sign in to comment.