diff --git a/airflow/models.py b/airflow/models.py index 8fa711762b683..b07ab5975ff52 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -149,48 +149,52 @@ def get_dag(self, dag_id): """ Gets the DAG out of the dictionary, and refreshes it if expired """ + # If asking for a known subdag, we want to refresh the parent if dag_id in self.dags: dag = self.dags[dag_id] if dag.is_subdag: - orm_dag = DagModel.get_current(dag.parent_dag.dag_id) - else: - orm_dag = DagModel.get_current(dag_id) - if orm_dag and dag.last_loaded < ( - orm_dag.last_expired or datetime(2100, 1, 1)): - self.process_file( - filepath=orm_dag.fileloc, only_if_updated=False) - dag = self.dags[dag_id] - else: - orm_dag = DagModel.get_current(dag_id) - if orm_dag: - self.process_file(filepath=orm_dag.fileloc, - only_if_updated=False) - if dag_id in self.dags: - dag = self.dags[dag_id] - else: - dag = None - return dag + dag_id = dag.parent_dag.dag_id + + # If the dag is absent or expired + orm_dag = DagModel.get_current(dag_id) + found_dags = [] + if orm_dag and ( + dag_id not in self.dags or ( + dag.last_loaded < ( + orm_dag.last_expired or datetime(2100, 1, 1) + ) + )): + # Reprocessing source file + found_dags = self.process_file( + filepath=orm_dag.fileloc, only_if_updated=False) + + if dag_id in [dag.dag_id for dag in found_dags]: + return self.dags[dag_id] + elif dag_id in self.dags: + del self.dags[dag_id] def process_file(self, filepath, only_if_updated=True, safe_mode=True): """ Given a path to a python module, this method imports the module and look for dag objects within it. """ + found_dags = [] try: # This failed before in what may have been a git sync # race condition dttm = datetime.fromtimestamp(os.path.getmtime(filepath)) mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1]) mod_name = 'unusual_prefix_' + mod_name - except: - return + except Exception as e: + logging.exception(e) + return found_dags if safe_mode and os.path.isfile(filepath): # Skip file if no obvious references to airflow or DAG are found. with open(filepath, 'r') as f: content = f.read() if not all([s in content for s in ('DAG', 'airflow')]): - return + return found_dags if (not only_if_updated or filepath not in self.file_last_changed or @@ -212,8 +216,10 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): dag.full_filepath = filepath dag.is_subdag = False self.bag_dag(dag, parent_dag=dag, root_dag=dag) + found_dags.append(dag) self.file_last_changed[filepath] = dttm + return found_dags @provide_session def kill_zombies(self, session): diff --git a/airflow/www/views.py b/airflow/www/views.py index ffaaade5da2a5..d22137fdad74a 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -672,7 +672,8 @@ def pickle_info(self): dag_id = request.args.get('dag_id') dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values() for dag in dags: - d[dag.dag_id] = dag.pickle_info() + if not dag.is_subdag: + d[dag.dag_id] = dag.pickle_info() return wwwutils.json_response(d) @expose('/login', methods=['GET', 'POST']) diff --git a/requirements.txt b/requirements.txt index 66595e4885ea8..45127ac028f37 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,7 @@ dill filechunkio flake8 flask -flask-admin==1.2.0 +flask-admin flask-cache flask-login flower