import logging
import os
import sys

from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine

from airflow.configuration import conf

HEADER = """\
 ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
"""

BASE_LOG_URL = '/admin/airflow/log'
AIRFLOW_HOME = os.path.expanduser(conf.get('core', 'AIRFLOW_HOME'))
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
LOGGING_LEVEL = logging.INFO
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

engine_args = {}
if 'sqlite' not in SQL_ALCHEMY_CONN:
    # Engine args not supported by sqlite
    engine_args['pool_size'] = 50
    engine_args['pool_recycle'] = 3600

engine = create_engine(
    SQL_ALCHEMY_CONN, **engine_args)
Session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine))

# Can't move this to the configuration file due to ConfigParser interpolation:
# the raw '%' placeholders in these format strings would be read as
# interpolation syntax unless every one were escaped as '%%'.
LOG_FORMAT = (
    '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'
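
# A minimal sketch of the interpolation problem, assuming Python 3's
# ``configparser`` (not part of the original file): reading the value is fine,
# but fetching it with interpolation enabled raises, because '%(asctime)s'
# looks like an interpolation reference to a missing option.
#
# import configparser
#
# parser = configparser.ConfigParser()
# parser.read_string('[logging]\nlog_format = %(asctime)s %(message)s\n')
# try:
#     parser.get('logging', 'log_format')
# except configparser.InterpolationError:
#     # Interpolation fails on the '%' placeholders; keeping the format
#     # strings in this module sidesteps the issue entirely.
#     pass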


def policy(task_instance):
    """
    This policy setting allows altering task instances right before they
    are executed. It allows administrators to rewire some task parameters.

    Note that the ``TaskInstance`` object has an attribute ``task`` pointing
    to its related task object, which in turn has a reference to the DAG
    object. So you can use the attributes of all of these to define your
    policy.

    To define a policy, add an ``airflow_local_settings`` module to your
    PYTHONPATH that defines this ``policy`` function. It receives a
    ``TaskInstance`` object and can alter it where needed.

    Here are a few examples of how this can be useful:

    * You could enforce a specific queue (say the ``spark`` queue)
      for tasks using the ``SparkOperator`` to make sure that these
      task instances get wired to the right workers
    * You could force all task instances running on an
      ``execution_date`` older than a week to run in a ``backfill``
      pool.
    * ...

    A commented-out sketch of one such policy follows this function.
    """
    pass
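

# Illustrative sketch only (not part of the stock settings module): a
# ``policy`` that an administrator might ship in an ``airflow_local_settings``
# module on the PYTHONPATH. The operator class name and the ``spark`` /
# ``backfill`` names below are assumptions made for the example.
#
# from datetime import datetime, timedelta
#
#
# def policy(task_instance):
#     task = task_instance.task
#     # Pin presumed Spark tasks to a dedicated queue so they land on workers
#     # that have the Spark client installed.
#     if task.__class__.__name__ == 'SparkOperator':
#         task.queue = 'spark'
#     # Push task instances more than a week behind into a backfill pool.
#     if task_instance.execution_date < datetime.now() - timedelta(days=7):
#         task.pool = 'backfill'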

try:
    from airflow_local_settings import *
    logging.info("Loaded airflow_local_settings.")
except:
    # No airflow_local_settings module on the PYTHONPATH (or it failed to
    # import); stick with the defaults defined above.
    pass