[AIRFLOW-2521] backfill - make variable name and logging messages more accurate

[AIRFLOW-2521] backfill - make variable name and
logging messages more accurate

The term kicked_off in logging and the variable
started are used to
refer to `running` task instances. Let's clarify
the variable names and
messages here.

Fixing unit tests

Closes apache#3416 from mistercrunch/kicked_off_running
mistercrunch authored and Fokko Driesprong committed May 28, 2018
1 parent 45c0c54 commit 32d15a3
Showing 2 changed files with 71 additions and 62 deletions.
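For orientation, a minimal sketch of the renamed bookkeeping structure, condensed from the diff below (names taken from airflow/jobs.py; the real class also tracks deadlocked and not_ready tasks, finished runs and total runs):

class _DagRunTaskStatus(object):
    # Simplified sketch of BackfillJob's internal status after the rename:
    # the dict formerly called `started` is now `running`, matching the
    # state of the task instances it actually holds.
    def __init__(self, to_run=None, running=None, skipped=None,
                 succeeded=None, failed=None):
        self.to_run = to_run or dict()      # task instances waiting to be queued
        self.running = running or dict()    # key -> task instance handed to the executor
        self.skipped = skipped or set()
        self.succeeded = succeeded or set()
        self.failed = failed or set()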
103 changes: 56 additions & 47 deletions airflow/jobs.py
@@ -1887,7 +1887,7 @@ class _DagRunTaskStatus(object):
"""
Internal status of the backfill job. This class is intended to be instantiated
only within a BackfillJob instance and will track the execution of tasks,
e.g. started, skipped, succeeded, failed, etc. Information about the dag runs
e.g. running, skipped, succeeded, failed, etc. Information about the dag runs
related to the backfill job are also being tracked in this structure,
.e.g finished runs, etc. Any other status related information related to the
execution of dag runs / tasks can be included in this structure since it makes
@@ -1896,7 +1896,7 @@ class _DagRunTaskStatus(object):
# TODO(edgarRd): AIRFLOW-1444: Add consistency check on counts
def __init__(self,
to_run=None,
started=None,
running=None,
skipped=None,
succeeded=None,
failed=None,
@@ -1910,8 +1910,8 @@ def __init__(self,
"""
:param to_run: Tasks to run in the backfill
:type to_run: dict[Tuple[String, String, DateTime], TaskInstance]
:param started: Maps started task instance key to task instance object
:type started: dict[Tuple[String, String, DateTime], TaskInstance]
:param running: Maps running task instance key to task instance object
:type running: dict[Tuple[String, String, DateTime], TaskInstance]
:param skipped: Tasks that have been skipped
:type skipped: set[Tuple[String, String, DateTime]]
:param succeeded: Tasks that have succeeded so far
@@ -1932,7 +1932,7 @@ def __init__(self,
:type total_runs: int
"""
self.to_run = to_run or dict()
self.started = started or dict()
self.running = running or dict()
self.skipped = skipped or set()
self.succeeded = succeeded or set()
self.failed = failed or set()
@@ -1976,27 +1976,27 @@ def _update_counters(self, ti_status):
:param ti_status: the internal status of the backfill job tasks
:type ti_status: BackfillJob._DagRunTaskStatus
"""
for key, ti in list(ti_status.started.items()):
for key, ti in list(ti_status.running.items()):
ti.refresh_from_db()
if ti.state == State.SUCCESS:
ti_status.succeeded.add(key)
self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
ti_status.started.pop(key)
ti_status.running.pop(key)
continue
elif ti.state == State.SKIPPED:
ti_status.skipped.add(key)
self.log.debug("Task instance %s skipped. Don't rerun.", ti)
ti_status.started.pop(key)
ti_status.running.pop(key)
continue
elif ti.state == State.FAILED:
self.log.error("Task instance %s failed", ti)
ti_status.failed.add(key)
ti_status.started.pop(key)
ti_status.running.pop(key)
continue
# special case: if the task needs to run again put it back
elif ti.state == State.UP_FOR_RETRY:
self.log.warning("Task instance %s is up for retry", ti)
ti_status.started.pop(key)
ti_status.running.pop(key)
ti_status.to_run[key] = ti
# special case: The state of the task can be set to NONE by the task itself
# when it reaches concurrency limits. It could also happen when the state
@@ -2010,26 +2010,26 @@ def _update_counters(self, ti_status):
ti
)
ti.set_state(State.SCHEDULED)
ti_status.started.pop(key)
ti_status.running.pop(key)
ti_status.to_run[key] = ti

def _manage_executor_state(self, started):
def _manage_executor_state(self, running):
"""
Checks if the executor agrees with the state of task instances
that are running
:param started: dict of key, task to verify
:param running: dict of key, task to verify
"""
executor = self.executor

for key, state in list(executor.get_event_buffer().items()):
if key not in started:
if key not in running:
self.log.warning(
"%s state %s not in started=%s",
key, state, started.values()
"%s state %s not in running=%s",
key, state, running.values()
)
continue

ti = started[key]
ti = running[key]
ti.refresh_from_db()

self.log.debug("Executor state: %s task %s", state, ti)
@@ -2139,7 +2139,7 @@ def _log_progress(self, ti_status):
"finished run {0} of {1}",
"tasks waiting: {2}",
"succeeded: {3}",
"kicked_off: {4}",
"running: {4}",
"failed: {5}",
"skipped: {6}",
"deadlocked: {7}",
@@ -2149,7 +2149,7 @@ def _log_progress(self, ti_status):
ti_status.total_runs,
len(ti_status.to_run),
len(ti_status.succeeded),
len(ti_status.started),
len(ti_status.running),
len(ti_status.failed),
len(ti_status.skipped),
len(ti_status.deadlocked),
@@ -2187,7 +2187,7 @@ def _process_backfill_task_instances(self,

executed_run_dates = []

while ((len(ti_status.to_run) > 0 or len(ti_status.started) > 0) and
while ((len(ti_status.to_run) > 0 or len(ti_status.running) > 0) and
len(ti_status.deadlocked) == 0):
self.log.debug("*** Clearing out not_ready list ***")
ti_status.not_ready.clear()
@@ -2209,13 +2209,15 @@ def _process_backfill_task_instances(self,
ignore_depends_on_past = (
self.ignore_first_depends_on_past and
ti.execution_date == (start_date or ti.start_date))
self.log.debug("Task instance to run %s state %s", ti, ti.state)
self.log.debug(
"Task instance to run %s state %s", ti, ti.state)

# guard against externally modified tasks instances or
# in case max concurrency has been reached at task runtime
if ti.state == State.NONE:
self.log.warning(
"FIXME: task instance {} state was set to None externally. This should not happen"
"FIXME: task instance {} state was set to None "
"externally. This should not happen"
)
ti.set_state(State.SCHEDULED, session=session)

@@ -2225,29 +2227,29 @@ def _process_backfill_task_instances(self,
ti_status.succeeded.add(key)
self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
ti_status.to_run.pop(key)
if key in ti_status.started:
ti_status.started.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
continue
elif ti.state == State.SKIPPED:
ti_status.skipped.add(key)
self.log.debug("Task instance %s skipped. Don't rerun.", ti)
ti_status.to_run.pop(key)
if key in ti_status.started:
ti_status.started.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
continue
elif ti.state == State.FAILED:
self.log.error("Task instance %s failed", ti)
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.started:
ti_status.started.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
continue
elif ti.state == State.UPSTREAM_FAILED:
self.log.error("Task instance %s upstream failed", ti)
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.started:
ti_status.started.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
continue

backfill_context = DepContext(
@@ -2266,7 +2268,8 @@ def _process_backfill_task_instances(self,
if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
if executor.has_task(ti):
self.log.debug(
"Task Instance %s already in executor waiting for queue to clear",
"Task Instance %s already in executor "
"waiting for queue to clear",
ti
)
else:
@@ -2288,7 +2291,7 @@ def _process_backfill_task_instances(self,
ignore_depends_on_past=ignore_depends_on_past,
pool=self.pool,
cfg_path=cfg_path)
ti_status.started[key] = ti
ti_status.running[key] = ti
ti_status.to_run.pop(key)
session.commit()
continue
@@ -2297,15 +2300,17 @@ def _process_backfill_task_instances(self,
self.log.error("Task instance %s upstream failed", ti)
ti_status.failed.add(key)
ti_status.to_run.pop(key)
if key in ti_status.started:
ti_status.started.pop(key)
if key in ti_status.running:
ti_status.running.pop(key)
continue

# special case
if ti.state == State.UP_FOR_RETRY:
self.log.debug("Task instance %s retry period not expired yet", ti)
if key in ti_status.started:
ti_status.started.pop(key)
self.log.debug(
"Task instance %s retry period not "
"expired yet", ti)
if key in ti_status.running:
ti_status.running.pop(key)
ti_status.to_run[key] = ti
continue

@@ -2322,7 +2327,7 @@ def _process_backfill_task_instances(self,
# is deadlocked
if (ti_status.not_ready and
ti_status.not_ready == set(ti_status.to_run) and
len(ti_status.started) == 0):
len(ti_status.running) == 0):
self.log.warning(
"Deadlock discovered for ti_status.to_run=%s",
ti_status.to_run.values()
@@ -2331,7 +2336,7 @@ def _process_backfill_task_instances(self,
ti_status.to_run.clear()

# check executor state
self._manage_executor_state(ti_status.started)
self._manage_executor_state(ti_status.running)

# update the task counters
self._update_counters(ti_status=ti_status)
@@ -2382,7 +2387,7 @@ def _collect_errors(self, ti_status, session=None):
'"ignore_first_depends_on_past=True" or passing "-I" at '
'the command line.')
err += ' These tasks have succeeded:\n{}\n'.format(ti_status.succeeded)
err += ' These tasks have started:\n{}\n'.format(ti_status.started)
err += ' These tasks are running:\n{}\n'.format(ti_status.running)
err += ' These tasks have failed:\n{}\n'.format(ti_status.failed)
err += ' These tasks are skipped:\n{}\n'.format(ti_status.skipped)
err += ' These tasks are deadlocked:\n{}\n'.format(ti_status.deadlocked)
@@ -2608,18 +2613,22 @@ def heartbeat_callback(self, session=None):
if ti.state == State.RUNNING:
if not same_hostname:
self.log.warning("The recorded hostname {ti.hostname} "
"does not match this instance's hostname "
"{fqdn}".format(**locals()))
"does not match this instance's hostname "
"{fqdn}".format(**locals()))
raise AirflowException("Hostname of job runner does not match")
elif not same_process:
current_pid = os.getpid()
self.log.warning("Recorded pid {ti.pid} does not match the current pid "
"{current_pid}".format(**locals()))
self.log.warning("Recorded pid {ti.pid} does not match "
"the current pid "
"{current_pid}".format(**locals()))
raise AirflowException("PID of job runner does not match")
elif (self.task_runner.return_code() is None
and hasattr(self.task_runner, 'process')):
elif (
self.task_runner.return_code() is None and
hasattr(self.task_runner, 'process')
):
self.log.warning(
"State of this instance has been externally set to %s. Taking the poison pill.",
"State of this instance has been externally set to %s. "
"Taking the poison pill.",
ti.state
)
self.task_runner.terminate()
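To see the effect of the rename in one place, here is a condensed sketch of the counter update using the new names, derived from the hunks above; it omits the logging and the State.NONE concurrency special case that the real _update_counters handles, and the helper name below is made up for illustration:

from airflow.utils.state import State  # State constants, as used in jobs.py

def update_counters_sketch(ti_status):
    # Move task instances out of ti_status.running once the metadata DB
    # reports a terminal state; hand retries back to ti_status.to_run.
    for key, ti in list(ti_status.running.items()):
        ti.refresh_from_db()
        if ti.state == State.SUCCESS:
            ti_status.succeeded.add(key)
            ti_status.running.pop(key)
        elif ti.state == State.SKIPPED:
            ti_status.skipped.add(key)
            ti_status.running.pop(key)
        elif ti.state == State.FAILED:
            ti_status.failed.add(key)
            ti_status.running.pop(key)
        elif ti.state == State.UP_FOR_RETRY:
            ti_status.running.pop(key)
            ti_status.to_run[key] = ti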
30 changes: 15 additions & 15 deletions tests/jobs.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -33,6 +33,8 @@
import unittest
from tempfile import mkdtemp

import sqlalchemy

from airflow import AirflowException, settings, models
from airflow.bin import cli
from airflow.executors import BaseExecutor, SequentialExecutor
@@ -51,15 +53,13 @@
from airflow.utils.net import get_hostname

from mock import Mock, patch, MagicMock, PropertyMock
from sqlalchemy.orm.session import make_transient
from tests.executors.test_executor import TestExecutor

from tests.core import TEST_DAG_FOLDER

from airflow import configuration
configuration.load_test_config()

import sqlalchemy

try:
from unittest import mock
@@ -72,7 +72,8 @@
DEV_NULL = '/dev/null'
DEFAULT_DATE = timezone.datetime(2016, 1, 1)

# Include the words "airflow" and "dag" in the file contents, tricking airflow into thinking these
# Include the words "airflow" and "dag" in the file contents,
# tricking airflow into thinking these
# files contain a DAG (otherwise Airflow will skip them)
PARSEABLE_DAG_FILE_CONTENTS = '"airflow DAG"'
UNPARSEABLE_DAG_FILE_CONTENTS = 'airflow DAG'
@@ -135,7 +136,7 @@ def test_backfill_multi_dates(self):

session = settings.Session()
drs = session.query(DagRun).filter(
DagRun.dag_id=='example_bash_operator'
DagRun.dag_id == 'example_bash_operator'
).order_by(DagRun.execution_date).all()

self.assertTrue(drs[0].execution_date == DEFAULT_DATE)
@@ -714,7 +715,6 @@ def test_backfill_execute_subdag(self):
subdag.clear()
dag.clear()


def test_backfill_execute_subdag_with_removed_task(self):
"""
Ensure that subdag operators execute properly in the case where
@@ -783,9 +783,9 @@ def test_update_counters(self):

# test for success
ti.set_state(State.SUCCESS, session)
ti_status.started[ti.key] = ti
ti_status.running[ti.key] = ti
job._update_counters(ti_status=ti_status)
self.assertTrue(len(ti_status.started) == 0)
self.assertTrue(len(ti_status.running) == 0)
self.assertTrue(len(ti_status.succeeded) == 1)
self.assertTrue(len(ti_status.skipped) == 0)
self.assertTrue(len(ti_status.failed) == 0)
@@ -795,9 +795,9 @@ def test_update_counters(self):

# test for skipped
ti.set_state(State.SKIPPED, session)
ti_status.started[ti.key] = ti
ti_status.running[ti.key] = ti
job._update_counters(ti_status=ti_status)
self.assertTrue(len(ti_status.started) == 0)
self.assertTrue(len(ti_status.running) == 0)
self.assertTrue(len(ti_status.succeeded) == 0)
self.assertTrue(len(ti_status.skipped) == 1)
self.assertTrue(len(ti_status.failed) == 0)
@@ -807,9 +807,9 @@ def test_update_counters(self):

# test for failed
ti.set_state(State.FAILED, session)
ti_status.started[ti.key] = ti
ti_status.running[ti.key] = ti
job._update_counters(ti_status=ti_status)
self.assertTrue(len(ti_status.started) == 0)
self.assertTrue(len(ti_status.running) == 0)
self.assertTrue(len(ti_status.succeeded) == 0)
self.assertTrue(len(ti_status.skipped) == 0)
self.assertTrue(len(ti_status.failed) == 1)
@@ -820,9 +820,9 @@ def test_update_counters(self):
# test for reschedule
# test for failed
ti.set_state(State.NONE, session)
ti_status.started[ti.key] = ti
ti_status.running[ti.key] = ti
job._update_counters(ti_status=ti_status)
self.assertTrue(len(ti_status.started) == 0)
self.assertTrue(len(ti_status.running) == 0)
self.assertTrue(len(ti_status.succeeded) == 0)
self.assertTrue(len(ti_status.skipped) == 0)
self.assertTrue(len(ti_status.failed) == 0)