Skip to content

Commit

Permalink
Merge pull request apache#848 from airbnb/del_dags
Browse files Browse the repository at this point in the history
Delete dags from dagbag
  • Loading branch information
mistercrunch committed Jan 8, 2016
2 parents 609b5f4 + 61188fb commit f74e690
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 23 deletions.
48 changes: 27 additions & 21 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,48 +149,52 @@ def get_dag(self, dag_id):
"""
Gets the DAG out of the dictionary, and refreshes it if expired
"""
# If asking for a known subdag, we want to refresh the parent
if dag_id in self.dags:
dag = self.dags[dag_id]
if dag.is_subdag:
orm_dag = DagModel.get_current(dag.parent_dag.dag_id)
else:
orm_dag = DagModel.get_current(dag_id)
if orm_dag and dag.last_loaded < (
orm_dag.last_expired or datetime(2100, 1, 1)):
self.process_file(
filepath=orm_dag.fileloc, only_if_updated=False)
dag = self.dags[dag_id]
else:
orm_dag = DagModel.get_current(dag_id)
if orm_dag:
self.process_file(filepath=orm_dag.fileloc,
only_if_updated=False)
if dag_id in self.dags:
dag = self.dags[dag_id]
else:
dag = None
return dag
dag_id = dag.parent_dag.dag_id

# If the dag is absent or expired
orm_dag = DagModel.get_current(dag_id)
found_dags = []
if orm_dag and (
dag_id not in self.dags or (
dag.last_loaded < (
orm_dag.last_expired or datetime(2100, 1, 1)
)
)):
# Reprocessing source file
found_dags = self.process_file(
filepath=orm_dag.fileloc, only_if_updated=False)

if dag_id in [dag.dag_id for dag in found_dags]:
return self.dags[dag_id]
elif dag_id in self.dags:
del self.dags[dag_id]

def process_file(self, filepath, only_if_updated=True, safe_mode=True):
"""
Given a path to a python module, this method imports the module and
look for dag objects within it.
"""
found_dags = []
try:
# This failed before in what may have been a git sync
# race condition
dttm = datetime.fromtimestamp(os.path.getmtime(filepath))
mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
mod_name = 'unusual_prefix_' + mod_name
except:
return
except Exception as e:
logging.exception(e)
return found_dags

if safe_mode and os.path.isfile(filepath):
# Skip file if no obvious references to airflow or DAG are found.
with open(filepath, 'r') as f:
content = f.read()
if not all([s in content for s in ('DAG', 'airflow')]):
return
return found_dags

if (not only_if_updated or
filepath not in self.file_last_changed or
Expand All @@ -212,8 +216,10 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
dag.full_filepath = filepath
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
found_dags.append(dag)

self.file_last_changed[filepath] = dttm
return found_dags

@provide_session
def kill_zombies(self, session):
Expand Down
3 changes: 2 additions & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,8 @@ def pickle_info(self):
dag_id = request.args.get('dag_id')
dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values()
for dag in dags:
d[dag.dag_id] = dag.pickle_info()
if not dag.is_subdag:
d[dag.dag_id] = dag.pickle_info()
return wwwutils.json_response(d)

@expose('/login', methods=['GET', 'POST'])
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dill
filechunkio
flake8
flask
flask-admin==1.2.0
flask-admin
flask-cache
flask-login
flower
Expand Down

0 comments on commit f74e690

Please sign in to comment.