Skip to content

Commit

Permalink
[AIRFLOW-842] do not query the DB with an empty IN clause
Browse files Browse the repository at this point in the history
This is done to silence warnings coming from
SQLAlchemy, e.g.:

sqlalchemy/sql/default_comparator.py:161:
SAWarning: The IN-predicate on
"dag_run.dag_id" was invoked with an empty
sequence. This results in a
contradiction, which nonetheless can be expensive
to evaluate. Consider
alternative strategies for improved performance.

Closes apache#2072 from imbaczek/bug842
  • Loading branch information
Marek Baczynski authored and jlowin committed Feb 13, 2017
1 parent 2ce7556 commit 485280a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
8 changes: 8 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3734,6 +3734,10 @@ def clean_dirty(dag_ids, session=None):
:param full_query: whether to check dag_runs for new drs not in dag_stats
:type full_query: bool
"""
# avoid querying with an empty IN clause
if not dag_ids:
return

dag_ids = set(dag_ids)

qry = (
Expand All @@ -3745,6 +3749,10 @@ def clean_dirty(dag_ids, session=None):
qry.delete(synchronize_session='fetch')
session.commit()

# avoid querying with an empty IN clause
if not dirty_ids:
return

qry = (
session.query(DagRun.dag_id, DagRun.state, func.count('*'))
.filter(DagRun.dag_id.in_(dirty_ids))
Expand Down
16 changes: 13 additions & 3 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,12 +978,18 @@ def test_dag_stats(self):
session.query(models.DagStat).delete()
session.commit()

with warnings.catch_warnings(record=True) as caught_warnings:
models.DagStat.clean_dirty([], session=session)
self.assertEqual([], caught_warnings)

run1 = self.dag_bash.create_dagrun(
run_id="run1",
execution_date=DEFAULT_DATE,
state=State.RUNNING)

models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
with warnings.catch_warnings(record=True) as caught_warnings:
models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
self.assertEqual([], caught_warnings)

qry = session.query(models.DagStat).all()

Expand All @@ -998,7 +1004,9 @@ def test_dag_stats(self):
execution_date=DEFAULT_DATE+timedelta(days=1),
state=State.RUNNING)

models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
with warnings.catch_warnings(record=True) as caught_warnings:
models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
self.assertEqual([], caught_warnings)

qry = session.query(models.DagStat).all()

Expand All @@ -1011,7 +1019,9 @@ def test_dag_stats(self):
session.query(models.DagRun).first().state = State.SUCCESS
session.commit()

models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
with warnings.catch_warnings(record=True) as caught_warnings:
models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
self.assertEqual([], caught_warnings)

qry = session.query(models.DagStat).filter(models.DagStat.state == State.SUCCESS).all()
self.assertEqual(1, len(qry))
Expand Down

0 comments on commit 485280a

Please sign in to comment.