Skip to content

Commit

Permalink
Pessimistic pool connection handling for master and run
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Nov 30, 2014
1 parent 713f5c6 commit 6f91a67
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
3 changes: 3 additions & 0 deletions airflow/bin/airflow
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def backfill(args):

def run(args):

settings.pessimistic_connection_handling()

# Setting up logging
directory = getconf().get('core', 'BASE_LOG_FOLDER') + \
"/{args.dag_id}/{args.task_id}".format(args=args)
Expand Down Expand Up @@ -145,6 +147,7 @@ def webserver(args):


def master(args):
settings.pessimistic_connection_handling()

# Sleep time (seconds) between master runs
MASTER_SLEEP_INTERVAL = 60
Expand Down
19 changes: 18 additions & 1 deletion airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy.pool import Pool

from airflow.configuration import getconf


HEADER = """\
.__ _____.__
_____ |__|_______/ ____\ | ______ _ __
Expand All @@ -14,6 +16,21 @@
(____ /__||__| |__| |____/\____/ \/\_/
\/"""

def pessimistic_connection_handling():
@event.listens_for(Pool, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
'''
Disconnect Handling - Pessimistic, taken from:
http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html
'''
cursor = dbapi_connection.cursor()
try:
cursor.execute("SELECT 1")
except:
raise exc.DisconnectionError()
cursor.close()


BASE_FOLDER = getconf().get('core', 'BASE_FOLDER')
SQL_ALCHEMY_CONN = getconf().get('core', 'SQL_ALCHEMY_CONN')
if BASE_FOLDER not in sys.path:
Expand Down
2 changes: 1 addition & 1 deletion dags/examples/example1.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
i = str(i)
task = BashOperator(
task_id='runme_'+i,
bash_command='sleep {{ 10 + macros.random() * 10 }}',
bash_command='sleep 20',
**default_args)
task.set_downstream(run_this)
dag.add_task(task)
Expand Down

0 comments on commit 6f91a67

Please sign in to comment.