
Commit

Revert "[AIRFLOW-4797] Improve performance and behaviour of zombie de…
Browse files Browse the repository at this point in the history
…tection (apache#5511)"

This reverts commit 2bdb053.
KevinYang21 committed Oct 14, 2019
1 parent bb93a75 commit 8979607
Showing 5 changed files with 156 additions and 122 deletions.
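
For orientation: this revert restores the pre-#5511 flow, in which the DagFileProcessorManager finds zombie task instances once per heartbeat and hands the list to every DagFileProcessor it spawns, instead of each DagBag querying the database itself. Below is a minimal, self-contained sketch of that flow under stated assumptions — SimpleTaskInstanceStub and manager_heartbeat are illustrative stand-ins, not the real Airflow classes; only the processor_factory(file_path, zombies) shape comes from the diff.

from dataclasses import dataclass
from typing import List


@dataclass
class SimpleTaskInstanceStub:
    # stand-in for airflow.utils.dag_processing.SimpleTaskInstance
    dag_id: str
    task_id: str
    execution_date: str


def processor_factory(file_path: str, zombies: List[SimpleTaskInstanceStub]):
    # mirrors the restored processor_factory(file_path, zombies) signature
    return {"file_path": file_path, "zombies": zombies}


def manager_heartbeat(file_paths: List[str], zombies: List[SimpleTaskInstanceStub]):
    # the manager passes the same zombie list to every processor it starts
    return [processor_factory(path, zombies) for path in file_paths]


if __name__ == "__main__":
    found = [SimpleTaskInstanceStub("example_dag", "run_this_first", "2019-10-14T00:00:00")]
    print(manager_heartbeat(["/dags/example.py"], found))
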
26 changes: 19 additions & 7 deletions airflow/jobs/scheduler_job.py
@@ -62,18 +62,21 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
:type pickle_dags: bool
:param dag_id_white_list: If specified, only look at these DAG ID's
:type dag_id_white_list: list[unicode]
:param zombies: zombie task instances to kill
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
"""

# Counter that increments every time an instance of this class is created
class_creation_counter = 0

def __init__(self, file_path, pickle_dags, dag_id_white_list):
def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
self._file_path = file_path

# The process that was launched to process the given file.
self._process = None
self._dag_id_white_list = dag_id_white_list
self._pickle_dags = pickle_dags
self._zombies = zombies
# The result of Scheduler.process_file(file_path).
self._result = None
# Whether the process is done running.
@@ -94,7 +97,8 @@ def _run_file_processor(result_channel,
file_path,
pickle_dags,
dag_id_white_list,
thread_name):
thread_name,
zombies):
"""
Process the given file.
@@ -112,6 +116,8 @@ def _run_file_processor(result_channel,
:type thread_name: unicode
:return: the process that was launched
:rtype: multiprocessing.Process
:param zombies: zombie task instances to kill
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
"""
# This helper runs in the newly created process
log = logging.getLogger("airflow.processor")
@@ -139,7 +145,9 @@ def _run_file_processor(result_channel,
log.info("Started process (PID=%s) to work on %s",
os.getpid(), file_path)
scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
result = scheduler_job.process_file(file_path, pickle_dags)
result = scheduler_job.process_file(file_path,
zombies,
pickle_dags)
result_channel.send(result)
end_time = time.time()
log.info(
@@ -170,6 +178,7 @@ def start(self):
self._pickle_dags,
self._dag_id_white_list,
"DagFileProcessor{}".format(self._instance_id),
self._zombies
),
name="DagFileProcessor{}-Process".format(self._instance_id)
)
@@ -1285,10 +1294,11 @@ def _execute(self):
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

def processor_factory(file_path):
def processor_factory(file_path, zombies):
return DagFileProcessor(file_path,
pickle_dags,
self.dag_ids)
self.dag_ids,
zombies)

# When using sqlite, we do not use async_mode
# so the scheduler job and DAG parser don't access the DB at the same time.
@@ -1465,7 +1475,7 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]):
return dags

@provide_session
def process_file(self, file_path, pickle_dags=False, session=None):
def process_file(self, file_path, zombies, pickle_dags=False, session=None):
"""
Process a Python file containing Airflow DAGs.
@@ -1484,6 +1494,8 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
:param file_path: the path to the Python file that should be executed
:type file_path: unicode
:param zombies: zombie task instances to kill.
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
:param pickle_dags: whether to serialize the DAGs found in the file and
save them to the db
:type pickle_dags: bool
@@ -1567,7 +1579,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
except Exception:
self.log.exception("Error logging import errors!")
try:
dagbag.kill_zombies()
dagbag.kill_zombies(zombies)
except Exception:
self.log.exception("Error killing zombies!")

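
A rough sketch of the call order restored in the file processor above, under stated assumptions — FakeSchedulerJob and run_file_processor are stand-ins, not Airflow code; only the process_file(file_path, zombies, pickle_dags) argument order comes from the diff.

class FakeSchedulerJob:
    # stand-in for SchedulerJob; only the restored signature matters here
    def process_file(self, file_path, zombies, pickle_dags=False):
        return {"file": file_path, "zombie_count": len(zombies), "pickled": pickle_dags}


def run_file_processor(result_channel, file_path, pickle_dags, zombies):
    # the real helper sends its result over a multiprocessing pipe;
    # a plain list keeps this sketch self-contained
    job = FakeSchedulerJob()
    result_channel.append(job.process_file(file_path, zombies, pickle_dags))


results = []
run_file_processor(results, "/dags/example.py", False, [])
print(results)
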
56 changes: 23 additions & 33 deletions airflow/models/dagbag.py
@@ -25,7 +25,7 @@
import textwrap
import zipfile
from collections import namedtuple
from datetime import datetime, timedelta
from datetime import datetime

from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter
from sqlalchemy import or_
@@ -41,7 +41,6 @@
from airflow.utils.db import provide_session
from airflow.utils.helpers import pprinttable
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from airflow.utils.timeout import timeout


@@ -273,43 +272,34 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
return found_dags

@provide_session
def kill_zombies(self, session=None):
def kill_zombies(self, zombies, session=None):
"""
Fail zombie tasks, which are tasks that haven't
Fail given zombie tasks, which are tasks that haven't
had a heartbeat for too long, in the current DagBag.
:param zombies: zombie task instances to kill.
:type zombies: airflow.utils.dag_processing.SimpleTaskInstance
:param session: DB session.
:type session: sqlalchemy.orm.session.Session
"""
# Avoid circular import
from airflow.models.taskinstance import TaskInstance as TI
from airflow.jobs import LocalTaskJob as LJ

# How many seconds we wait for tasks to heartbeat before marking them as zombies.
limit_dttm = timezone.utcnow() - timedelta(seconds=self.SCHEDULER_ZOMBIE_TASK_THRESHOLD)
self.log.debug("Failing jobs without heartbeat after %s", limit_dttm)

tis = (
session.query(TI)
.join(LJ, TI.job_id == LJ.id)
.filter(TI.state == State.RUNNING)
.filter(TI.dag_id.in_(self.dags))
.filter(
or_(
LJ.state != State.RUNNING,
LJ.latest_heartbeat < limit_dttm,
)
).all()
)
for ti in tis:
self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s",
ti.dag_id, ti.task_id, ti.execution_date.isoformat())
ti.test_mode = self.UNIT_TEST_MODE
ti.task = self.dags[ti.dag_id].get_task(ti.task_id)
ti.handle_failure("{} detected as zombie".format(ti),
ti.test_mode, ti.get_template_context())
self.log.info('Marked zombie job %s as %s', ti, ti.state)
Stats.incr('zombies_killed')
from airflow.models.taskinstance import TaskInstance # Avoid circular import

for zombie in zombies:
if zombie.dag_id in self.dags:
dag = self.dags[zombie.dag_id]
if zombie.task_id in dag.task_ids:
task = dag.get_task(zombie.task_id)
ti = TaskInstance(task, zombie.execution_date)
# Get properties needed for failure handling from SimpleTaskInstance.
ti.start_date = zombie.start_date
ti.end_date = zombie.end_date
ti.try_number = zombie.try_number
ti.state = zombie.state
ti.test_mode = self.UNIT_TEST_MODE
ti.handle_failure("{} detected as zombie".format(ti),
ti.test_mode, ti.get_template_context())
self.log.info('Marked zombie job %s as %s', ti, ti.state)
Stats.incr('zombies_killed')
session.commit()

def bag_dag(self, dag, parent_dag, root_dag):
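
The restored DagBag.kill_zombies no longer queries the database; it only fails the SimpleTaskInstances it is given, skipping any whose DAG or task is not in the bag. The following is a self-contained sketch of that loop using plain dicts in place of DAG and TaskInstance objects (illustrative only).

def kill_zombies(dags, zombies):
    # dags: mapping of dag_id -> {"tasks": {task_id: task}}; zombies: list of dicts
    for zombie in zombies:
        dag = dags.get(zombie["dag_id"])
        if dag is None or zombie["task_id"] not in dag["tasks"]:
            continue
        # the real code builds a TaskInstance, copies start_date/end_date/
        # try_number/state from the SimpleTaskInstance, then calls handle_failure()
        print("Marked zombie %s.%s as failed" % (zombie["dag_id"], zombie["task_id"]))


dags = {"example_branch_operator": {"tasks": {"run_this_first": object()}}}
kill_zombies(dags, [{"dag_id": "example_branch_operator", "task_id": "run_this_first"}])
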
52 changes: 50 additions & 2 deletions airflow/utils/dag_processing.py
@@ -28,7 +28,9 @@
import time
import zipfile
from abc import ABCMeta, abstractmethod
from datetime import datetime
from collections import defaultdict
from collections import namedtuple
from datetime import datetime, timedelta
from importlib import import_module
from typing import Iterable, NamedTuple, Optional

@@ -47,6 +49,8 @@
from airflow.utils.db import provide_session
from airflow.utils.helpers import reap_process_group
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from sqlalchemy import or_


class SimpleDag(BaseDag):
@@ -754,6 +758,9 @@ def __init__(self,
# 30 seconds.
self.print_stats_interval = conf.getint('scheduler',
'print_stats_interval')
# How many seconds we wait for tasks to heartbeat before marking them as zombies.
self._zombie_threshold_secs = (
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
# Map from file path to the processor
self._processors = {}

@@ -1233,11 +1240,13 @@ def heartbeat(self):

self._file_path_queue.extend(files_paths_to_queue)

zombies = self._find_zombies()

# Start more processors if we have enough slots and files to process
while (self._parallelism - len(self._processors) > 0 and
len(self._file_path_queue) > 0):
file_path = self._file_path_queue.pop(0)
processor = self._processor_factory(file_path)
processor = self._processor_factory(file_path, zombies)
Stats.incr('dag_processing.processes')

processor.start()
@@ -1252,6 +1261,45 @@

return simple_dags

@provide_session
def _find_zombies(self, session):
"""
Find zombie task instances, which are tasks that haven't heartbeated for too long.
:return: Zombie task instances in SimpleTaskInstance format.
"""
now = timezone.utcnow()
zombies = []
if (now - self._last_zombie_query_time).total_seconds() \
> self._zombie_query_interval:
# to avoid circular imports
from airflow.jobs import LocalTaskJob as LJ
self.log.info("Finding 'running' jobs without a recent heartbeat")
TI = airflow.models.TaskInstance
limit_dttm = timezone.utcnow() - timedelta(
seconds=self._zombie_threshold_secs)
self.log.info("Failing jobs without heartbeat after %s", limit_dttm)

tis = (
session.query(TI)
.join(LJ, TI.job_id == LJ.id)
.filter(TI.state == State.RUNNING)
.filter(
or_(
LJ.state != State.RUNNING,
LJ.latest_heartbeat < limit_dttm,
)
).all()
)
self._last_zombie_query_time = timezone.utcnow()
for ti in tis:
sti = SimpleTaskInstance(ti)
self.log.info(
"Detected zombie job with dag_id %s, task_id %s, and execution date %s",
sti.dag_id, sti.task_id, sti.execution_date.isoformat())
zombies.append(sti)

return zombies

def _kill_timed_out_processors(self):
"""
Kill any file processors that timeout to defend against process hangs.
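
The zombie criterion used by the restored _find_zombies can be stated compactly: a RUNNING task instance is a zombie when its LocalTaskJob is no longer running, or its latest heartbeat is older than scheduler_zombie_task_threshold seconds. A self-contained sketch of that predicate follows; the 300-second threshold is an assumed value for illustration, not Airflow's configured default.

from datetime import datetime, timedelta

ZOMBIE_THRESHOLD_SECS = 300  # assumed; Airflow reads scheduler_zombie_task_threshold from config


def is_zombie(ti_state, job_state, latest_heartbeat, now=None):
    now = now or datetime.utcnow()
    limit_dttm = now - timedelta(seconds=ZOMBIE_THRESHOLD_SECS)
    return ti_state == "running" and (job_state != "running" or latest_heartbeat < limit_dttm)


now = datetime.utcnow()
print(is_zombie("running", "shutdown", now))                      # True: job no longer running
print(is_zombie("running", "running", now - timedelta(hours=1)))  # True: heartbeat too old
print(is_zombie("running", "running", now))                       # False: healthy
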
73 changes: 6 additions & 67 deletions tests/models/test_dagbag.py
@@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime, timezone
import inspect
import os
import shutil
@@ -29,6 +30,7 @@
import airflow.example_dags
from airflow import models
from airflow.configuration import conf
from airflow.utils.dag_processing import SimpleTaskInstance
from airflow.jobs import LocalTaskJob as LJ
from airflow.models import DagBag, DagModel, TaskInstance as TI
from airflow.utils.db import create_session
@@ -605,92 +607,29 @@ def test_process_file_with_none(self):
self.assertEqual([], dagbag.process_file(None))

@patch.object(TI, 'handle_failure')
def test_kill_zombies_when_job_state_is_not_running(self, mock_ti_handle_failure):
def test_kill_zombies(self, mock_ti_handle_failure):
"""
Test that kill zombies calls TI's failure handler with proper context
Test that kill zombies call TIs failure handler with proper context
"""
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)
with create_session() as session:
session.query(TI).delete()
session.query(LJ).delete()
dag = dagbag.get_dag('example_branch_operator')
task = dag.get_task(task_id='run_this_first')

ti = TI(task, DEFAULT_DATE, State.RUNNING)
lj = LJ(ti)
lj.state = State.SHUTDOWN
lj.id = 1
ti.job_id = lj.id

session.add(lj)
session.add(ti)
session.commit()

dagbag.kill_zombies()
zombies = [SimpleTaskInstance(ti)]
dagbag.kill_zombies(zombies)
mock_ti_handle_failure.assert_called_once_with(
ANY,
conf.getboolean('core', 'unit_test_mode'),
ANY
)

@patch.object(TI, 'handle_failure')
def test_kill_zombie_when_job_received_no_heartbeat(self, mock_ti_handle_failure):
"""
Test that kill zombies calls TI's failure handler with proper context
"""
zombie_threshold_secs = (
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)
with create_session() as session:
session.query(TI).delete()
session.query(LJ).delete()
dag = dagbag.get_dag('example_branch_operator')
task = dag.get_task(task_id='run_this_first')

ti = TI(task, DEFAULT_DATE, State.RUNNING)
lj = LJ(ti)
lj.latest_heartbeat = utcnow() - timedelta(seconds=zombie_threshold_secs)
lj.state = State.RUNNING
lj.id = 1
ti.job_id = lj.id

session.add(lj)
session.add(ti)
session.commit()

dagbag.kill_zombies()
mock_ti_handle_failure.assert_called_once_with(
ANY,
conf.getboolean('core', 'unit_test_mode'),
ANY
)

@patch.object(TI, 'handle_failure')
def test_kill_zombies_doesn_nothing(self, mock_ti_handle_failure):
"""
Test that kill zombies does nothing when job is running and received heartbeat
"""
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)
with create_session() as session:
session.query(TI).delete()
session.query(LJ).delete()
dag = dagbag.get_dag('example_branch_operator')
task = dag.get_task(task_id='run_this_first')

ti = TI(task, DEFAULT_DATE, State.RUNNING)
lj = LJ(ti)
lj.latest_heartbeat = utcnow()
lj.state = State.RUNNING
lj.id = 1
ti.job_id = lj.id

session.add(lj)
session.add(ti)
session.commit()

dagbag.kill_zombies()
mock_ti_handle_failure.assert_not_called()

def test_deactivate_unknown_dags(self):
"""
Test that dag_ids not passed into deactivate_unknown_dags
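
The consolidated test above patches TI.handle_failure and asserts it fires exactly once for the injected zombie. The same assertion pattern, reduced to a self-contained example — FakeTaskInstance and the local kill_zombies below are illustrative stand-ins, not the Airflow test suite.

from unittest.mock import ANY, MagicMock


class FakeTaskInstance:
    def __init__(self, dag_id, task_id):
        self.dag_id = dag_id
        self.task_id = task_id

    def handle_failure(self, error, test_mode, context):
        pass  # patched out below


def kill_zombies(known_dag_ids, zombies, test_mode=True):
    # fail only zombies whose DAG is known, mirroring the loop shown earlier
    for zombie in zombies:
        if zombie.dag_id in known_dag_ids:
            zombie.handle_failure("{} detected as zombie".format(zombie), test_mode, {})


zombie = FakeTaskInstance("example_branch_operator", "run_this_first")
zombie.handle_failure = MagicMock()
kill_zombies({"example_branch_operator"}, [zombie])
zombie.handle_failure.assert_called_once_with(ANY, True, ANY)
print("handler called exactly once")
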
