Skip to content

Commit

Permalink
clean up references to old session
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowin committed Mar 22, 2016
1 parent bb78151 commit d5e7dd9
Showing 1 changed file with 4 additions and 6 deletions.
10 changes: 4 additions & 6 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ def pool_full(self, session):

return open_slots <= 0

@provide_session
def run(
self,
verbose=True,
Expand All @@ -966,7 +967,8 @@ def run(
mark_success=False, # Don't run the task, act as if it succeeded
test_mode=False, # Doesn't record success or failure in the DB
job_id=None,
pool=None,):
pool=None,
session=None):
"""
Runs the task instance.
"""
Expand All @@ -989,7 +991,7 @@ def run(
" on {self.end_date}".format(**locals())
)
elif not ignore_dependencies and \
not self.are_dependencies_met(session, verbose=True):
not self.are_dependencies_met(session=session, verbose=True):
logging.warning("Dependencies not met yet")
elif self.state == State.UP_FOR_RETRY and \
not self.ready_for_retry():
Expand Down Expand Up @@ -1024,7 +1026,6 @@ def run(
self.queued_dttm = datetime.now()
session.merge(self)
session.commit()
session.close()
logging.info("Queuing into pool {}".format(self.pool))
return
if not test_mode:
Expand All @@ -1034,7 +1035,6 @@ def run(
if not test_mode:
session.merge(self)
session.commit()
session.close()

# Closing all pooled connections to prevent
# "max number of connections reached"
Expand Down Expand Up @@ -1086,7 +1086,6 @@ def signal_handler(signum, frame):
raise

# Recording SUCCESS
session = settings.Session()
self.end_date = datetime.now()
self.set_duration()
self.state = State.SUCCESS
Expand All @@ -1104,7 +1103,6 @@ def signal_handler(signum, frame):
logging.exception(e3)

session.commit()
session.close()

def dry_run(self):
task = self.task
Expand Down

0 comments on commit d5e7dd9

Please sign in to comment.