Skip to content

Commit

Permalink
Making the scheduler more resilient
Browse files: browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Sep 10, 2015
1 parent 6c517af commit 1aceb19
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 44 deletions.
89 changes: 46 additions & 43 deletions airflow/jobs.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -473,55 +473,58 @@ def signal_handler(signum, frame):
executor.start()
i = 0
while not self.num_runs or self.num_runs > i:
loop_start_dttm = datetime.now()
try:
self.prioritize_queued(executor=executor, dagbag=dagbag)
except Exception as e:
logging.exception(e)
loop_start_dttm = datetime.now()
try:
self.prioritize_queued(executor=executor, dagbag=dagbag)
except Exception as e:
logging.exception(e)

i += 1
try:
if i % self.refresh_dags_every == 0:
dagbag = models.DagBag(self.subdir, sync_to_db=True)
i += 1
try:
if i % self.refresh_dags_every == 0:
dagbag = models.DagBag(self.subdir, sync_to_db=True)
else:
dagbag.collect_dags(only_if_updated=True)
except:
logging.error("Failed at reloading the dagbag")
if statsd:
statsd.incr('dag_refresh_error', 1, 1)
sleep(5)

if dag_id:
dags = [dagbag.dags[dag_id]]
else:
dagbag.collect_dags(only_if_updated=True)
except:
logging.error("Failed at reloading the dagbag")
if statsd:
statsd.incr('dag_refresh_error', 1, 1)
sleep(5)

if dag_id:
dags = [dagbag.dags[dag_id]]
else:
dags = [
dag for dag in dagbag.dags.values() if not dag.parent_dag]
paused_dag_ids = dagbag.paused_dags()
for dag in dags:
logging.debug("Scheduling {}".format(dag.dag_id))
dag = dagbag.get_dag(dag.dag_id)
if not dag or (dag.dag_id in paused_dag_ids):
continue
dags = [
dag for dag in dagbag.dags.values() if not dag.parent_dag]
paused_dag_ids = dagbag.paused_dags()
for dag in dags:
logging.debug("Scheduling {}".format(dag.dag_id))
dag = dagbag.get_dag(dag.dag_id)
if not dag or (dag.dag_id in paused_dag_ids):
continue
try:
self.process_dag(dag, executor)
self.manage_slas(dag)
except Exception as e:
logging.exception(e)
logging.info(
"Done queuing tasks, calling the executor's heartbeat")
duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
logging.info("Loop took: {} seconds".format(duration_sec))
try:
self.process_dag(dag, executor)
self.manage_slas(dag)
self.import_errors(dagbag)
except Exception as e:
logging.exception(e)
logging.info(
"Done queuing tasks, calling the executor's heartbeat")
duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
logging.info("Loop took: {} seconds".format(duration_sec))
try:
self.import_errors(dagbag)
except Exception as e:
logging.exception(e)
try:
# We really just want the scheduler to never ever stop.
executor.heartbeat()
self.heartbeat()
except Exception as e:
logging.exception(e)
logging.error("Tachycardia!")
try:
# We really just want the scheduler to never ever stop.
executor.heartbeat()
self.heartbeat()
except Exception as e:
logging.exception(e)
logging.error("Tachycardia!")
except Exception as deep_e:
logging.exception(deep_e)

def heartbeat_callback(self):
if statsd:
Expand Down
2 changes: 1 addition & 1 deletion airflow/models.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -178,7 +178,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
m = imp.load_source(mod_name, filepath)
except Exception as e:
logging.error("Failed to import: " + filepath)
self.import_errors[filepath] = e
self.import_errors[filepath] = str(e)
logging.exception(e)
self.file_last_changed[filepath] = dttm
return
Expand Down

0 comments on commit 1aceb19

Please sign in to comment.