Skip to content

Commit

Permalink
Fixed dag view to not repeat edges
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Oct 28, 2014
1 parent c5c00c6 commit 0d56f36
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 18 deletions.
8 changes: 0 additions & 8 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@
#engine = create_engine('mysql://airflow:airflow@localhost/airflow')
engine = create_engine('sqlite:///' + BASE_FOLDER + '/airflow.db' )
Session.configure(bind=engine)
HEADER = """\
.__ _____.__
_____ |__|_______/ ____\ | ______ _ __
\__ \ | \_ __ \ __\| | / _ \ \/ \/ /
/ __ \| || | \/| | | |_( <_> ) /
(____ /__||__| |__| |____/\____/ \/\_/
\/"""

CELERY_APP_NAME = "airflow.executors.celery_worker"
CELERY_BROKER = "amqp"
CELERY_RESULTS_BACKEND = "amqp://"
6 changes: 6 additions & 0 deletions airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,9 @@ def alchemy_to_dict(obj):
value = value.isoformat()
d[c.name] = value
return d

def readfile(filepath):
f = open(filepath)
content = f.read()
f.close()
return content
13 changes: 7 additions & 6 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,18 @@ def graph(self):
'value': {'label': task.task_id}
})

def get_downstream(task, edges):
def get_upstream(task):
for t in task.upstream_list:
edges.append({
edge = {
'u': t.task_id,
'v': task.task_id,
'value': {'label': ''}
})
get_downstream(t, edges)
}
if edge not in edges:
edges.append(edge)
get_upstream(t)

for t in dag.roots:
get_downstream(t, edges)
get_upstream(t)

dttm = request.args.get('execution_date')
if dttm:
Expand Down
8 changes: 4 additions & 4 deletions dags/examples/example1.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

default_args = {
'owner': 'max',
'start_date': datetime(2014, 9, 1),
'start_date': datetime(2014, 10, 20),
'mysql_dbid': 'local_mysql',
}

dag = DAG(dag_id='example_1')

cmd = 'ls -l'
run_this_last = DummyOperator(
task_id='run_this_last',
task_id='run_this_last',
**default_args)
dag.add_task(run_this_last)

Expand All @@ -24,8 +24,8 @@
for i in range(5):
i = str(i)
task = BashOperator(
task_id='runme_'+i,
bash_command='sleep {{ 10 + macros.random() * 10 }}',
task_id='runme_'+i,
bash_command='sleep {{ 10 + macros.random() * 10 }}',
**default_args)
task.set_downstream(run_this)
dag.add_task(task)
Expand Down

0 comments on commit 0d56f36

Please sign in to comment.