Skip to content

Commit

Permalink
[AIRFLOW-3478] Make sure that the session is closed (apache#4298)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored and kaxil committed Jan 8, 2019
1 parent 327860f commit 5d75028
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 272 deletions.
10 changes: 4 additions & 6 deletions airflow/api/common/experimental/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

from sqlalchemy import or_

from airflow import models, settings
from airflow import models
from airflow.utils.db import provide_session
from airflow.exceptions import DagNotFound, DagFileExists


def delete_dag(dag_id, keep_records_in_log=True):
@provide_session
def delete_dag(dag_id, keep_records_in_log=True, session=None):
"""
:param dag_id: the dag_id of the DAG to delete
:type dag_id: str
Expand All @@ -34,8 +36,6 @@ def delete_dag(dag_id, keep_records_in_log=True):
The default value is True.
:type keep_records_in_log: bool
"""
session = settings.Session()

DM = models.DagModel
dag = session.query(DM).filter(DM.dag_id == dag_id).first()
if dag is None:
Expand All @@ -60,6 +60,4 @@ def delete_dag(dag_id, keep_records_in_log=True):
for m in models.DagRun, models.TaskFail, models.TaskInstance:
count += session.query(m).filter(m.dag_id == p, m.task_id == c).delete()

session.commit()

return count
22 changes: 8 additions & 14 deletions airflow/api/common/experimental/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from airflow.jobs import BackfillJob
from airflow.models import DagRun, TaskInstance
from airflow.operators.subdag_operator import SubDagOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.state import State
Expand Down Expand Up @@ -54,8 +53,9 @@ def _create_dagruns(dag, execution_dates, state, run_id_template):
return drs


@provide_session
def set_state(task, execution_date, upstream=False, downstream=False,
future=False, past=False, state=State.SUCCESS, commit=False):
future=False, past=False, state=State.SUCCESS, commit=False, session=None):
"""
Set the state of a task instance and if needed its relatives. Can set state
for future tasks (calculated from execution_date) and retroactively
Expand All @@ -71,6 +71,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,
:param past: Retroactively mark all tasks starting from start_date of the DAG
:param state: State to which the tasks need to be set
:param commit: Commit tasks to be altered to the database
:param session: database session
:return: list of tasks that have been created and updated
"""
assert timezone.is_localized(execution_date)
Expand Down Expand Up @@ -124,7 +125,6 @@ def set_state(task, execution_date, upstream=False, downstream=False,
# go through subdagoperators and create dag runs. We will only work
# within the scope of the subdag. We wont propagate to the parent dag,
# but we will propagate from parent to subdag.
session = Session()
dags = [dag]
sub_dag_ids = []
while len(dags) > 0:
Expand Down Expand Up @@ -180,18 +180,15 @@ def set_state(task, execution_date, upstream=False, downstream=False,
tis_altered += qry_sub_dag.with_for_update().all()
for ti in tis_altered:
ti.state = state
session.commit()
else:
tis_altered = qry_dag.all()
if len(sub_dag_ids) > 0:
tis_altered += qry_sub_dag.all()

session.expunge_all()
session.close()

return tis_altered


@provide_session
def _set_dag_run_state(dag_id, execution_date, state, session=None):
"""
Helper method that set dag run state in the DB.
Expand All @@ -211,12 +208,11 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
dr.end_date = None
else:
dr.end_date = timezone.utcnow()
session.commit()
session.merge(dr)


@provide_session
def set_dag_run_state_to_success(dag, execution_date, commit=False,
session=None):
def set_dag_run_state_to_success(dag, execution_date, commit=False, session=None):
"""
Set the dag run for a specific execution date and its task instances
to success.
Expand Down Expand Up @@ -248,8 +244,7 @@ def set_dag_run_state_to_success(dag, execution_date, commit=False,


@provide_session
def set_dag_run_state_to_failed(dag, execution_date, commit=False,
session=None):
def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
"""
Set the dag run for a specific execution date and its running task instances
to failed.
Expand Down Expand Up @@ -290,8 +285,7 @@ def set_dag_run_state_to_failed(dag, execution_date, commit=False,


@provide_session
def set_dag_run_state_to_running(dag, execution_date, commit=False,
session=None):
def set_dag_run_state_to_running(dag, execution_date, commit=False, session=None):
"""
Set the dag run for a specific execution date to running.
:param dag: the DAG of which to alter state
Expand Down
Loading

0 comments on commit 5d75028

Please sign in to comment.