Skip to content

Commit

Permalink
Massively speed up the query returned by TI.filter_for_tis (apache#11147
Browse files Browse the repository at this point in the history
)

The previous query generated SQL like this:

```
WHERE (task_id = ? AND dag_id = ? AND execution_date = ?) OR (task_id = ? AND dag_id = ? AND execution_date = ?)
```

Which is fine for one or maybe even 100 TIs, but when testing DAGs at
extreme size (over 21k tasks!) this query was taking for ever (162s on
Postgres, 172s on MySQL 5.7)

By changing this query to this

```
WHERE task_id IN (?,?) AND dag_id = ? AND execution_date = ?
```

the time is reduced to 1s! (1.03s on Postgres, 1.19s on MySQL)

Even on 100 tis the reduction is large, but the overall time is not
significant (0.01451s -> 0.00626s on Postgres).

Times included SQLA query construction time (but not time for calling
filter_for_tis. So a like-for-like comparison), not just DB query time:

```python
ipdb> start_filter_20k = time.monotonic(); result_filter_20k = session.query(TI).filter(tis_filter).all(); end_filter_20k = time.monotonic()
ipdb> end_filter_20k - start_filter_20k
172.30647455298458
ipdb> in_filter = TI.dag_id == self.dag_id, TI.execution_date == self.execution_date, TI.task_id.in_([o.task_id for o in old_states.keys()]);
ipdb> start_20k_custom = time.monotonic(); result_custom_20k = session.query(TI).filter(in_filter).all(); end_20k_custom = time.monotonic()
ipdb> end_20k_custom - start_20k_custom
1.1882996069907676
```

I have also removed the check that was ensuring everything was of the
same type (all TaskInstance or all TaskInstanceKey) as it felt needless
- both types have the three required fields, so the "duck-typing"
approach at runtime (crash if doesn't have the required property)+mypy
checks felt Good Enough.
  • Loading branch information
ashb authored Sep 25, 2020
1 parent b92c60a commit ee90807
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1811,20 +1811,34 @@ def filter_for_tis(
"""Returns SQLAlchemy filter to query selected task instances"""
if not tis:
return None
if all(isinstance(t, TaskInstanceKey) for t in tis):
filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
TaskInstance.task_id == tik.task_id,
TaskInstance.execution_date == tik.execution_date)
for tik in tis])
return or_(*filter_for_tis)
if all(isinstance(t, TaskInstance) for t in tis):
filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
TaskInstance.task_id == ti.task_id,
TaskInstance.execution_date == ti.execution_date)
for ti in tis])
return or_(*filter_for_tis)

raise TypeError("All elements must have the same type: `TaskInstance` or `TaskInstanceKey`.")

# DictKeys type, (what we often pass here from the scheduler) is not directly indexable :(
first = list(tis)[0]

dag_id = first.dag_id
execution_date = first.execution_date
first_task_id = first.task_id
# Common path optimisations: when all TIs are for the same dag_id and execution_date, or same dag_id
# and task_id -- this can be over 150x for huge numbers of TIs (20k+)
if all(t.dag_id == dag_id and t.execution_date == execution_date for t in tis):
return and_(
TaskInstance.dag_id == dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id.in_(t.task_id for t in tis),
)
if all(t.dag_id == dag_id and t.task_id == first_task_id for t in tis):
return and_(
TaskInstance.dag_id == dag_id,
TaskInstance.execution_date.in_(t.execution_date for t in tis),
TaskInstance.task_id == first_task_id,
)
return or_(
and_(
TaskInstance.dag_id == ti.dag_id,
TaskInstance.task_id == ti.task_id,
TaskInstance.execution_date == ti.execution_date,
) for ti in tis
)


# State of the task instance.
Expand Down

0 comments on commit ee90807

Please sign in to comment.