Remove processor_subdir (apache#45088)
This is in preparation for supporting bundles: the subdir concept will
be superseded by separate local bundles, and components such as the dag
processor will be able to operate on a subset of bundles.
jedcunningham authored Dec 19, 2024
1 parent 7fe46e1 commit 776ebe4
Showing 26 changed files with 1,922 additions and 2,138 deletions.
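
For orientation, here is a minimal sketch (not part of the commit) of what the change looks like at a call site: the processor_subdir keyword simply disappears from the DB-sync entry points such as DAG.sync_to_db and DAG.bulk_write_to_db. The dag id and folder path below are made up.

from airflow.models.dag import DAG

dag = DAG(dag_id="example_dag", schedule=None)

# Before this commit, callers threaded the parse directory through:
#     dag.sync_to_db(processor_subdir="/files/dags", session=session)
# After this commit the keyword no longer exists; only a session is needed:
#     dag.sync_to_db(session=session)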
3 changes: 0 additions & 3 deletions airflow/callbacks/callback_requests.py
@@ -32,13 +32,10 @@ class CallbackRequest(BaseModel):
Base Class with information about the callback to be executed.
:param msg: Additional Message that can be used for logging
:param processor_subdir: Directory used by Dag Processor when parsed the dag.
"""

full_filepath: str
"""File Path to use to run the callback"""
processor_subdir: str | None = None
"""Directory used by Dag Processor when parsed the dag"""
msg: str | None = None
"""Additional Message that can be used for logging to determine failure/zombie"""

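As a quick illustration of the slimmed-down model, a hedged sketch of building a callback request after this change (the file path and message are made up, and model_dump_json assumes Airflow's pydantic v2 base class shown above):

from airflow.callbacks.callback_requests import CallbackRequest

request = CallbackRequest(full_filepath="/files/dags/example_dag.py", msg="task failed")
print(request.model_dump_json())  # serializes without any processor_subdir field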
19 changes: 5 additions & 14 deletions airflow/dag_processing/collection.py
@@ -169,7 +169,7 @@ def _update_dag_owner_links(dag_owner_links: dict[str, str], dm: DagModel, *, se
)


def _serialize_dag_capturing_errors(dag: MaybeSerializedDAG, session: Session, processor_subdir: str | None):
def _serialize_dag_capturing_errors(dag: MaybeSerializedDAG, session: Session):
"""
Try to serialize the dag to the DB, but make a note of any errors.
@@ -186,7 +186,6 @@ def _serialize_dag_capturing_errors(dag: MaybeSerializedDAG, session: Session, p
dag,
min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
session=session,
processor_subdir=processor_subdir,
)
if dag_was_updated:
_sync_dag_perms(dag, session=session)
@@ -234,18 +233,15 @@ def _update_dag_warnings(
session.merge(warning_to_add)


def _update_import_errors(
files_parsed: set[str], import_errors: dict[str, str], processor_subdir: str | None, session: Session
):
def _update_import_errors(files_parsed: set[str], import_errors: dict[str, str], session: Session):
from airflow.listeners.listener import get_listener_manager

# We can remove anything from files parsed in this batch that doesn't have an error. We need to remove old
# errors (i.e. from files that are removed) separately

session.execute(delete(ParseImportError).where(ParseImportError.filename.in_(list(files_parsed))))

query = select(ParseImportError.filename).where(ParseImportError.processor_subdir == processor_subdir)
existing_import_error_files = set(session.scalars(query))
existing_import_error_files = set(session.scalars(select(ParseImportError.filename)))

# Add the errors of the processed files
for filename, stacktrace in import_errors.items():
@@ -261,7 +257,6 @@ def _update_import_errors(
filename=filename,
timestamp=utcnow(),
stacktrace=stacktrace,
processor_subdir=processor_subdir,
)
)
# sending notification when a new dag import error occurs
@@ -272,7 +267,6 @@ def _update_import_errors(
def update_dag_parsing_results_in_db(
dags: Collection[MaybeSerializedDAG],
import_errors: dict[str, str],
processor_subdir: str | None,
warnings: set[DagWarning],
session: Session,
*,
@@ -308,11 +302,11 @@ def update_dag_parsing_results_in_db(
)
log.debug("Calling the DAG.bulk_sync_to_db method")
try:
DAG.bulk_write_to_db(dags, processor_subdir=processor_subdir, session=session)
DAG.bulk_write_to_db(dags, session=session)
# Write Serialized DAGs to DB, capturing errors
for dag in dags:
serialize_errors.extend(_serialize_dag_capturing_errors(dag, session, processor_subdir))
serialize_errors.extend(_serialize_dag_capturing_errors(dag, session))
except OperationalError:
session.rollback()
raise
@@ -329,7 +323,6 @@ def update_dag_parsing_results_in_db(
_update_import_errors(
files_parsed=good_dag_filelocs,
import_errors=import_errors,
processor_subdir=processor_subdir,
session=session,
)
except Exception:
@@ -377,7 +370,6 @@ def update_dags(
self,
orm_dags: dict[str, DagModel],
*,
processor_subdir: str | None = None,
session: Session,
) -> None:
from airflow.configuration import conf
@@ -433,7 +425,6 @@ def update_dags(
dm.timetable_summary = dag.timetable.summary
dm.timetable_description = dag.timetable.description
dm.asset_expression = dag.timetable.asset_condition.as_expression()
dm.processor_subdir = processor_subdir

last_automated_run: DagRun | None = run_info.latest_runs.get(dag.dag_id)
if last_automated_run is None:
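Rounding out this file's changes, a hedged sketch of the trimmed update_dag_parsing_results_in_db signature in use (empty placeholders stand in for a real parse run's output, an initialized metadata DB is assumed, and the remaining keyword-only parameters are assumed to keep their defaults):

from airflow.dag_processing.collection import update_dag_parsing_results_in_db
from airflow.utils.session import create_session

with create_session() as session:
    update_dag_parsing_results_in_db(
        dags=[],            # parsed (possibly serialized) DAG objects
        import_errors={},   # fileloc -> stacktrace collected while parsing
        warnings=set(),     # DagWarning instances
        session=session,
    )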
18 changes: 2 additions & 16 deletions airflow/dag_processing/manager.py
@@ -429,8 +429,6 @@ def deactivate_stale_dags(
"""Detect and deactivate DAGs which are no longer present in files."""
to_deactivate = set()
query = select(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).where(DagModel.is_active)
if self.standalone_dag_processor:
query = query.where(DagModel.processor_subdir == dag_directory)
dags_parsed = session.execute(query)

for dag in dags_parsed:
@@ -595,13 +593,8 @@ def _fetch_callbacks(
self.log.debug("Fetching callbacks from the database.")

callback_queue: list[CallbackRequest] = []
dag_directory = self.get_dag_directory()
with prohibit_commit(session) as guard:
query = select(DbCallbackRequest)
if self.standalone_dag_processor:
query = query.where(
DbCallbackRequest.processor_subdir == dag_directory,
)
query = query.order_by(DbCallbackRequest.priority_weight.asc()).limit(self.max_callbacks_per_loop)
query = with_row_locks(query, of=DbCallbackRequest, session=session, skip_locked=True)
callbacks = session.scalars(query)
@@ -677,10 +670,7 @@ def _iter_dag_filelocs(fileloc: str) -> Iterator[str]:

dag_filelocs = {full_loc for path in self._file_paths for full_loc in _iter_dag_filelocs(path)}

DagModel.deactivate_deleted_dags(
dag_filelocs,
processor_subdir=self.get_dag_directory(),
)
DagModel.deactivate_deleted_dags(dag_filelocs)

return True

@@ -700,12 +690,11 @@ def clear_nonexistent_import_errors(self, session=NEW_SESSION):
:param session: session for ORM operations
"""
self.log.debug("Removing old import errors")
query = delete(ParseImportError).where(ParseImportError.processor_subdir == self.get_dag_directory())
query = delete(ParseImportError)

if self._file_paths:
query = query.where(
ParseImportError.filename.notin_(self._file_paths),
ParseImportError.processor_subdir == self.get_dag_directory(),
)

session.execute(query.execution_options(synchronize_session="fetch"))
@@ -862,7 +851,6 @@ def _collect_results(self, session: Session = NEW_SESSION):
run_count=self._file_stats[path].run_count,
parsing_result=proc.parsing_result,
path=path,
processor_subdir=self.get_dag_directory(),
session=session,
)

@@ -1105,7 +1093,6 @@ def process_parse_results(
run_count: int,
path: str,
parsing_result: DagFileParsingResult | None,
processor_subdir: str | None,
session: Session,
) -> DagFileStat:
"""Take the parsing result and stats about the parser process and convert it into a DagFileState."""
@@ -1127,7 +1114,6 @@ def process_parse_results(
dags=parsing_result.serialized_dags,
import_errors=parsing_result.import_errors or {},
warnings=set(parsing_result.warnings or []),
processor_subdir=processor_subdir,
session=session,
)
stat.num_dags = len(parsing_result.serialized_dags)
11 changes: 4 additions & 7 deletions airflow/jobs/scheduler_job_runner.py
@@ -878,7 +878,6 @@ def process_executor_events(
full_filepath=ti.dag_model.fileloc,
ti=ti,
msg=msg,
processor_subdir=ti.dag_model.processor_subdir,
)
executor.send_callback(request)
else:
@@ -1707,7 +1706,6 @@ def _schedule_dag_run(
dag_id=dag.dag_id,
run_id=dag_run.run_id,
is_failure_callback=True,
processor_subdir=dag_model.processor_subdir,
msg="timed_out",
)

@@ -2066,11 +2064,11 @@ def _find_and_purge_zombies(self) -> None:
if zombies := self._find_zombies(session=session):
self._purge_zombies(zombies, session=session)

def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
def _find_zombies(self, *, session: Session) -> list[tuple[TI, str]]:
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
zombies = session.execute(
select(TI, DM.fileloc, DM.processor_subdir)
select(TI, DM.fileloc)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(DM, TI.dag_id == DM.dag_id)
.where(
@@ -2083,12 +2081,11 @@ def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
self.log.warning("Failing %s TIs without heartbeat after %s", len(zombies), limit_dttm)
return zombies

def _purge_zombies(self, zombies: list[tuple[TI, str, str]], *, session: Session) -> None:
for ti, file_loc, processor_subdir in zombies:
def _purge_zombies(self, zombies: list[tuple[TI, str]], *, session: Session) -> None:
for ti, file_loc in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
processor_subdir=processor_subdir,
ti=ti,
msg=str(zombie_message_details),
)
74 changes: 74 additions & 0 deletions airflow/migrations/versions/0053_3_0_0_remove_processor_subdir.py
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Remove processor_subdir.
Revision ID: 5c9c0231baa2
Revises: 237cef8dfea1
Create Date: 2024-12-18 19:10:26.962464
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

revision = "5c9c0231baa2"
down_revision = "237cef8dfea1"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply Remove processor_subdir."""
with op.batch_alter_table("callback_request", schema=None) as batch_op:
batch_op.drop_column("processor_subdir")

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.drop_column("processor_subdir")

with op.batch_alter_table("import_error", schema=None) as batch_op:
batch_op.drop_column("processor_subdir")

with op.batch_alter_table("serialized_dag", schema=None) as batch_op:
batch_op.drop_column("processor_subdir")


def downgrade():
"""Unapply Remove processor_subdir."""
with op.batch_alter_table("serialized_dag", schema=None) as batch_op:
batch_op.add_column(
sa.Column("processor_subdir", sa.VARCHAR(length=2000), autoincrement=False, nullable=True)
)

with op.batch_alter_table("import_error", schema=None) as batch_op:
batch_op.add_column(
sa.Column("processor_subdir", sa.VARCHAR(length=2000), autoincrement=False, nullable=True)
)

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(
sa.Column("processor_subdir", sa.VARCHAR(length=2000), autoincrement=False, nullable=True)
)

with op.batch_alter_table("callback_request", schema=None) as batch_op:
batch_op.add_column(
sa.Column("processor_subdir", sa.VARCHAR(length=2000), autoincrement=False, nullable=True)
)
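Once the metadata database has been migrated, none of the four tables should carry the column any more. A minimal verification sketch (assuming SQLAlchemy is installed and the connection string points at an existing Airflow metadata DB; the SQLite path is hypothetical):

from sqlalchemy import create_engine, inspect

engine = create_engine("sqlite:////tmp/airflow.db")  # hypothetical metadata DB
inspector = inspect(engine)

for table in ("callback_request", "dag", "import_error", "serialized_dag"):
    columns = {column["name"] for column in inspector.get_columns(table)}
    status = "still present" if "processor_subdir" in columns else "removed"
    print(f"{table}: processor_subdir {status}")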
15 changes: 3 additions & 12 deletions airflow/models/dag.py
@@ -1833,7 +1833,6 @@ def create_dagrun(
def bulk_write_to_db(
cls,
dags: Collection[MaybeSerializedDAG],
processor_subdir: str | None = None,
session: Session = NEW_SESSION,
):
"""
@@ -1851,7 +1850,7 @@ def bulk_write_to_db(
dag_op = DagModelOperation({dag.dag_id: dag for dag in dags}) # type: ignore[misc]

orm_dags = dag_op.add_dags(session=session)
dag_op.update_dags(orm_dags, processor_subdir=processor_subdir, session=session)
dag_op.update_dags(orm_dags, session=session)

asset_op = AssetModelOperation.collect(dag_op.dags)

@@ -1867,13 +1866,13 @@ def bulk_write_to_db(
session.flush()

@provide_session
def sync_to_db(self, processor_subdir: str | None = None, session=NEW_SESSION):
def sync_to_db(self, session=NEW_SESSION):
"""
Save attributes about this DAG to the DB.
:return: None
"""
self.bulk_write_to_db([self], processor_subdir=processor_subdir, session=session)
self.bulk_write_to_db([self], session=session)

def get_default_view(self):
"""Allow backward compatible jinja2 templates."""
@@ -2048,8 +2047,6 @@ class DagModel(Base):
# packaged DAG, it will point to the subpath of the DAG within the
# associated zip.
fileloc = Column(String(2000))
# The base directory used by Dag Processor that parsed this dag.
processor_subdir = Column(String(2000), nullable=True)
bundle_name = Column(StringID(), ForeignKey("dag_bundle.name"), nullable=True)
# The version of the bundle the last time the DAG was parsed
latest_bundle_version = Column(String(200), nullable=True)
@@ -2262,24 +2259,18 @@ def dag_display_name(self) -> str:
def deactivate_deleted_dags(
cls,
alive_dag_filelocs: Container[str],
processor_subdir: str | None,
session: Session = NEW_SESSION,
) -> None:
"""
Set ``is_active=False`` on the DAGs for which the DAG files have been removed.
:param alive_dag_filelocs: file paths of alive DAGs
:param processor_subdir: dag processor subdir
:param session: ORM Session
"""
log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)
dag_models = session.scalars(
select(cls).where(
cls.fileloc.is_not(None),
or_(
cls.processor_subdir.is_(None),
cls.processor_subdir == processor_subdir,
),
)
)

3 changes: 1 addition & 2 deletions airflow/models/dagbag.py
@@ -626,14 +626,13 @@ def dagbag_report(self):
return report

@provide_session
def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW_SESSION):
def sync_to_db(self, session: Session = NEW_SESSION):
"""Save attributes about list of DAG to the DB."""
from airflow.dag_processing.collection import update_dag_parsing_results_in_db

update_dag_parsing_results_in_db(
self.dags.values(), # type: ignore[arg-type] # We should create a proto for DAG|LazySerializedDAG
self.import_errors,
processor_subdir,
self.dag_warnings,
session=session,
)
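
A short usage sketch of the simplified DagBag entry point (the DAGs folder path is hypothetical and an initialized metadata DB is assumed):

from airflow.models.dagbag import DagBag
from airflow.utils.session import create_session

dagbag = DagBag(dag_folder="/files/dags", include_examples=False)
with create_session() as session:
    dagbag.sync_to_db(session=session)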