Skip to content

Commit

Permalink
fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets (apache#41398)
Browse files Browse the repository at this point in the history

* fix(datasets/manager): fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets

this happens when dynamic task mapping is used

* refactor(dataset/manager): reword debug log

Co-authored-by: Ephraim Anierobi <[email protected]>

* refactor(dataset/manager): remove unnecessary logging

Co-authored-by: Ephraim Anierobi <[email protected]>

---------

Co-authored-by: Ephraim Anierobi <[email protected]>
  • Loading branch information
Lee-W and ephraimbuddy authored Aug 12, 2024
1 parent f25adf1 commit bf64cb6
Showing 1 changed file with 31 additions and 4 deletions.
35 changes: 31 additions & 4 deletions airflow/datasets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,8 @@ def register_dataset_change(

dags_to_reparse = dags_to_queue_from_dataset_alias - dags_to_queue_from_dataset
if dags_to_reparse:
session.add_all(
DagPriorityParsingRequest(fileloc=fileloc)
for fileloc in {dag.fileloc for dag in dags_to_reparse}
)
file_locs = {dag.fileloc for dag in dags_to_reparse}
cls._send_dag_priority_parsing_request(file_locs, session)
session.flush()

cls.notify_dataset_changed(dataset=dataset)
Expand Down Expand Up @@ -208,6 +206,35 @@ def _postgres_queue_dagruns(cls, dataset_id: int, dags_to_queue: set[DagModel],
stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
session.execute(stmt, values)

@classmethod
def _send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
    """Queue a priority-parsing request for each DAG file location.

    Dispatches on the session's database dialect: PostgreSQL gets the fast
    bulk path; every other backend falls back to the row-by-row slow path.
    """
    is_postgres = session.bind.dialect.name == "postgresql"
    if is_postgres:
        cls._postgres_send_dag_priority_parsing_request(file_locs, session)
    else:
        cls._slow_path_send_dag_priority_parsing_request(file_locs, session)

@classmethod
def _slow_path_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
    """Insert one ``DagPriorityParsingRequest`` per file location, tolerating duplicates.

    Each insert runs inside its own SAVEPOINT (``session.begin_nested``) so a
    unique-constraint conflict on one fileloc rolls back only that row instead
    of poisoning the whole enclosing transaction.
    """

    def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str | None:
        # Don't error whole transaction when a single DagPriorityParsingRequest item conflicts.
        # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
        req = DagPriorityParsingRequest(fileloc=fileloc)
        try:
            with session.begin_nested():
                session.merge(req)
        except exc.IntegrityError:
            # Request already queued by a concurrent writer; safe to skip.
            cls.logger().debug("Skipping request %s, already present", req, exc_info=True)
            return None
        return req.fileloc

    # BUG FIX: the original built a generator expression here and never
    # consumed it, so the helper was never executed and no requests were
    # ever inserted on this path. Iterate eagerly instead.
    for fileloc in file_locs:
        _send_dag_priority_parsing_request_if_needed(fileloc)

@classmethod
def _postgres_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
    """Bulk-insert ``DagPriorityParsingRequest`` rows using PostgreSQL upsert.

    ``ON CONFLICT DO NOTHING`` makes already-queued filelocs a no-op, so no
    savepoint-per-row dance is needed on this backend.
    """
    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(DagPriorityParsingRequest).on_conflict_do_nothing()
    # BUG FIX: the original passed a dict comprehension with a constant key
    # ({"fileloc": fileloc for fileloc in file_locs}), which collapses to a
    # single dict holding only the LAST fileloc — inserting one row instead
    # of one per location. A list of per-row dicts triggers executemany.
    session.execute(stmt, [{"fileloc": fileloc} for fileloc in file_locs])


def resolve_dataset_manager() -> DatasetManager:
"""Retrieve the dataset manager."""
Expand Down

0 comments on commit bf64cb6

Please sign in to comment.