Skip to content

Commit

Permalink
closes #8.
Browse files Browse the repository at this point in the history
Implement execute locks and add locks; fix a locking race condition; add tests.
  • Loading branch information
adgaudio committed Oct 14, 2014
1 parent b3b16ec commit 764be6b
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 88 deletions.
66 changes: 21 additions & 45 deletions scheduler/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,20 @@ def main(ns):
q=q, app_name=ns.app_name, job_id=ns.job_id, zk=zk)
return

parents_completed, consume_queue = ensure_parents_completed(
app_name=ns.app_name, job_id=ns.job_id, zk=zk)
parents_completed, consume_queue, parent_locks = \
zookeeper_tools.ensure_parents_completed(
app_name=ns.app_name, job_id=ns.job_id, zk=zk, timeout=ns.timeout)
if parents_completed is False:
if consume_queue:
q.consume()
else:
_send_to_back_of_queue(
q=q, app_name=ns.app_name, job_id=ns.job_id, zk=zk)
lock.release()
[l.release() for l in parent_locks]
return
else:
assert not parent_locks

log.info(
"Job starting!", extra=dict(app_name=ns.app_name, job_id=ns.job_id))
Expand Down Expand Up @@ -121,7 +125,7 @@ def _handle_manually_given_job_id(ns, zk):
log.critical(
msg, extra=dict(app_name=ns.app_name, job_id=ns.job_id))
raise UserWarning(msg)
lock = zookeeper_tools.obtain_lock(
lock = zookeeper_tools.obtain_execute_lock(
ns.app_name, ns.job_id, zk=zk, safe=False, raise_on_error=True,
timeout=ns.timeout)
zookeeper_tools.set_state(ns.app_name, ns.job_id, zk=zk, pending=True)
Expand Down Expand Up @@ -153,10 +157,11 @@ def get_lock_if_job_is_runnable(app_name, job_id, zk, timeout, lock):
if lock:
l = lock
else:
l = zookeeper_tools.obtain_lock(app_name, job_id, zk, timeout=timeout)
l = zookeeper_tools.obtain_execute_lock(
app_name, job_id, zk, timeout=timeout)
if l is False:
log.warn('Could not obtain lock for task most likely because'
' the job_id appears more than once in the queue',
log.warn('Could not obtain execute lock for task because'
' something is already processing this job_id',
extra=dict(app_name=app_name, job_id=job_id))
return False
return l
Expand All @@ -165,45 +170,16 @@ def get_lock_if_job_is_runnable(app_name, job_id, zk, timeout, lock):
def _send_to_back_of_queue(q, app_name, job_id, zk):
    """Move an un-runnable task to the back of its queue.

    This exists so un-runnable tasks don't hog the front of the queue
    and soak up resources.  Re-adds the job and consumes the current
    queue entry; if the job is already queued, log and do nothing.
    """
    try:
        # _force appears to bypass the normal re-add guards -- TODO confirm
        # against zookeeper_tools.readd_subtask
        zookeeper_tools.readd_subtask(app_name, job_id, zk=zk, _force=True)
        q.consume()
        log.info(
            "Job sent to back of queue",
            extra=dict(app_name=app_name, job_id=job_id))
    except exceptions.JobAlreadyQueued:
        # best-effort: another worker already queued this job_id
        log.info(
            "Job already queued. Cannot send to back of queue.",
            extra=dict(app_name=app_name, job_id=job_id))


def _handle_failure(ns, zk, q, lock):
Expand Down
68 changes: 59 additions & 9 deletions scheduler/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,24 +636,58 @@ def test_readd_change_child_state_while_child_running():
raise nose.plugins.skip.SkipTest()


@with_setup
def test_child_running_while_parent_pending_but_not_executing():
    """When the parent is queued but nothing holds its execute lock,
    ensure_parents_completed should grab that lock itself and tell the
    child to back off."""
    enqueue(app1, job_id1)
    enqueue(app2, job_id1)
    completed, should_consume, plocks = zkt.ensure_parents_completed(
        app2, job_id1, zk=zk, timeout=1)
    # the execute lock must now be held by ensure_parents_completed
    validate_one_queued_executing_task(app1, job_id1)
    validate_one_queued_task(app2, job_id1)
    nose.tools.assert_equal(completed, False)
    # the child promises to remove itself from the queue
    nose.tools.assert_equal(should_consume, True)
    nose.tools.assert_equal(len(plocks), 1)


@with_setup
def test_child_running_while_parent_pending_and_executing():
    """When something else already holds the parent's execute lock, the
    child must neither consume its queue entry nor receive parent locks."""
    enqueue(app1, job_id1)
    enqueue(app2, job_id1)
    held = zkt.obtain_execute_lock(app1, job_id1, zk=zk)
    assert held
    completed, should_consume, plocks = zkt.ensure_parents_completed(
        app2, job_id1, zk=zk, timeout=1)
    validate_one_queued_executing_task(app1, job_id1)
    validate_one_queued_task(app2, job_id1)
    nose.tools.assert_equal(completed, False)
    # the child must NOT promise to remove itself from the queue
    nose.tools.assert_equal(should_consume, False)
    nose.tools.assert_equal(plocks, set())


@with_setup
def test_race_condition_when_parent_queues_child():
    """The parent queues the child and the child runs before the parent
    gets a chance to mark itself as completed; the child must exit
    gracefully and remain queued for a later retry."""
    zkt.set_state(app1, job_id1, zk=zk, pending=True)
    lock = zkt.obtain_execute_lock(app1, job_id1, zk=zk)
    assert lock
    zkt._maybe_queue_children(
        parent_app_name=app1, parent_job_id=job_id1, zk=zk)
    validate_one_queued_task(app2, job_id1)
    validate_zero_queued_task(app1)

    # should not complete child. should de-queue child
    # should not queue parent.
    # should exit gracefully
    run_spark_code(app2)
    validate_zero_queued_task(app1)
    validate_one_queued_task(app2, job_id1)

    zkt.set_state(app1, job_id1, zk=zk, completed=True)
    lock.release()
    validate_one_completed_task(app1, job_id1)
    validate_one_queued_task(app2, job_id1)

Expand Down Expand Up @@ -777,31 +811,44 @@ def validate_zero_completed_task(app_name):

def validate_one_failed_task(app_name, job_id):
    """Assert (app_name, job_id) is in the failed state: neither lock
    type held and not present in the queue."""
    status = get_zk_status(app_name, job_id)
    # get_zk_status reports 'num_execute_locks'/'num_add_locks'; the old
    # 'num_locks' key no longer exists and would raise KeyError
    nose.tools.assert_equal(status['num_execute_locks'], 0)
    nose.tools.assert_equal(status['num_add_locks'], 0)
    nose.tools.assert_equal(status['in_queue'], False)
    # nose.tools.assert_equal(status['app_qsize'], 1)
    nose.tools.assert_equal(status['state'], 'failed')


def validate_one_queued_executing_task(app_name, job_id):
    """Assert the task is queued AND currently executing: exactly one
    execute lock held, no add lock, and state still pending."""
    st = get_zk_status(app_name, job_id)
    checks = [
        ('num_execute_locks', 1),
        ('num_add_locks', 0),
        ('in_queue', True),
        ('app_qsize', 1),
        ('state', 'pending'),
    ]
    for key, want in checks:
        nose.tools.assert_equal(st[key], want)


def validate_one_queued_task(app_name, job_id):
    """Assert the task is queued and pending with no locks of either
    type held."""
    status = get_zk_status(app_name, job_id)
    # 'num_locks' was split into execute/add lock counts; assert both
    nose.tools.assert_equal(status['num_execute_locks'], 0)
    nose.tools.assert_equal(status['num_add_locks'], 0)
    nose.tools.assert_equal(status['in_queue'], True)
    nose.tools.assert_equal(status['app_qsize'], 1)
    nose.tools.assert_equal(status['state'], 'pending')


def validate_one_completed_task(app_name, job_id):
    """Assert the task completed: no locks held, not queued, queue
    empty, state 'completed'."""
    status = get_zk_status(app_name, job_id)
    # 'num_locks' was split into execute/add lock counts; assert both
    nose.tools.assert_equal(status['num_execute_locks'], 0)
    nose.tools.assert_equal(status['num_add_locks'], 0)
    nose.tools.assert_equal(status['in_queue'], False)
    nose.tools.assert_equal(status['app_qsize'], 0)
    nose.tools.assert_equal(status['state'], 'completed')


def validate_one_skipped_task(app_name, job_id):
    """Assert the task was skipped: no locks held, not queued, queue
    empty, state 'skipped'."""
    status = get_zk_status(app_name, job_id)
    # 'num_locks' was split into execute/add lock counts; assert both
    nose.tools.assert_equal(status['num_execute_locks'], 0)
    nose.tools.assert_equal(status['num_add_locks'], 0)
    nose.tools.assert_equal(status['in_queue'], False)
    nose.tools.assert_equal(status['app_qsize'], 0)
    nose.tools.assert_equal(status['state'], 'skipped')
Expand All @@ -825,11 +872,14 @@ def consume_queue(app_name, timeout=1):

def get_zk_status(app_name, job_id):
path = zkt._get_zookeeper_path(app_name, job_id)
lockpath = join(path, 'lock')
elockpath = join(path, 'execute_lock')
alockpath = join(path, 'add_lock')
entriespath = join(app_name, 'entries')
return {
'num_locks': len(zk.exists(lockpath)
and zk.get_children(lockpath) or []),
'num_add_locks': len(
zk.exists(alockpath) and zk.get_children(alockpath) or []),
'num_execute_locks': len(
zk.exists(elockpath) and zk.get_children(elockpath) or []),
'in_queue': (
any(zk.get(join(app_name, 'entries', x))[0] == job_id
for x in zk.get_children(entriespath))
Expand Down
Loading

0 comments on commit 764be6b

Please sign in to comment.