Skip to content

Commit

Permalink
attempting to fix and simplify a troublesome and confusing spot in code that may have had a bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
adgaudio committed Jan 29, 2015
1 parent e231f56 commit 0232bdb
Showing 1 changed file with 47 additions and 43 deletions.
90 changes: 47 additions & 43 deletions stolos/zookeeper_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def maybe_add_subtask(app_name, job_id, zk=None, zookeeper_hosts=None,
zk = get_zkclient(zookeeper_hosts)
if zk.exists(_get_zookeeper_path(app_name, job_id)):
return False
# get a lock so we guarantee this task isn't being added twice
# get a lock so we guarantee this task isn't being added twice concurrently
lock = obtain_add_lock(app_name, job_id, zk, timeout=timeout, safe=False)
if not lock:
return False
Expand Down Expand Up @@ -416,51 +416,55 @@ def ensure_parents_completed(app_name, job_id, zk, timeout):
parent_locks = set()
for parent, pjob_id, dep_grp in dag_tools.get_parents(app_name,
job_id, True):
if not check_state(
app_name=parent, job_id=pjob_id, zk=zk, completed=True):
parents_completed = False
log.info(
'Must wait for parent task to complete before executing'
' child task.', extra=dict(
parent_app_name=parent, parent_job_id=pjob_id,
app_name=app_name, job_id=job_id))
if maybe_add_subtask(parent, pjob_id, zk):
msg_if_get_lock = (
"Waiting for parent to complete."
" Not unqueuing myself"
" because parent might have completed already.")
if _ensure_parents_completed_get_lock(
app_name, job_id, parent, pjob_id, zk,
timeout, parent_locks, msg_if_get_lock):
consume_queue = True
elif check_state(parent, pjob_id, zk=zk, pending=True):
msg_if_get_lock = (
'A parent is or was previously queued but is not'
' running. Will remove myself from queue since my'
' parent will eventually run and queue me.')
if _ensure_parents_completed_get_lock(
app_name, job_id, parent, pjob_id, zk,
timeout, parent_locks, msg_if_get_lock):
consume_queue = True
if check_state(app_name=parent, job_id=pjob_id, zk=zk, completed=True):
continue
parents_completed = False
log.info(
'My parent has not completed yet.',
extra=dict(
parent_app_name=parent, parent_job_id=pjob_id,
app_name=app_name, job_id=job_id))

# At this point, I need to be re-run
# The question at this point is whether to requeue myself or assume the
# parent will.

# Assume the default is I requeue myself. Sometimes, this might result
# in me cycling through the queue a couple times.

# If parent is running, it will be able to requeue me if I exit in
# time. If it doesn't, either I'll requeue myself by default or
# another parent will. So, do nothing in this case.

# if parent is not running, I should try to maybe_add_subtask it.
# - if can't add parent, then possibly something else is adding it, or
# it ran once and is waiting on one of my grandparents.
# - if I can maybe_add_subtask my parent, then it definitely wasn't
# running before.

# In both cases,
# I should try to unqueue myself if I can guarantee that the parent
# won't run by the time I unqueue myself. Otherwise, I should just
# default to assuming parent is running and requeue myself by default.

maybe_add_subtask(parent, pjob_id, zk)

elock = obtain_execute_lock(
parent, pjob_id, zk=zk,
raise_on_error=False, timeout=timeout)
if elock:
if not check_state(parent, pjob_id, zk=zk, pending=True):
elock.release() # race condition
else:
consume_queue = True
parent_locks.add(elock)
log.info(
"I will unqueue myself with the expectation that"
" my parent will requeue me", extra=dict(
app_name=app_name, job_id=job_id))
return parents_completed, consume_queue, parent_locks


def _ensure_parents_completed_get_lock(app_name, job_id, parent, pjob_id, zk,
                                       timeout, parent_locks, msg_if_get_lock):
    """Try to take the execute lock on one parent job of a child task.

    The lock is requested with `raise_on_error=False`, so a failed
    attempt is reported through the return value.  When the lock is
    obtained it is added to `parent_locks` (the caller's set is mutated in
    place) and `msg_if_get_lock` is logged with the parent and child
    identifiers attached as `extra` context.

    Holding the parent's execute lock is what lets the caller treat
    removing the child from its queue as safe.

    Returns True if the lock was obtained, False otherwise.
    """
    parent_elock = obtain_execute_lock(
        parent, pjob_id, zk=zk, raise_on_error=False, timeout=timeout)
    if not parent_elock:
        # Could not lock the parent: nothing is recorded or logged.
        return False

    parent_locks.add(parent_elock)
    log_context = dict(
        parent_app_name=parent, parent_job_id=pjob_id,
        app_name=app_name, job_id=job_id)
    log.info(msg_if_get_lock, extra=log_context)
    return True


def _recursively_reset_child_task_state(parent_app_name, job_id, zk):
log.debug(
"recursively setting all descendant tasks to 'pending' and "
Expand Down

0 comments on commit 0232bdb

Please sign in to comment.