From 1aceb19a25febb67eb0ea63242a64314d226858f Mon Sep 17 00:00:00 2001
From: Maxime
Date: Thu, 10 Sep 2015 00:37:34 +0000
Subject: [PATCH] Making the scheduler more resilient

---
 airflow/jobs.py   | 89 ++++++++++++++++++++++++-----------------
 airflow/models.py |  2 +-
 2 files changed, 47 insertions(+), 44 deletions(-)

diff --git a/airflow/jobs.py b/airflow/jobs.py
index bfaa992bde2b3..427230bc45c05 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -473,55 +473,58 @@ def signal_handler(signum, frame):
         executor.start()
         i = 0
         while not self.num_runs or self.num_runs > i:
-            loop_start_dttm = datetime.now()
             try:
-                self.prioritize_queued(executor=executor, dagbag=dagbag)
-            except Exception as e:
-                logging.exception(e)
+                loop_start_dttm = datetime.now()
+                try:
+                    self.prioritize_queued(executor=executor, dagbag=dagbag)
+                except Exception as e:
+                    logging.exception(e)
 
-            i += 1
-            try:
-                if i % self.refresh_dags_every == 0:
-                    dagbag = models.DagBag(self.subdir, sync_to_db=True)
+                i += 1
+                try:
+                    if i % self.refresh_dags_every == 0:
+                        dagbag = models.DagBag(self.subdir, sync_to_db=True)
+                    else:
+                        dagbag.collect_dags(only_if_updated=True)
+                except:
+                    logging.error("Failed at reloading the dagbag")
+                    if statsd:
+                        statsd.incr('dag_refresh_error', 1, 1)
+                    sleep(5)
+
+                if dag_id:
+                    dags = [dagbag.dags[dag_id]]
                 else:
-                    dagbag.collect_dags(only_if_updated=True)
-            except:
-                logging.error("Failed at reloading the dagbag")
-                if statsd:
-                    statsd.incr('dag_refresh_error', 1, 1)
-                sleep(5)
-
-            if dag_id:
-                dags = [dagbag.dags[dag_id]]
-            else:
-                dags = [
-                    dag for dag in dagbag.dags.values() if not dag.parent_dag]
-            paused_dag_ids = dagbag.paused_dags()
-            for dag in dags:
-                logging.debug("Scheduling {}".format(dag.dag_id))
-                dag = dagbag.get_dag(dag.dag_id)
-                if not dag or (dag.dag_id in paused_dag_ids):
-                    continue
+                    dags = [
+                        dag for dag in dagbag.dags.values() if not dag.parent_dag]
+                paused_dag_ids = dagbag.paused_dags()
+                for dag in dags:
+                    logging.debug("Scheduling {}".format(dag.dag_id))
+                    dag = dagbag.get_dag(dag.dag_id)
+                    if not dag or (dag.dag_id in paused_dag_ids):
+                        continue
+                    try:
+                        self.process_dag(dag, executor)
+                        self.manage_slas(dag)
+                    except Exception as e:
+                        logging.exception(e)
+                logging.info(
+                    "Done queuing tasks, calling the executor's heartbeat")
+                duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
+                logging.info("Loop took: {} seconds".format(duration_sec))
                 try:
-                    self.process_dag(dag, executor)
-                    self.manage_slas(dag)
+                    self.import_errors(dagbag)
                 except Exception as e:
                     logging.exception(e)
-            logging.info(
-                "Done queuing tasks, calling the executor's heartbeat")
-            duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
-            logging.info("Loop took: {} seconds".format(duration_sec))
-            try:
-                self.import_errors(dagbag)
-            except Exception as e:
-                logging.exception(e)
-            try:
-                # We really just want the scheduler to never ever stop.
-                executor.heartbeat()
-                self.heartbeat()
-            except Exception as e:
-                logging.exception(e)
-                logging.error("Tachycardia!")
+                try:
+                    # We really just want the scheduler to never ever stop.
+                    executor.heartbeat()
+                    self.heartbeat()
+                except Exception as e:
+                    logging.exception(e)
+                    logging.error("Tachycardia!")
+            except Exception as deep_e:
+                logging.exception(deep_e)
 
     def heartbeat_callback(self):
         if statsd:
diff --git a/airflow/models.py b/airflow/models.py
index c774b5c1488ae..4162bcc4f9329 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -178,7 +178,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
             m = imp.load_source(mod_name, filepath)
         except Exception as e:
             logging.error("Failed to import: " + filepath)
-            self.import_errors[filepath] = e
+            self.import_errors[filepath] = str(e)
             logging.exception(e)
             self.file_last_changed[filepath] = dttm
             return
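
The jobs.py hunk is almost entirely re-indentation: the whole body of the scheduler loop moves under a new outer try, and the matching except Exception as deep_e at the bottom catches anything the narrower per-step handlers did not anticipate, so no single bad iteration can kill the scheduler process. Below is a minimal, standalone sketch of that pattern; the names run_loop and do_work are hypothetical stand-ins for the scheduler internals, not Airflow code:

    import logging
    import time

    def do_work():
        """Stand-in for one scheduler step (refresh, queue, heartbeat...)."""
        raise RuntimeError("transient failure")

    def run_loop(num_runs=3):
        i = 0
        while not num_runs or num_runs > i:
            try:  # outer guard: the loop itself must never die
                i += 1
                try:  # inner guard: isolate one step so the others still run
                    do_work()
                except Exception as e:
                    logging.exception(e)
                # ...further steps, each behind its own narrow guard...
            except Exception as deep_e:
                # anything unanticipated: log it and start the next iteration
                logging.exception(deep_e)
            time.sleep(1)

    if __name__ == '__main__':
        run_loop()

The two layers serve different purposes: the inner guards keep one failing step (say, a broken DAG refresh) from skipping the steps after it, while the outer guard is a last-resort net for errors nobody thought to handle.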
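The models.py hunk stores str(e) instead of the exception instance in import_errors. The patch does not say why, but a plausible reason is that a plain string is always safe to serialize, compare, and display, whereas an exception object can reference arbitrary unpicklable state. A small hypothetical illustration (DagImportError and the lock attribute are invented for the example):

    import pickle
    import threading

    class DagImportError(Exception):
        pass

    err = DagImportError("Failed to import: /dags/broken.py")
    err.lock = threading.Lock()  # stands in for any unpicklable payload

    try:
        pickle.dumps(err)
    except TypeError as e:
        print("pickling the exception object fails:", e)

    # the string form round-trips without trouble
    print(pickle.loads(pickle.dumps(str(err))))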