diff --git a/stolos/__init__.py b/stolos/__init__.py index b063c57..ab209bd 100644 --- a/stolos/__init__.py +++ b/stolos/__init__.py @@ -5,3 +5,5 @@ import pkg_resources as _pkg_resources __version__ = _pkg_resources.get_distribution( _p.basename(_p.dirname(_p.abspath(__file__)))).version + +from stolos import api diff --git a/stolos/api.py b/stolos/api.py new file mode 100644 index 0000000..625422c --- /dev/null +++ b/stolos/api.py @@ -0,0 +1,18 @@ +from stolos.zookeeper_tools import ( + check_state, get_qsize, + maybe_add_subtask, readd_subtask, + get_zkclient +) +# linting +check_state, get_qsize, +maybe_add_subtask, readd_subtask, +get_zkclient + +from stolos.dag_tools import ( + get_tasks_config, build_dag, visualize_dag, + create_job_id, parse_job_id, + get_parents, get_children, +) +get_tasks_config, build_dag, visualize_dag, +create_job_id, parse_job_id, +get_parents, get_children, diff --git a/stolos/zookeeper_tools.py b/stolos/zookeeper_tools.py index 8d6c6da..5d09598 100644 --- a/stolos/zookeeper_tools.py +++ b/stolos/zookeeper_tools.py @@ -32,6 +32,35 @@ def get_zkclient(zookeeper_hosts=None): return zk +def get_qsize(app_name, zk, queued=True, taken=True): + """ + Find the number of jobs in the given app's queue + + `queued` - Include the entries in the queue that are not currently + being processed or otherwise locked + `taken` - Include the entries in the queue that are currently being + processed or are otherwise locked + """ + pq = join(app_name, 'entries') + pt = join(app_name, 'taken') + if queued: + entries = len(zk.get_children(pq)) + if taken: + return entries + else: + taken = len(zk.get_children(pt)) + return entries - taken + else: + if taken: + taken = len(zk.get_children(pt)) + return taken + else: + raise AttributeError( + "You asked for an impossible situation. Queue items are" + " waiting for a lock xor taken. You cannot have queue entries" + " that are both not locked and not waiting.") + + def _queue(app_name, job_id, zk, queue=True, priority=None): """ Calling code should obtain a lock first! If queue=False, do everything except queue (ie set state)"""