Skip to content

Commit

Permalink
add stolos.api module
Browse files Browse the repository at this point in the history
  • Loading branch information
adgaudio committed Nov 5, 2014
1 parent 9b80dda commit 30d7a8e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
2 changes: 2 additions & 0 deletions stolos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
import pkg_resources as _pkg_resources
__version__ = _pkg_resources.get_distribution(
_p.basename(_p.dirname(_p.abspath(__file__)))).version

from stolos import api
18 changes: 18 additions & 0 deletions stolos/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from stolos.zookeeper_tools import (
check_state, get_qsize,
maybe_add_subtask, readd_subtask,
get_zkclient
)
# linting
check_state, get_qsize,
maybe_add_subtask, readd_subtask,
get_zkclient

from stolos.dag_tools import (
get_tasks_config, build_dag, visualize_dag,
create_job_id, parse_job_id,
get_parents, get_children,
)
get_tasks_config, build_dag, visualize_dag,
create_job_id, parse_job_id,
get_parents, get_children,
29 changes: 29 additions & 0 deletions stolos/zookeeper_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,35 @@ def get_zkclient(zookeeper_hosts=None):
return zk


def get_qsize(app_name, zk, queued=True, taken=True):
"""
Find the number of jobs in the given app's queue
`queued` - Include the entries in the queue that are not currently
being processed or otherwise locked
`taken` - Include the entries in the queue that are currently being
processed or are otherwise locked
"""
pq = join(app_name, 'entries')
pt = join(app_name, 'taken')
if queued:
entries = len(zk.get_children(pq))
if taken:
return entries
else:
taken = len(zk.get_children(pt))
return entries - taken
else:
if taken:
taken = len(zk.get_children(pt))
return taken
else:
raise AttributeError(
"You asked for an impossible situation. Queue items are"
" waiting for a lock xor taken. You cannot have queue entries"
" that are both not locked and not waiting.")


def _queue(app_name, job_id, zk, queue=True, priority=None):
""" Calling code should obtain a lock first!
If queue=False, do everything except queue (ie set state)"""
Expand Down

0 comments on commit 30d7a8e

Please sign in to comment.