Merge branch 'process_dag'
bolkedebruin committed Jun 1, 2016
2 parents 2c79099 + b18c995 commit c2384cb
Showing 5 changed files with 368 additions and 212 deletions.
174 changes: 90 additions & 84 deletions airflow/jobs.py
@@ -20,7 +20,6 @@
from past.builtins import basestring
from collections import defaultdict, Counter
from datetime import datetime
from itertools import product
import getpass
import logging
import socket
@@ -40,7 +39,9 @@
from airflow.utils.email import send_email
from airflow.utils.logging import LoggingMixin
from airflow.utils import asciiart
from airflow.settings import Stats

DagRun = models.DagRun
Base = models.Base
ID_LEN = models.ID_LEN
Stats = settings.Stats
@@ -401,11 +402,16 @@ def schedule_dag(self, dag):
dr.end_date = datetime.now()
session.commit()

qry = session.query(func.max(DagRun.execution_date)).filter_by(
dag_id = dag.dag_id).filter(
or_(DagRun.external_trigger == False,
# add % as a wildcard for the like query
DagRun.run_id.like(DagRun.ID_PREFIX+'%')))
# this query should be replaced by DagRun.find
qry = (
session.query(func.max(DagRun.execution_date))
.filter_by(dag_id=dag.dag_id)
.filter(or_(
DagRun.external_trigger == False,
# add % as a wildcard for the like query
DagRun.run_id.like(DagRun.ID_PREFIX+'%')
))
)
last_scheduled_run = qry.scalar()

# don't schedule @once again
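
The reworked query above is the usual SQLAlchemy shape for "latest row matching a predicate": an aggregate max() narrowed by an OR filter and read out with scalar(). A self-contained sketch of the same pattern, using an illustrative Run model and run_id prefix rather than Airflow's actual DagRun:

# Illustrative Run model and prefix; not Airflow's actual DagRun schema.
from sqlalchemy import (Boolean, Column, DateTime, String, create_engine,
                        func, or_)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Run(Base):
    __tablename__ = "run"
    run_id = Column(String, primary_key=True)
    dag_id = Column(String)
    execution_date = Column(DateTime)
    external_trigger = Column(Boolean, default=False)


ID_PREFIX = "scheduled__"     # assumed prefix for scheduler-created run_ids


def last_scheduled_run(session, dag_id):
    return (
        session.query(func.max(Run.execution_date))
        .filter(Run.dag_id == dag_id)
        .filter(or_(
            Run.external_trigger == False,    # noqa: E712 (SQL expression, not a bool test)
            Run.run_id.like(ID_PREFIX + "%"),
        ))
        .scalar()
    )


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
print(last_scheduled_run(session, "example_dag"))    # None: the DAG has never been scheduled

A None result is the "never scheduled" case that the surrounding code has to handle separately.
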
@@ -464,7 +470,6 @@ def process_dag(self, dag, queue):
function takes a lock on the DAG and timestamps the last run
in ``last_scheduler_run``.
"""
TI = models.TaskInstance
DagModel = models.DagModel
session = settings.Session()

@@ -474,84 +479,78 @@ def process_dag(self, dag, queue):
executors.LocalExecutor, executors.SequentialExecutor):
pickle_id = dag.pickle(session).id

db_dag = session.query(DagModel).filter_by(dag_id=dag.dag_id).first()
# obtain db lock
db_dag = session.query(DagModel).filter_by(
dag_id=dag.dag_id
).with_for_update().one()

last_scheduler_run = db_dag.last_scheduler_run or datetime(2000, 1, 1)
secs_since_last = (
datetime.now() - last_scheduler_run).total_seconds()
# if db_dag.scheduler_lock or
secs_since_last = (datetime.now() - last_scheduler_run).total_seconds()

if secs_since_last < self.heartrate:
# release db lock
session.commit()
session.close()
return None
else:
# Taking a lock
db_dag.scheduler_lock = True
db_dag.last_scheduler_run = datetime.now()
session.commit()

active_runs = dag.get_active_runs()
# Release the db lock
# the assumption here is that process_dag will take less
# time than self.heartrate; otherwise we might unlock too
# quickly and this should be moved below, but that would increase
# the time the record is locked and blocks other calls.
db_dag.last_scheduler_run = datetime.now()
session.commit()

self.logger.info('Getting list of tasks to skip for active runs.')
skip_tis = set()
if active_runs:
qry = (
session.query(TI.task_id, TI.execution_date)
.filter(
TI.dag_id == dag.dag_id,
TI.execution_date.in_(active_runs),
TI.state.in_((State.RUNNING, State.SUCCESS, State.FAILED)),
)
)
skip_tis = {(ti[0], ti[1]) for ti in qry.all()}
# update the state of the previously active dag runs
dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING)
active_dag_runs = []
for run in dag_runs:
# do not consider runs that are executed in the future
if run.execution_date > datetime.now():
continue

descartes = [obj for obj in product(dag.tasks, active_runs)]
could_not_run = set()
self.logger.info('Checking dependencies on {} tasks instances, minus {} '
'skippable ones'.format(len(descartes), len(skip_tis)))
# todo: run.task is transient but needs to be set
run.dag = dag
# todo: preferably the integrity check happens at dag collection time
run.verify_integrity()
run.update_state()
if run.state == State.RUNNING:
active_dag_runs.append(run)

for run in active_dag_runs:
tis = run.get_task_instances(session=session, state=(State.NONE,
State.UP_FOR_RETRY))

# this loop is quite slow as it uses are_dependencies_met for
# every task (in ti.is_runnable). This is also called in
# update_state above which has already checked these tasks
for ti in tis:
task = dag.get_task(ti.task_id)

for task, dttm in descartes:
if task.adhoc or (task.task_id, dttm) in skip_tis:
continue
ti = TI(task, dttm)
# fixme: ti.task is transient but needs to be set
ti.task = task

ti.refresh_from_db()
if ti.state in (
State.RUNNING, State.QUEUED, State.SUCCESS, State.FAILED):
continue
elif ti.is_runnable(flag_upstream_failed=True):
self.logger.debug('Queuing task: {}'.format(ti))
queue.put((ti.key, pickle_id))
elif ti.is_premature():
continue
else:
self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti))
could_not_run.add(ti)

# this type of deadlock happens when dagruns can't even start and so
# the TI's haven't been persisted to the database.
if len(could_not_run) == len(descartes) and len(could_not_run) > 0:
self.logger.error(
'Dag runs are deadlocked for DAG: {}'.format(dag.dag_id))
(session
.query(models.DagRun)
.filter(
models.DagRun.dag_id == dag.dag_id,
models.DagRun.state == State.RUNNING,
models.DagRun.execution_date.in_(active_runs))
.update(
{models.DagRun.state: State.FAILED},
synchronize_session='fetch'))

# Releasing the lock
self.logger.debug("Unlocking DAG (scheduler_lock)")
db_dag = (
session.query(DagModel)
.filter(DagModel.dag_id == dag.dag_id)
.first()
)
db_dag.scheduler_lock = False
session.merge(db_dag)
session.commit()
# future: remove adhoc
if task.adhoc:
continue

if ti.is_runnable(flag_upstream_failed=True):
self.logger.debug('Queuing task: {}'.format(ti))

ti.refresh_from_db(session=session, lock_for_update=True)
# another scheduler could have picked this task
# todo: UP_FOR_RETRY still could create a race condition
if ti.state is State.SCHEDULED:
session.commit()
self.logger.debug("Task {} was picked up by another scheduler"
.format(ti))
continue
elif ti.state is State.NONE:
ti.state = State.SCHEDULED
session.merge(ti)

session.commit()
queue.put((ti.key, pickle_id))

session.close()
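
The rewritten process_dag above drops the old cartesian product of dag.tasks and active runs and instead walks the running DagRuns, letting run.update_state() and per-task checks decide what to queue. Two locks coordinate concurrent schedulers: the DagModel row is read with_for_update() and the DAG is skipped when it was visited within the last self.heartrate seconds, and each candidate task instance is re-read with lock_for_update=True and only queued after its state is flipped from NONE to SCHEDULED, so a scheduler that loses the race sees SCHEDULED and moves on (the backfill changes further down apply the same claim before queueing). A minimal, self-contained sketch of that claim-under-row-lock pattern; the Task model, state values and engine URL are illustrative assumptions, not Airflow's schema:

# Minimal sketch of the claim-under-row-lock pattern; the Task model,
# state values and engine URL are illustrative, not Airflow's schema.
from datetime import datetime

from sqlalchemy import Column, DateTime, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Task(Base):
    __tablename__ = "task"
    id = Column(String, primary_key=True)
    state = Column(String, nullable=True)         # None means "not yet claimed"
    claimed_at = Column(DateTime, nullable=True)


engine = create_engine("sqlite://")               # SQLite ignores FOR UPDATE but the code still runs
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)


def try_claim(session, task_id):
    # SELECT ... FOR UPDATE: a second scheduler blocks here until we commit
    row = (session.query(Task)
           .filter_by(id=task_id)
           .with_for_update()
           .one())
    if row.state == "SCHEDULED":
        session.commit()                          # already claimed elsewhere; release the lock
        return False
    row.state = "SCHEDULED"                       # claim it before handing it to the executor
    row.claimed_at = datetime.now()
    session.commit()                              # releases the lock and publishes the claim
    return True


session = Session()
session.add(Task(id="example_task"))
session.commit()
print(try_claim(session, "example_task"))         # True: this scheduler claimed it
print(try_claim(session, "example_task"))         # False: the claim is already visible
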

@@ -643,13 +642,13 @@ def prioritize_queued(self, session, executor, dagbag):

session.commit()

def _split_dags(self, dags, size):
def _split(self, items, size):
"""
This function splits a list of dags into chunks of int size.
_split_dags([1,2,3,4,5,6], 3) becomes [[1,2,3],[4,5,6]]
This function splits a list of items into chunks of int size.
_split([1,2,3,4,5,6], 3) becomes [[1,2,3],[4,5,6]]
"""
size = max(1, size)
return [dags[i:i + size] for i in range(0, len(dags), size)]
return [items[i:i + size] for i in range(0, len(items), size)]
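
A quick usage sketch of the renamed helper, reimplemented standalone for illustration; in the scheduler it is called as self._split(dags, math.ceil(len(dags) / self.max_threads)) so each worker process gets one chunk:

# Standalone copy of the helper, for illustration only.
def _split(items, size):
    size = max(1, size)
    return [items[i:i + size] for i in range(0, len(items), size)]

print(_split([1, 2, 3, 4, 5, 6], 3))   # [[1, 2, 3], [4, 5, 6]]
print(_split([1, 2, 3, 4, 5], 2))      # [[1, 2], [3, 4], [5]] (the last chunk may be shorter)
print(_split([1, 2, 3], 0))            # [[1], [2], [3]] (size is clamped to at least 1)
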

def _do_dags(self, dagbag, dags, tis_out):
"""
@@ -713,7 +712,7 @@ def _execute(self):
format(multiprocessing.cpu_count(),
self.max_threads,
len(dags)))
dags = self._split_dags(dags, math.ceil(len(dags) / self.max_threads))
dags = self._split(dags, math.ceil(len(dags) / self.max_threads))
tis_q = multiprocessing.Queue()
jobs = [multiprocessing.Process(target=self._do_dags,
args=(dagbag, dags[i], tis_q))
@@ -738,6 +737,7 @@ def _execute(self):
"heartbeat")
duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
self.logger.info("Loop took: {} seconds".format(duration_sec))
Stats.timing("scheduler_loop", duration_sec * 1000)
try:
self.import_errors(dagbag)
except Exception as e:
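
The new Stats.timing call a few lines up reports the loop duration as a timer metric; statsd-style timers expect milliseconds, hence the * 1000. A sketch of a compatible Stats object using the statsd client library; the host, port and prefix here are assumptions, not Airflow's actual settings wiring:

# Sketch only: a statsd-backed Stats object compatible with the call above.
# The host, port and prefix are assumptions, not Airflow's settings wiring.
import statsd

Stats = statsd.StatsClient(host="localhost", port=8125, prefix="airflow")

duration_sec = 1.234
Stats.timing("scheduler_loop", duration_sec * 1000)   # statsd timers take milliseconds
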
@@ -843,22 +843,22 @@ def _execute(self):
while tasks_to_run and not deadlocked:
not_ready.clear()
for key, ti in list(tasks_to_run.items()):

ti.refresh_from_db()
ti.refresh_from_db(session=session, lock_for_update=True)
ignore_depends_on_past = (
self.ignore_first_depends_on_past and
ti.execution_date == (start_date or ti.start_date))

# The task was already marked successful or skipped by a
# different Job. Don't rerun it.
if key not in started:
if ti.state == State.SUCCESS:
succeeded.add(key)
tasks_to_run.pop(key)
session.commit()
continue
elif ti.state == State.SKIPPED:
skipped.add(key)
tasks_to_run.pop(key)
session.commit()
continue

# Is the task runnable? -- then run it
@@ -867,6 +867,10 @@
ignore_depends_on_past=ignore_depends_on_past,
flag_upstream_failed=True):
self.logger.debug('Sending {} to executor'.format(ti))
if ti.state == State.NONE:
ti.state = State.SCHEDULED
session.merge(ti)
session.commit()
executor.queue_task_instance(
ti,
mark_success=self.mark_success,
@@ -880,6 +884,8 @@
elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
not_ready.add(key)

session.commit()

self.heartbeat()
executor.heartbeat()

@@ -950,7 +956,7 @@ def _execute(self):

# executor reports success but task does not - this is weird
elif ti.state not in (
State.SUCCESS,
State.SCHEDULED,
State.QUEUED,
State.UP_FOR_RETRY):
self.logger.error(