diff --git a/TODO.md b/TODO.md index a505dab5defc9..67735d24453cc 100644 --- a/TODO.md +++ b/TODO.md @@ -28,6 +28,7 @@ TODO #### Backend * Callbacks +* Master auto dag refresh at time intervals * Set default args at the DAG level? * Prevent timezone chagne on import * Add decorator to timeout imports on master process [lib](https://github.com/pnpnpn/timeout-decorator) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 445b8449950f4..e470db371cc92 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -64,8 +64,8 @@ def run(args): utils.pessimistic_connection_handling() # Setting up logging - directory = conf.get('core', 'BASE_LOG_FOLDER') + \ - "/{args.dag_id}/{args.task_id}".format(args=args) + log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) + directory = log + "/{args.dag_id}/{args.task_id}".format(args=args) if not os.path.exists(directory): os.makedirs(directory) args.execution_date = dateutil.parser.parse(args.execution_date) @@ -230,14 +230,12 @@ def serve_logs(args): @flask_app.route('/log/') def serve_logs(filename): - conf.get('core', 'BASE_LOG_FOLDER') - print filename + log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) return flask.send_from_directory( - conf.get('core', 'BASE_LOG_FOLDER'), + log, filename, mimetype="application/json", as_attachment=False) - print(conf.get('core', 'BASE_LOG_FOLDER')) WORKER_LOG_SERVER_PORT = \ int(conf.get('celery', 'WORKER_LOG_SERVER_PORT')) flask_app.run( diff --git a/airflow/configuration.py b/airflow/configuration.py index b53252237cc41..98a42c92bf2ca 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -9,18 +9,20 @@ 'statsd_host': 'localhost', 'statsd_port': 8125, }, + 'core': { + 'authenticate': False, + 'unit_test_mode': False, + }, } DEFAULT_CONFIG = """\ [core] airflow_home = {AIRFLOW_HOME} -authenticate = False dags_folder = {AIRFLOW_HOME}/dags base_log_folder = {AIRFLOW_HOME}/logs base_url = http://localhost:8080 executor = SequentialExecutor sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db -unit_test_mode = False [server] web_server_host = 0.0.0.0 @@ -44,7 +46,6 @@ [misc] job_heartbeat_sec = 5 master_heartbeat_sec = 60 -id_len = 250 statsd_on = false statsd_host = localhost statsd_port = 8125 @@ -83,7 +84,6 @@ [misc] job_heartbeat_sec = 1 master_heartbeat_sec = 30 -id_len = 250 [statsd] statsd_on = false @@ -126,7 +126,7 @@ def mkdir_p(path): if 'AIRFLOW_HOME' not in os.environ: AIRFLOW_HOME = os.path.expanduser('~/airflow') else: - AIRFLOW_HOME = os.environ['AIRFLOW_HOME'] + AIRFLOW_HOME = os.path.expanduser(os.environ['AIRFLOW_HOME']) mkdir_p(AIRFLOW_HOME) diff --git a/airflow/jobs.py b/airflow/jobs.py index a69f67a7369ed..f672dd364c225 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -21,7 +21,7 @@ Base = models.Base -ID_LEN = conf.getint('misc', 'ID_LEN') +ID_LEN = models.ID_LEN # Setting up a statsd client if needed statsd = None diff --git a/airflow/models.py b/airflow/models.py index 30296bff514bc..a0dde5a7175d6 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -25,7 +25,7 @@ from airflow.utils import apply_defaults Base = declarative_base() -ID_LEN = conf.getint('misc', 'ID_LEN') +ID_LEN = 250 SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN') if 'mysql' in SQL_ALCHEMY_CONN: LongText = LONGTEXT @@ -326,8 +326,9 @@ def command( @property def log_filepath(self): iso = self.execution_date.isoformat() - return conf.get('core', 'BASE_LOG_FOLDER') + \ - "/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()) + log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) + return ( + "{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals())) @property def log_url(self): diff --git a/airflow/www/app.py b/airflow/www/app.py index 1dd37e9581eb4..40630ee32c970 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -3,6 +3,7 @@ import dateutil.parser import json import logging +import os import re import socket import sys @@ -684,7 +685,8 @@ def rendered(self): @expose('/log') def log(self): - BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER') + BASE_LOG_FOLDER = os.path.expanduser( + conf.get('core', 'BASE_LOG_FOLDER')) dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date')