Skip to content

Commit

Permalink
D205 Support - Dag Processing (apache#32449)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi authored Jul 9, 2023
1 parent 156b85a commit ea420ee
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
25 changes: 15 additions & 10 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,10 @@ def start(self) -> None:
def run_single_parsing_loop(self) -> None:
"""
Should only be used when launched DAG file processor manager in sync mode.
Send agent heartbeat signal to the manager, requesting that it runs one
processing "loop".
Call wait_until_finished to ensure that any launched processors have
finished before continuing
Send agent heartbeat signal to the manager, requesting that it runs one processing "loop".
Call wait_until_finished to ensure that any launched processors have finished before continuing.
"""
if not self._parent_signal_conn or not self._process:
raise ValueError("Process not started.")
Expand Down Expand Up @@ -477,6 +476,7 @@ def _exit_gracefully(self, signum, frame):
def start(self):
"""
Use multiple processes to parse and generate tasks for the DAGs in parallel.
By processing them in separate processes, we can get parallelism and isolation
from potentially harmful user code.
"""
Expand Down Expand Up @@ -519,6 +519,7 @@ def deactivate_stale_dags(
):
"""
Detects DAGs which are no longer present in files.
Deactivate them and remove them in the serialized_dag table.
"""
to_deactivate = set()
Expand Down Expand Up @@ -888,6 +889,7 @@ def _log_file_processing_stats(self, known_file_paths):
def get_pid(self, file_path) -> int | None:
"""
Retrieve the PID of the process processing the given file or None if the file is not being processed.
:param file_path: the path to the file that's being processed.
"""
if file_path in self._processors:
Expand All @@ -905,6 +907,7 @@ def get_all_pids(self) -> list[int]:
def get_last_runtime(self, file_path) -> float | None:
"""
Retrieve the last processing time of a specific path.
:param file_path: the path to the file that was processed
:return: the runtime (in seconds) of the process of the last run, or
None if the file was never processed.
Expand All @@ -915,36 +918,37 @@ def get_last_runtime(self, file_path) -> float | None:
def get_last_dag_count(self, file_path) -> int | None:
"""
Retrieve the total DAG count at a specific path.
:param file_path: the path to the file that was processed
:return: the number of dags loaded from that file, or None if the file
was never processed.
:return: the number of dags loaded from that file, or None if the file was never processed.
"""
stat = self._file_stats.get(file_path)
return stat.num_dags if stat else None

def get_last_error_count(self, file_path) -> int | None:
"""
Retrieve the total number of errors from processing a specific path.
:param file_path: the path to the file that was processed
:return: the number of import errors from processing, or None if the file
was never processed.
:return: the number of import errors from processing, or None if the file was never processed.
"""
stat = self._file_stats.get(file_path)
return stat.import_errors if stat else None

def get_last_finish_time(self, file_path) -> datetime | None:
"""
Retrieve the last completion time for processing a specific path.
:param file_path: the path to the file that was processed
:return: the finish time of the process of the last run, or None if the
file was never processed.
:return: the finish time of the process of the last run, or None if the file was never processed.
"""
stat = self._file_stats.get(file_path)
return stat.last_finish_time if stat else None

def get_start_time(self, file_path) -> datetime | None:
"""
Retrieve the last start time for processing a specific path.
:param file_path: the path to the file that's being processed
:return: the start time of the process that's processing the
specified file or None if the file is not currently being processed.
Expand All @@ -956,6 +960,7 @@ def get_start_time(self, file_path) -> datetime | None:
def get_run_count(self, file_path) -> int:
"""
The number of times the given file has been parsed.
:param file_path: the path to the file that's being processed.
"""
stat = self._file_stats.get(file_path)
Expand Down
3 changes: 3 additions & 0 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ def update_import_errors(
) -> None:
"""
Update any import errors to be displayed in the UI.
For the DAGs in the given DagBag, record any associated import errors and clears
errors for files that no longer have them. These are usually displayed through the
Airflow UI so that users know that there are issues parsing DAGs.
Expand Down Expand Up @@ -664,6 +665,7 @@ def check_pools(dag):
def update_dag_warnings(self, *, session: Session, dagbag: DagBag) -> None:
"""
Update any import warnings to be displayed in the UI.
For the DAGs in the given DagBag, record any associated configuration warnings and clear
warnings for files that no longer have them. These are usually displayed through the
Airflow UI so that users know that there are issues parsing DAGs.
Expand Down Expand Up @@ -691,6 +693,7 @@ def execute_callbacks(
) -> None:
"""
Execute on failure callbacks.
These objects can come from SchedulerJobRunner or from DagProcessorJobRunner.
:param dagbag: Dag Bag of dags
Expand Down

0 comments on commit ea420ee

Please sign in to comment.