Support multiple DagProcessors parsing files from different locations. (apache#25935)

* Separate dag processors

* Introduce [scheduler]stalled_dags_update_timeout configuration option

* Remove DagProcessorDirectory class and pass dag_directory as parameter

* Rename dag_directory column to processor_subdir in CallbackRequests

Co-authored-by: Ash Berlin-Taylor <[email protected]>
mhenc and ashb authored Sep 6, 2022
1 parent 8538a72 commit f878854
Showing 24 changed files with 613 additions and 132 deletions.
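As the commit title describes, the goal is to let several standalone DAG processors run side by side, each parsing its own directory, with DAG records and callbacks tagged by that directory (processor_subdir). A rough illustration of such a deployment follows; it is not part of the commit, and it assumes the `airflow dag-processor` command with its `--subdir` option is available and `[scheduler]standalone_dag_processor = True`.

# Illustrative sketch only, not part of this commit: launch one standalone DAG
# processor per DAG folder. Assumes `airflow dag-processor --subdir` exists in
# this Airflow version and [scheduler]standalone_dag_processor is enabled.
import subprocess

dag_folders = ["/opt/airflow/dags/team_a", "/opt/airflow/dags/team_b"]  # hypothetical paths

processes = [
    subprocess.Popen(["airflow", "dag-processor", "--subdir", folder])
    for folder in dag_folders
]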
28 changes: 23 additions & 5 deletions airflow/callbacks/callback_requests.py
@@ -28,10 +28,17 @@ class CallbackRequest:
:param full_filepath: File Path to use to run the callback
:param msg: Additional Message that can be used for logging
:param processor_subdir: Directory used by Dag Processor when parsing the dag.
"""

def __init__(self, full_filepath: str, msg: Optional[str] = None):
def __init__(
self,
full_filepath: str,
processor_subdir: Optional[str] = None,
msg: Optional[str] = None,
):
self.full_filepath = full_filepath
self.processor_subdir = processor_subdir
self.msg = msg

def __eq__(self, other):
@@ -60,16 +67,18 @@ class TaskCallbackRequest(CallbackRequest):
:param simple_task_instance: Simplified Task Instance representation
:param is_failure_callback: Flag to determine whether it is a Failure Callback or Success Callback
:param msg: Additional Message that can be used for logging to determine failure/zombie
:param processor_subdir: Directory used by Dag Processor when parsing the dag.
"""

def __init__(
self,
full_filepath: str,
simple_task_instance: "SimpleTaskInstance",
is_failure_callback: Optional[bool] = True,
processor_subdir: Optional[str] = None,
msg: Optional[str] = None,
):
super().__init__(full_filepath=full_filepath, msg=msg)
super().__init__(full_filepath=full_filepath, processor_subdir=processor_subdir, msg=msg)
self.simple_task_instance = simple_task_instance
self.is_failure_callback = is_failure_callback

@@ -94,6 +103,7 @@ class DagCallbackRequest(CallbackRequest):
:param full_filepath: File Path to use to run the callback
:param dag_id: DAG ID
:param run_id: Run ID for the DagRun
:param processor_subdir: Directory used by Dag Processor when parsing the dag.
:param is_failure_callback: Flag to determine whether it is a Failure Callback or Success Callback
:param msg: Additional Message that can be used for logging
"""
@@ -103,10 +113,11 @@ def __init__(
full_filepath: str,
dag_id: str,
run_id: str,
processor_subdir: Optional[str],
is_failure_callback: Optional[bool] = True,
msg: Optional[str] = None,
):
super().__init__(full_filepath=full_filepath, msg=msg)
super().__init__(full_filepath=full_filepath, processor_subdir=processor_subdir, msg=msg)
self.dag_id = dag_id
self.run_id = run_id
self.is_failure_callback = is_failure_callback
@@ -118,8 +129,15 @@ class SlaCallbackRequest(CallbackRequest):
:param full_filepath: File Path to use to run the callback
:param dag_id: DAG ID
:param processor_subdir: Directory used by Dag Processor when parsing the dag.
"""

def __init__(self, full_filepath: str, dag_id: str, msg: Optional[str] = None):
super().__init__(full_filepath, msg)
def __init__(
self,
full_filepath: str,
dag_id: str,
processor_subdir: Optional[str],
msg: Optional[str] = None,
):
super().__init__(full_filepath, processor_subdir=processor_subdir, msg=msg)
self.dag_id = dag_id
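With processor_subdir carried on every callback request, a callback can be routed back to the DAG processor that owns the file. A minimal sketch of constructing such requests with the signatures shown above; the file path, DAG id, and run id are invented for illustration.

# Minimal sketch, not from the commit: callback requests now record the
# directory of the DAG processor that parsed the file (processor_subdir).
from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest

dag_callback = DagCallbackRequest(
    full_filepath="/opt/airflow/dags/team_a/etl.py",  # hypothetical file
    dag_id="etl",                                     # hypothetical DAG id
    run_id="scheduled__2022-09-06T00:00:00+00:00",    # hypothetical run id
    processor_subdir="/opt/airflow/dags/team_a",      # subdir owned by one DAG processor
    is_failure_callback=True,
    msg="dagrun failed",
)

sla_callback = SlaCallbackRequest(
    full_filepath="/opt/airflow/dags/team_a/etl.py",
    dag_id="etl",
    processor_subdir="/opt/airflow/dags/team_a",
)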
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
@@ -2121,6 +2121,14 @@
type: integer
example: ~
default: "20"
- name: dag_stale_not_seen_duration
description: |
Only applicable if `[scheduler]standalone_dag_processor` is true.
Time in seconds after which DAGs that were not updated by the Dag Processor are deactivated.
version_added: 2.4.0
type: integer
example: ~
default: "600"
- name: use_job_schedule
description: |
Turn off scheduler use of cron intervals by setting this to False.
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -1077,6 +1077,10 @@ standalone_dag_processor = False
# in database. Contains maximum number of callbacks that are fetched during a single loop.
max_callbacks_per_loop = 20

# Only applicable if `[scheduler]standalone_dag_processor` is true.
# Time in seconds after which DAGs that were not updated by the Dag Processor are deactivated.
dag_stale_not_seen_duration = 600

# Turn off scheduler use of cron intervals by setting this to False.
# DAGs submitted manually in the web UI or with trigger_dag will still run.
use_job_schedule = True
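A small sketch of reading the option added above through Airflow's configuration API; only the option names come from the commit, the rest is illustrative.

# Sketch: reading the new scheduler options via Airflow's config API.
from airflow.configuration import conf

standalone = conf.getboolean("scheduler", "standalone_dag_processor")
# Seconds after which a DAG not re-parsed by its processor is deactivated (default 600).
stale_after = conf.getint("scheduler", "dag_stale_not_seen_duration")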
71 changes: 47 additions & 24 deletions airflow/dag_processing/manager.py
@@ -31,7 +31,8 @@
from datetime import datetime, timedelta
from importlib import import_module
from multiprocessing.connection import Connection as MultiprocessingConnection
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union, cast
from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional, Union, cast

from setproctitle import setproctitle
from sqlalchemy.orm import Session
@@ -57,9 +58,6 @@
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks

if TYPE_CHECKING:
import pathlib


class DagParsingStat(NamedTuple):
"""Information on processing progress"""
@@ -107,7 +105,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):

def __init__(
self,
dag_directory: str,
dag_directory: os.PathLike,
max_runs: int,
processor_timeout: timedelta,
dag_ids: Optional[List[str]],
@@ -116,7 +114,7 @@ def __init__(
):
super().__init__()
self._file_path_queue: List[str] = []
self._dag_directory: str = dag_directory
self._dag_directory: os.PathLike = dag_directory
self._max_runs = max_runs
self._processor_timeout = processor_timeout
self._dag_ids = dag_ids
@@ -205,7 +203,7 @@ def wait_until_finished(self) -> None:

@staticmethod
def _run_processor_manager(
dag_directory: str,
dag_directory: os.PathLike,
max_runs: int,
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
@@ -368,7 +366,7 @@ class DagFileProcessorManager(LoggingMixin):

def __init__(
self,
dag_directory: Union[str, "pathlib.Path"],
dag_directory: os.PathLike,
max_runs: int,
processor_timeout: timedelta,
dag_ids: Optional[List[str]],
@@ -379,14 +377,14 @@ def __init__(
super().__init__()
self._file_paths: List[str] = []
self._file_path_queue: List[str] = []
self._dag_directory = dag_directory
self._max_runs = max_runs
# signal_conn is None for dag_processor_standalone mode.
self._direct_scheduler_conn = signal_conn
self._pickle_dags = pickle_dags
self._dag_ids = dag_ids
self._async_mode = async_mode
self._parsing_start_time: Optional[int] = None
self._dag_directory = dag_directory

# Set the signal conn into non-blocking mode, so that attempting to
# send when the buffer is full errors, rather than hangs forever
@@ -397,6 +395,7 @@ def __init__(
if self._async_mode and self._direct_scheduler_conn is not None:
os.set_blocking(self._direct_scheduler_conn.fileno(), False)

self.standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
self._parallelism = conf.getint('scheduler', 'parsing_processes')
if (
conf.get_mandatory_value('database', 'sql_alchemy_conn').startswith('sqlite')
@@ -498,11 +497,13 @@ def _deactivate_stale_dags(self, session=None):
fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
}
to_deactivate = set()
dags_parsed = (
session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time)
.filter(DagModel.is_active)
.all()
query = session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).filter(
DagModel.is_active
)
if self.standalone_dag_processor:
query = query.filter(DagModel.processor_subdir == self.get_dag_directory())
dags_parsed = query.all()

for dag in dags_parsed:
# The largest valid difference between a DagFileStat's last_finished_time and a DAG's
# last_parsed_time is _processor_timeout. Longer than that indicates that the DAG is
@@ -540,7 +541,7 @@ def _run_parsing_loop(self):
self._refresh_dag_dir()
self.prepare_file_path_queue()
max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")
standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")

if self._async_mode:
# If we're in async mode, we can start up straight away. If we're
# in sync mode we need to be told to start a "loop"
@@ -591,7 +592,7 @@ def _run_parsing_loop(self):
self.waitables.pop(sentinel)
self._processors.pop(processor.file_path)

if standalone_dag_processor:
if self.standalone_dag_processor:
self._fetch_callbacks(max_callbacks_per_loop)
self._deactivate_stale_dags()
DagWarning.purge_inactive_dag_warnings()
@@ -661,11 +662,12 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
"""Fetches callbacks from database and add them to the internal queue for execution."""
self.log.debug("Fetching callbacks from the database.")
with prohibit_commit(session) as guard:
query = (
session.query(DbCallbackRequest)
.order_by(DbCallbackRequest.priority_weight.asc())
.limit(max_callbacks)
)
query = session.query(DbCallbackRequest)
if self.standalone_dag_processor:
query = query.filter(
DbCallbackRequest.processor_subdir == self.get_dag_directory(),
)
query = query.order_by(DbCallbackRequest.priority_weight.asc()).limit(max_callbacks)
callbacks = with_row_locks(
query, of=DbCallbackRequest, session=session, **skip_locked(session=session)
).all()
@@ -743,7 +745,10 @@ def _refresh_dag_dir(self):
else:
dag_filelocs.append(fileloc)

SerializedDagModel.remove_deleted_dags(dag_filelocs)
SerializedDagModel.remove_deleted_dags(
alive_dag_filelocs=dag_filelocs,
processor_subdir=self.get_dag_directory(),
)
DagModel.deactivate_deleted_dags(self._file_paths)

from airflow.models.dagcode import DagCode
@@ -913,6 +918,16 @@ def get_run_count(self, file_path):
stat = self._file_stats.get(file_path)
return stat.run_count if stat else 0

def get_dag_directory(self) -> str:
"""
Returns the dag_directory as a string.
:rtype: str
"""
if isinstance(self._dag_directory, Path):
return str(self._dag_directory.resolve())
else:
return str(self._dag_directory)

def set_file_paths(self, new_file_paths):
"""
Update this with a new set of paths to DAG definition files.
@@ -986,10 +1001,14 @@ def collect_results(self) -> None:
self.log.debug("%s file paths queued for processing", len(self._file_path_queue))

@staticmethod
def _create_process(file_path, pickle_dags, dag_ids, callback_requests):
def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests):
"""Creates DagFileProcessorProcess instance."""
return DagFileProcessorProcess(
file_path=file_path, pickle_dags=pickle_dags, dag_ids=dag_ids, callback_requests=callback_requests
file_path=file_path,
pickle_dags=pickle_dags,
dag_ids=dag_ids,
dag_directory=dag_directory,
callback_requests=callback_requests,
)

def start_new_processes(self):
@@ -1002,7 +1021,11 @@ def start_new_processes(self):

callback_to_execute_for_file = self._callback_to_execute[file_path]
processor = self._create_process(
file_path, self._pickle_dags, self._dag_ids, callback_to_execute_for_file
file_path,
self._pickle_dags,
self._dag_ids,
self.get_dag_directory(),
callback_to_execute_for_file,
)

del self._callback_to_execute[file_path]
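In standalone mode the manager now scopes its database work to its own directory: both the stale-DAG query and the callback fetch filter on processor_subdir. A self-contained sketch of that filtering (an existing SQLAlchemy session is assumed; only DagModel.processor_subdir and the query shape come from the commit):

# Sketch of the per-directory filtering applied in _deactivate_stale_dags:
# in standalone mode each DAG processor only considers DAGs it parsed itself.
from airflow.models.dag import DagModel


def dags_parsed_by_this_processor(session, dag_directory: str, standalone: bool):
    """Return (dag_id, fileloc, last_parsed_time) rows, limited to one subdir when standalone."""
    query = session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).filter(
        DagModel.is_active
    )
    if standalone:
        # Each standalone DAG processor only deactivates DAGs from its own directory.
        query = query.filter(DagModel.processor_subdir == dag_directory)
    return query.all()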
15 changes: 12 additions & 3 deletions airflow/dag_processing/processor.py
@@ -76,12 +76,14 @@ def __init__(
file_path: str,
pickle_dags: bool,
dag_ids: Optional[List[str]],
dag_directory: str,
callback_requests: List[CallbackRequest],
):
super().__init__()
self._file_path = file_path
self._pickle_dags = pickle_dags
self._dag_ids = dag_ids
self._dag_directory = dag_directory
self._callback_requests = callback_requests

# The process that was launched to process the given file.
@@ -111,6 +113,7 @@ def _run_file_processor(
pickle_dags: bool,
dag_ids: Optional[List[str]],
thread_name: str,
dag_directory: str,
callback_requests: List[CallbackRequest],
) -> None:
"""
@@ -154,7 +157,11 @@ def _run_file_processor(
threading.current_thread().name = thread_name

log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
dag_file_processor = DagFileProcessor(dag_ids=dag_ids, log=log)
dag_file_processor = DagFileProcessor(
dag_ids=dag_ids,
dag_directory=dag_directory,
log=log,
)
result: Tuple[int, int] = dag_file_processor.process_file(
file_path=file_path,
pickle_dags=pickle_dags,
@@ -188,6 +195,7 @@ def start(self) -> None:
self._pickle_dags,
self._dag_ids,
f"DagFileProcessor{self._instance_id}",
self._dag_directory,
self._callback_requests,
),
name=f"DagFileProcessor{self._instance_id}-Process",
@@ -356,10 +364,11 @@ class DagFileProcessor(LoggingMixin):

UNIT_TEST_MODE: bool = conf.getboolean('core', 'UNIT_TEST_MODE')

def __init__(self, dag_ids: Optional[List[str]], log: logging.Logger):
def __init__(self, dag_ids: Optional[List[str]], dag_directory: str, log: logging.Logger):
super().__init__()
self.dag_ids = dag_ids
self._log = log
self._dag_directory = dag_directory
self.dag_warnings: Set[Tuple[str, str]] = set()

@provide_session
Expand Down Expand Up @@ -766,7 +775,7 @@ def process_file(
session.commit()

# Save individual DAGs in the ORM
dagbag.sync_to_db(session)
dagbag.sync_to_db(processor_subdir=self._dag_directory, session=session)
session.commit()

if pickle_dags:
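After these changes a DagFileProcessor knows which directory it is serving, and process_file passes that value to DagBag.sync_to_db as processor_subdir. A hedged construction example using the constructor shown above; the logger name and directory are invented.

# Sketch, not from the commit: constructing the per-file processor with the
# new dag_directory argument; the directory it records ends up stored as
# processor_subdir when parsed DAGs are synced to the database.
import logging

from airflow.dag_processing.processor import DagFileProcessor

processor = DagFileProcessor(
    dag_ids=None,                              # no per-DAG filtering
    dag_directory="/opt/airflow/dags/team_a",  # hypothetical subdir owned by this processor
    log=logging.getLogger("airflow.processor"),
)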
