[AIRFLOW-4797] Use same zombies in all DAG file processors
KevinYang21 committed Oct 14, 2019
1 parent 8979607 · commit cb0dbe3
Showing 4 changed files with 104 additions and 26 deletions.
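This commit fixes how zombie task instances reach DAG file processors. Previously, `DagFileProcessorManager.heartbeat()` called `_find_zombies()` inline, and since `_find_zombies()` returned an empty list whenever it ran inside `_zombie_query_interval` of the previous query, only processors spawned during the one heartbeat that actually queried the database ever saw the detected zombies. The commit moves zombie detection into the manager's main loop, caches the result on `self._zombies`, and hands the same cached list to every processor until the next query refreshes it. Below is a minimal sketch of that interval-gated caching pattern (simplified, self-contained names, not Airflow's actual classes):

```python
import time


class ZombieCache:
    """Re-query at most once per interval; serve the cached list in between."""

    QUERY_INTERVAL = 10  # seconds, mirroring _zombie_query_interval

    def __init__(self):
        self._last_query_time = None  # None forces a query on the first call
        self._zombies = []

    def refresh(self):
        now = time.monotonic()
        if self._last_query_time is None or \
                now - self._last_query_time > self.QUERY_INTERVAL:
            # The expensive DB query runs here at most once per interval.
            self._zombies = self._query_zombies()
            self._last_query_time = now

    def _query_zombies(self):
        return []  # stand-in for the heartbeat query against LocalTaskJob


cache = ZombieCache()
cache.refresh()                  # first call always queries
first = cache._zombies
cache.refresh()                  # within the interval: no query, same list
assert cache._zombies is first   # processors started now all get this list
```

Every processor created between refreshes receives the identical list, which is what the new test in tests/utils/test_dag_processing.py asserts.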
airflow/models/dagbag.py (1 change: 0 additions & 1 deletion)
@@ -28,7 +28,6 @@
 from datetime import datetime
 
 from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter
-from sqlalchemy import or_
 
 from airflow import settings
 from airflow.configuration import conf
airflow/utils/dag_processing.py (22 changes: 10 additions & 12 deletions)
@@ -28,14 +28,13 @@
 import time
 import zipfile
 from abc import ABCMeta, abstractmethod
-from collections import defaultdict
-from collections import namedtuple
 from datetime import datetime, timedelta
 from importlib import import_module
 from typing import Iterable, NamedTuple, Optional
 
 import psutil
 from setproctitle import setproctitle
+from sqlalchemy import or_
 from tabulate import tabulate
 
 # To avoid circular imports
@@ -50,7 +49,6 @@
 from airflow.utils.helpers import reap_process_group
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
-from sqlalchemy import or_
 
 
 class SimpleDag(BaseDag):
@@ -769,13 +767,14 @@ def __init__(self,
         # Map from file path to stats about the file
         self._file_stats = {}  # type: dict(str, DagFileStat)
 
-        self._last_zombie_query_time = timezone.utcnow()
+        self._last_zombie_query_time = None
         # Last time that the DAG dir was traversed to look for files
         self.last_dag_dir_refresh_time = timezone.utcnow()
         # Last time stats were printed
         self.last_stat_print_time = timezone.datetime(2000, 1, 1)
         # TODO: Remove magic number
         self._zombie_query_interval = 10
+        self._zombies = []
         # How long to wait before timing out a process to parse a DAG file
         self._processor_timeout = processor_timeout
@@ -845,6 +844,7 @@ def start(self):
                 continue
 
             self._refresh_dag_dir()
+            self._find_zombies()
 
             simple_dags = self.heartbeat()
             for simple_dag in simple_dags:
@@ -1240,13 +1240,11 @@ def heartbeat(self):
 
         self._file_path_queue.extend(files_paths_to_queue)
 
-        zombies = self._find_zombies()
-
         # Start more processors if we have enough slots and files to process
         while (self._parallelism - len(self._processors) > 0 and
                len(self._file_path_queue) > 0):
             file_path = self._file_path_queue.pop(0)
-            processor = self._processor_factory(file_path, zombies)
+            processor = self._processor_factory(file_path, self._zombies)
             Stats.incr('dag_processing.processes')
 
             processor.start()
@@ -1264,13 +1262,13 @@ def heartbeat(self):
     @provide_session
     def _find_zombies(self, session):
         """
-        Find zombie task instances, which are tasks haven't heartbeated for too long.
-        :return: Zombie task instances in SimpleTaskInstance format.
+        Find zombie task instances, which are tasks that haven't heartbeated
+        for too long, and update the current zombie list.
         """
         now = timezone.utcnow()
         zombies = []
-        if (now - self._last_zombie_query_time).total_seconds() \
-                > self._zombie_query_interval:
+        if not self._last_zombie_query_time or \
+                (now - self._last_zombie_query_time).total_seconds() > self._zombie_query_interval:
             # to avoid circular imports
             from airflow.jobs import LocalTaskJob as LJ
             self.log.info("Finding 'running' jobs without a recent heartbeat")
@@ -1298,7 +1296,7 @@ def _find_zombies(self, session):
                         sti.dag_id, sti.task_id, sti.execution_date.isoformat())
                     zombies.append(sti)
 
-        return zombies
+        self._zombies = zombies
 
     def _kill_timed_out_processors(self):
         """
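The body of the zombie query inside `_find_zombies()` is collapsed in this view; only the local `LocalTaskJob` import, the "Finding 'running' jobs without a recent heartbeat" log line, and the `SimpleTaskInstance` wrapping are visible. The following is a hypothetical reconstruction of the kind of heartbeat query those lines and the relocated `sqlalchemy.or_` import suggest; it is an illustration, not the commit's verbatim code:

```python
from datetime import timedelta

from sqlalchemy import or_


def find_zombie_task_instances(session, zombie_threshold_secs, now):
    # Local imports mirror the "to avoid circular imports" comment in the diff.
    from airflow.jobs import LocalTaskJob as LJ
    from airflow.models import TaskInstance as TI
    from airflow.utils.state import State

    limit_dttm = now - timedelta(seconds=zombie_threshold_secs)
    # A running task instance counts as a zombie when its supervising
    # LocalTaskJob stopped heartbeating before the threshold or is no
    # longer in the running state.
    return (
        session.query(TI)
        .join(LJ, TI.job_id == LJ.id)
        .filter(TI.state == State.RUNNING)
        .filter(or_(LJ.latest_heartbeat < limit_dttm,
                    LJ.state != State.RUNNING))
        .all()
    )
```

The real method then wraps each matching row in a `SimpleTaskInstance`, as the visible `zombies.append(sti)` line shows, and stores the list on `self._zombies` instead of returning it.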
tests/models/test_dagbag.py (7 changes: 2 additions & 5 deletions)
@@ -17,25 +17,22 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from datetime import datetime, timezone
 import inspect
 import os
 import shutil
 import textwrap
 import unittest
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timezone
 from tempfile import NamedTemporaryFile, mkdtemp
 from unittest.mock import ANY, patch
 
 import airflow.example_dags
 from airflow import models
 from airflow.configuration import conf
-from airflow.utils.dag_processing import SimpleTaskInstance
-from airflow.jobs import LocalTaskJob as LJ
 from airflow.models import DagBag, DagModel, TaskInstance as TI
+from airflow.utils.dag_processing import SimpleTaskInstance
 from airflow.utils.db import create_session
 from airflow.utils.state import State
-from airflow.utils.timezone import utcnow
 from tests.models import DEFAULT_DATE, TEST_DAGS_FOLDER
 from tests.test_utils.config import conf_vars
 
tests/utils/test_dag_processing.py (100 changes: 92 additions & 8 deletions)
@@ -18,24 +18,25 @@
 # under the License.
 
 import os
+import pathlib
 import sys
 import tempfile
 import unittest
 from datetime import datetime, timedelta
 from unittest import mock
-
-import pathlib
-from unittest.mock import (MagicMock, PropertyMock)
+from unittest.mock import MagicMock, PropertyMock
 
 from airflow.configuration import conf
-from airflow.jobs import DagFileProcessor
-from airflow.jobs import LocalTaskJob as LJ
+from airflow.jobs import DagFileProcessor, LocalTaskJob as LJ
 from airflow.models import DagBag, TaskInstance as TI
 from airflow.utils import timezone
-from airflow.utils.dag_processing import (DagFileProcessorAgent, DagFileProcessorManager, DagFileStat,
-                                          SimpleTaskInstance, correct_maybe_zipped)
+from airflow.utils.dag_processing import (
+    DagFileProcessorAgent, DagFileProcessorManager, DagFileStat, SimpleTaskInstance, correct_maybe_zipped,
+)
 from airflow.utils.db import create_session
 from airflow.utils.state import State
 from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 TEST_DAG_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), os.pardir, 'dags')
@@ -134,6 +135,9 @@ def __exit__(self, *exc_info):
 
 
 class TestDagFileProcessorManager(unittest.TestCase):
+    def setUp(self):
+        clear_db_runs()
+
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',
@@ -203,7 +207,8 @@ def test_find_zombies(self):
 
         manager._last_zombie_query_time = timezone.utcnow() - timedelta(
             seconds=manager._zombie_threshold_secs + 1)
-        zombies = manager._find_zombies()
+        manager._find_zombies()
+        zombies = manager._zombies
         self.assertEqual(1, len(zombies))
         self.assertIsInstance(zombies[0], SimpleTaskInstance)
         self.assertEqual(ti.dag_id, zombies[0].dag_id)
@@ -213,6 +218,85 @@ def test_find_zombies(self):
             session.query(TI).delete()
             session.query(LJ).delete()
 
+    def test_zombies_are_correctly_passed_to_dag_file_processor(self):
+        """
+        Check that the same set of zombies is passed to the DAG
+        file processors until the next zombie detection logic is invoked.
+        """
+        with conf_vars({('scheduler', 'max_threads'): '1',
+                        ('core', 'load_examples'): 'False'}):
+            dagbag = DagBag(os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py'))
+            with create_session() as session:
+                session.query(LJ).delete()
+                dag = dagbag.get_dag('test_example_bash_operator')
+                task = dag.get_task(task_id='run_this_last')
+
+                ti = TI(task, DEFAULT_DATE, State.RUNNING)
+                lj = LJ(ti)
+                lj.state = State.SHUTDOWN
+                lj.id = 1
+                ti.job_id = lj.id
+
+                session.add(lj)
+                session.add(ti)
+                session.commit()
+                fake_zombies = [SimpleTaskInstance(ti)]
+
+            class FakeDagFileProcessor(DagFileProcessor):
+                # This fake processor returns the zombies it received in its
+                # constructor as its processing result, without actually
+                # parsing anything.
+                def __init__(self, file_path, pickle_dags, dag_id_white_list, zombies):
+                    super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)
+
+                    self._result = zombies, 0
+
+                def start(self):
+                    pass
+
+                @property
+                def start_time(self):
+                    return DEFAULT_DATE
+
+                @property
+                def pid(self):
+                    return 1234
+
+                @property
+                def done(self):
+                    return True
+
+                @property
+                def result(self):
+                    return self._result
+
+            def processor_factory(file_path, zombies):
+                return FakeDagFileProcessor(file_path,
+                                            False,
+                                            [],
+                                            zombies)
+
+            test_dag_path = os.path.join(TEST_DAG_FOLDER,
+                                         'test_example_bash_operator.py')
+            async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+            processor_agent = DagFileProcessorAgent(test_dag_path,
+                                                    [],
+                                                    1,
+                                                    processor_factory,
+                                                    timedelta.max,
+                                                    async_mode)
+            processor_agent.start()
+            parsing_result = []
+            if not async_mode:
+                processor_agent.heartbeat()
+            while not processor_agent.done:
+                if not async_mode:
+                    processor_agent.wait_until_finished()
+                parsing_result.extend(processor_agent.harvest_simple_dags())
+
+            self.assertEqual(len(fake_zombies), len(parsing_result))
+            self.assertEqual(set([zombie.key for zombie in fake_zombies]),
+                             set([result.key for result in parsing_result]))
 
     @mock.patch("airflow.jobs.DagFileProcessor.pid", new_callable=PropertyMock)
     @mock.patch("airflow.jobs.DagFileProcessor.kill")
     def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
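A note on the test double above: `DagFileProcessorAgent` only touches a processor through the surface the fake implements, namely `start()`, `start_time`, `pid`, `done`, and `result`, so any object exposing those members can stand in for a real `DagFileProcessor`. An equivalent stub could be built with a spec'd mock instead of a subclass; a sketch of that alternative, assuming those are indeed the only members the agent reads:

```python
from unittest.mock import MagicMock

from airflow.jobs import DagFileProcessor
from airflow.utils import timezone


def mock_processor_factory(file_path, zombies):
    # spec= keeps attribute access honest: touching a member that
    # DagFileProcessor does not define raises AttributeError.
    processor = MagicMock(spec=DagFileProcessor)
    processor.start.return_value = None
    processor.start_time = timezone.utcnow()
    processor.pid = 1234
    processor.done = True
    processor.result = (zombies, 0)  # same canned result as the fake subclass
    return processor
```

The subclass approach the commit takes has one advantage over mocks: calling `super().__init__(file_path, pickle_dags, dag_id_white_list, zombies)` exercises the real constructor signature, so the test would fail if the `(file_path, zombies)` factory contract drifted.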
