[AIRFLOW-4797] Improve performance and behaviour of zombie detection (apache#5511)

Moved the query that fetches zombies from DagFileProcessorManager to the DagBag class, and changed it to only look at DAGs of the current DagBag. The query now uses the ti_dag_state index instead of ti_state. Removed the no longer required zombies parameters from many function signatures.

The query is now executed on every call to DagBag.kill_zombies, which is called each time a DAG file is processed; how often that happens depends on scheduler_heartbeat_sec and processor_poll_interval (as far as I understand). The query is faster than the previous one (see also the stats below). Its cost is also negligible in my view, because many other queries are executed during DAG file processing anyway (DAG runs and task instances are created, task instance dependencies are checked).
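
For reference, a minimal sketch of the new query shape. The standalone helper and its name are illustrative only, not Airflow API; the real code operates on self.dags inside DagBag.kill_zombies, shown in the dagbag.py diff below.

```python
# Sketch of the zombie query now run inside DagBag.kill_zombies().
# The helper name and signature are hypothetical; imports match the diff below.
from datetime import timedelta

from sqlalchemy import or_

from airflow import configuration
from airflow.jobs import LocalTaskJob as LJ
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils import timezone
from airflow.utils.state import State


def find_zombies(session, dag_ids):
    """Return RUNNING task instances of the given DAGs whose job is gone or stale."""
    threshold = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
    limit_dttm = timezone.utcnow() - timedelta(seconds=threshold)
    return (
        session.query(TI)
        .join(LJ, TI.job_id == LJ.id)
        # Filtering on state together with dag_id allows the ti_dag_state index to be used.
        .filter(TI.state == State.RUNNING)
        .filter(TI.dag_id.in_(dag_ids))
        # Zombie: the owning LocalTaskJob is no longer running or has stopped heartbeating.
        .filter(or_(LJ.state != State.RUNNING,
                    LJ.latest_heartbeat < limit_dttm))
        .all()
    )
```

Compared with the old _find_zombies in DagFileProcessorManager, the extra TI.dag_id.in_(...) filter is what restricts the scan to the DAGs of the current DagBag.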
seelmann authored and ashb committed Jul 4, 2019
1 parent 977af46 commit 2bdb053
Showing 5 changed files with 123 additions and 151 deletions.
28 changes: 8 additions & 20 deletions airflow/jobs/scheduler_job.py
@@ -59,16 +59,14 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
# Counter that increments everytime an instance of this class is created
class_creation_counter = 0

def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
def __init__(self, file_path, pickle_dags, dag_id_white_list):
"""
:param file_path: a Python file containing Airflow DAG definitions
:type file_path: unicode
:param pickle_dags: whether to serialize the DAG objects to the DB
:type pickle_dags: bool
:param dag_id_whitelist: If specified, only look at these DAG ID's
:type dag_id_whitelist: list[unicode]
:param zombies: zombie task instances to kill
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
"""
self._file_path = file_path
# Queue that's used to pass results from the child process.
@@ -78,7 +76,6 @@ def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
self._process = None
self._dag_id_white_list = dag_id_white_list
self._pickle_dags = pickle_dags
self._zombies = zombies
# The result of Scheduler.process_file(file_path).
self._result = None
# Whether the process is done running.
@@ -99,8 +96,7 @@ def _launch_process(result_queue,
file_path,
pickle_dags,
dag_id_white_list,
thread_name,
zombies):
thread_name):
"""
Launch a process to process the given file.
@@ -118,8 +114,6 @@ def _launch_process(result_queue,
:type thread_name: unicode
:return: the process that was launched
:rtype: multiprocessing.Process
:param zombies: zombie task instances to kill
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
"""
def helper():
# This helper runs in the newly created process
@@ -147,9 +141,7 @@ def helper():
log.info("Started process (PID=%s) to work on %s",
os.getpid(), file_path)
scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
result = scheduler_job.process_file(file_path,
zombies,
pickle_dags)
result = scheduler_job.process_file(file_path, pickle_dags)
result_queue.put(result)
end_time = time.time()
log.info(
@@ -181,8 +173,7 @@ def start(self):
self.file_path,
self._pickle_dags,
self._dag_id_white_list,
"DagFileProcessor{}".format(self._instance_id),
self._zombies)
"DagFileProcessor{}".format(self._instance_id))
self._start_time = timezone.utcnow()

def terminate(self, sigkill=False):
@@ -1275,11 +1266,10 @@ def _execute(self):
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

def processor_factory(file_path, zombies):
def processor_factory(file_path):
return DagFileProcessor(file_path,
pickle_dags,
self.dag_ids,
zombies)
self.dag_ids)

# When using sqlite, we do not use async_mode
# so the scheduler job and DAG parser don't access the DB at the same time.
@@ -1442,7 +1432,7 @@ def _execute_helper(self):
settings.Session.remove()

@provide_session
def process_file(self, file_path, zombies, pickle_dags=False, session=None):
def process_file(self, file_path, pickle_dags=False, session=None):
"""
Process a Python file containing Airflow DAGs.
@@ -1461,8 +1451,6 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
:param file_path: the path to the Python file that should be executed
:type file_path: unicode
:param zombies: zombie task instances to kill.
:type zombies: list[airflow.utils.dag_processing.SimpleTaskInstance]
:param pickle_dags: whether serialize the DAGs found in the file and
save them to the db
:type pickle_dags: bool
@@ -1556,7 +1544,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
except Exception:
self.log.exception("Error logging import errors!")
try:
dagbag.kill_zombies(zombies)
dagbag.kill_zombies()
except Exception:
self.log.exception("Error killing zombies!")

61 changes: 37 additions & 24 deletions airflow/models/dagbag.py
@@ -25,10 +25,11 @@
import textwrap
import zipfile
from collections import namedtuple
from datetime import datetime
from datetime import datetime, timedelta

import six
from croniter import croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError
from sqlalchemy import or_

from airflow import configuration, settings
from airflow.dag.base_dag import BaseDagBag
@@ -40,6 +41,7 @@
from airflow.utils.db import provide_session
from airflow.utils.helpers import pprinttable
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from airflow.utils.timeout import timeout


@@ -268,35 +270,46 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
return found_dags

@provide_session
def kill_zombies(self, zombies, session=None):
def kill_zombies(self, session=None):
"""
Fail given zombie tasks, which are tasks that haven't
Fail zombie tasks, which are tasks that haven't
had a heartbeat for too long, in the current DagBag.
:param zombies: zombie task instances to kill.
:type zombies: airflow.utils.dag_processing.SimpleTaskInstance
:param session: DB session.
:type session: sqlalchemy.orm.session.Session
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular import

for zombie in zombies:
if zombie.dag_id in self.dags:
dag = self.dags[zombie.dag_id]
if zombie.task_id in dag.task_ids:
task = dag.get_task(zombie.task_id)
ti = TaskInstance(task, zombie.execution_date)
# Get properties needed for failure handling from SimpleTaskInstance.
ti.start_date = zombie.start_date
ti.end_date = zombie.end_date
ti.try_number = zombie.try_number
ti.state = zombie.state
ti.test_mode = configuration.getboolean('core', 'unit_test_mode')
ti.handle_failure("{} detected as zombie".format(ti),
ti.test_mode, ti.get_template_context())
self.log.info(
'Marked zombie job %s as %s', ti, ti.state)
Stats.incr('zombies_killed')
# Avoid circular import
from airflow.models.taskinstance import TaskInstance as TI
from airflow.jobs import LocalTaskJob as LJ

# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
zombie_threshold_secs = (
configuration.getint('scheduler', 'scheduler_zombie_task_threshold'))
limit_dttm = timezone.utcnow() - timedelta(
seconds=zombie_threshold_secs)
self.log.debug("Failing jobs without heartbeat after %s", limit_dttm)

tis = (
session.query(TI)
.join(LJ, TI.job_id == LJ.id)
.filter(TI.state == State.RUNNING)
.filter(TI.dag_id.in_(self.dags))
.filter(
or_(
LJ.state != State.RUNNING,
LJ.latest_heartbeat < limit_dttm,
)
).all()
)
for ti in tis:
self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s",
ti.dag_id, ti.task_id, ti.execution_date.isoformat())
ti.test_mode = configuration.getboolean('core', 'unit_test_mode')
ti.task = self.dags[ti.dag_id].get_task(ti.task_id)
ti.handle_failure("{} detected as zombie".format(ti),
ti.test_mode, ti.get_template_context())
self.log.info('Marked zombie job %s as %s', ti, ti.state)
Stats.incr('zombies_killed')
session.commit()

def bag_dag(self, dag, parent_dag, root_dag):
48 changes: 1 addition & 47 deletions airflow/utils/dag_processing.py
@@ -28,14 +28,12 @@
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from collections import namedtuple
from datetime import timedelta
from importlib import import_module
import enum
from queue import Empty

import psutil
from six.moves import reload_module
from sqlalchemy import or_
from tabulate import tabulate

# To avoid circular imports
@@ -48,7 +46,6 @@
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State


class SimpleDag(BaseDag):
@@ -757,9 +754,6 @@ def __init__(self,
# 30 seconds.
self.print_stats_interval = conf.getint('scheduler',
'print_stats_interval')
# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
self._zombie_threshold_secs = (
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
# Map from file path to the processor
self._processors = {}
# Map from file path to the last runtime
@@ -1217,13 +1211,11 @@ def heartbeat(self):

self._file_path_queue.extend(files_paths_to_queue)

zombies = self._find_zombies()

# Start more processors if we have enough slots and files to process
while (self._parallelism - len(self._processors) > 0 and
len(self._file_path_queue) > 0):
file_path = self._file_path_queue.pop(0)
processor = self._processor_factory(file_path, zombies)
processor = self._processor_factory(file_path)

processor.start()
self.log.debug(
@@ -1237,44 +1229,6 @@ def heartbeat(self):

return simple_dags

@provide_session
def _find_zombies(self, session):
"""
Find zombie task instances, which are tasks haven't heartbeated for too long.
:return: Zombie task instances in SimpleTaskInstance format.
"""
now = timezone.utcnow()
zombies = []
if (now - self._last_zombie_query_time).total_seconds() \
> self._zombie_query_interval:
# to avoid circular imports
from airflow.jobs import LocalTaskJob as LJ
self.log.info("Finding 'running' jobs without a recent heartbeat")
TI = airflow.models.TaskInstance
limit_dttm = timezone.utcnow() - timedelta(
seconds=self._zombie_threshold_secs)
self.log.info("Failing jobs without heartbeat after %s", limit_dttm)

tis = (
session.query(TI)
.join(LJ, TI.job_id == LJ.id)
.filter(TI.state == State.RUNNING)
.filter(
or_(
LJ.state != State.RUNNING,
LJ.latest_heartbeat < limit_dttm,
)
).all()
)
self._last_zombie_query_time = timezone.utcnow()
for ti in tis:
sti = SimpleTaskInstance(ti)
self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s",
sti.dag_id, sti.task_id, sti.execution_date.isoformat())
zombies.append(sti)

return zombies

def max_runs_reached(self):
"""
:return: whether all file paths have been processed max_runs times
