Skip to content

Commit

Permalink
[AIRFLOW-214] Fix occasion of detached taskinstance
Browse files Browse the repository at this point in the history
For some reason occasionely taskinstanced could become
detached from the database session. Now it uses a fresh session
to ensure the taskinstances stay attached.
  • Loading branch information
bolkedebruin committed Jun 6, 2016
1 parent 89edb6f commit 1e48c2b
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ def process_dag(self, dag, queue):
session.commit()

# update the state of the previously active dag runs
dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING)
dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
active_dag_runs = []
for run in dag_runs:
# do not consider runs that are executed in the future
Expand All @@ -513,14 +513,15 @@ def process_dag(self, dag, queue):
# todo: run.task is transient but needs to be set
run.dag = dag
# todo: preferably the integrity check happens at dag collection time
run.verify_integrity()
run.update_state()
run.verify_integrity(session=session)
run.update_state(session=session)
if run.state == State.RUNNING:
active_dag_runs.append(run)

for run in active_dag_runs:
tis = run.get_task_instances(session=session, state=(State.NONE,
State.UP_FOR_RETRY))
# this needs a fresh session sometimes tis get detached
tis = run.get_task_instances(state=(State.NONE,
State.UP_FOR_RETRY))

# this loop is quite slow as it uses are_dependencies_met for
# every task (in ti.is_runnable). This is also called in
Expand Down

0 comments on commit 1e48c2b

Please sign in to comment.