diff --git a/airflow/settings.py b/airflow/settings.py index 5df04281de084..2296cc3199c27 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -35,14 +35,6 @@ #engine = create_engine('mysql://airflow:airflow@localhost/airflow') engine = create_engine('sqlite:///' + BASE_FOLDER + '/airflow.db' ) Session.configure(bind=engine) -HEADER = """\ - .__ _____.__ -_____ |__|_______/ ____\ | ______ _ __ -\__ \ | \_ __ \ __\| | / _ \ \/ \/ / - / __ \| || | \/| | | |_( <_> ) / -(____ /__||__| |__| |____/\____/ \/\_/ - \/""" - CELERY_APP_NAME = "airflow.executors.celery_worker" CELERY_BROKER = "amqp" CELERY_RESULTS_BACKEND = "amqp://" diff --git a/airflow/utils.py b/airflow/utils.py index 68c0a18a297d7..675edf2079891 100644 --- a/airflow/utils.py +++ b/airflow/utils.py @@ -74,3 +74,9 @@ def alchemy_to_dict(obj): value = value.isoformat() d[c.name] = value return d + +def readfile(filepath): + f = open(filepath) + content = f.read() + f.close() + return content diff --git a/airflow/www/app.py b/airflow/www/app.py index c388874d791f5..ed3fef6feb5cc 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -202,17 +202,18 @@ def graph(self): 'value': {'label': task.task_id} }) - def get_downstream(task, edges): + def get_upstream(task): for t in task.upstream_list: - edges.append({ + edge = { 'u': t.task_id, 'v': task.task_id, - 'value': {'label': ''} - }) - get_downstream(t, edges) + } + if edge not in edges: + edges.append(edge) + get_upstream(t) for t in dag.roots: - get_downstream(t, edges) + get_upstream(t) dttm = request.args.get('execution_date') if dttm: diff --git a/dags/examples/example1.py b/dags/examples/example1.py index 2ceddb5215e99..6a23778e8d600 100644 --- a/dags/examples/example1.py +++ b/dags/examples/example1.py @@ -4,7 +4,7 @@ default_args = { 'owner': 'max', - 'start_date': datetime(2014, 9, 1), + 'start_date': datetime(2014, 10, 20), 'mysql_dbid': 'local_mysql', } @@ -12,7 +12,7 @@ cmd = 'ls -l' run_this_last = DummyOperator( - task_id='run_this_last', + task_id='run_this_last', **default_args) dag.add_task(run_this_last) @@ -24,8 +24,8 @@ for i in range(5): i = str(i) task = BashOperator( - task_id='runme_'+i, - bash_command='sleep {{ 10 + macros.random() * 10 }}', + task_id='runme_'+i, + bash_command='sleep {{ 10 + macros.random() * 10 }}', **default_args) task.set_downstream(run_this) dag.add_task(task)