Skip to content

Commit

Permalink
finish porting LockingQueue.size() and .is_queued from majorityredis.…
Browse files Browse the repository at this point in the history
… tests passing
  • Loading branch information
adgaudio committed Nov 23, 2015
1 parent f2d27ca commit b9970dd
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions stolos/queue_backend/qbcli_majorityredis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from contextlib import contextmanager
from majorityredis import MajorityRedis
import random
import time
import threading
Expand Down Expand Up @@ -44,14 +43,6 @@ def raw_client():
return clients[0]


@util.cached
def raw_client_mr():
NS = get_NS()
return MajorityRedis(
[raw_client()], NS.qb_redis_n_servers or len(NS.qb_redis_hosts),
getset_history_prefix=NS.qb_redis_history_prefix, threadsafe=True)


class BaseStolosRedis(object):
"""
Base Class for Lock and LockingQueue that initializes a few things
Expand Down Expand Up @@ -297,15 +288,15 @@ class LockingQueue(BaseStolosRedis, BaseLockingQueue):
"""),

# returns whether an item is in queue or currently being processed.
# raises an error if already completed.
# returns boolean tuple of form: (is_taken, is_queued, is_completed)
# O(1)
lq_is_queued_h_k=dict(
keys=('Q', 'h_k'), args=(), script="""
local taken = redis.call("GET", KEYS[2])
if "completed" == taken then
return {err="already completed"}
elseif taken then return {true, false}
else return {false, false ~= redis.call("ZSCORE", KEYS[1], KEYS[2])} end
return {false, false, true}
elseif taken then return {true, false, false}
else return {false, false ~= redis.call("ZSCORE", KEYS[1], KEYS[2]), false} end
"""),

# returns whether an item is in queue or currently being processed.
Expand All @@ -317,13 +308,13 @@ class LockingQueue(BaseStolosRedis, BaseLockingQueue):
if string.sub(k, -string.len(KEYS[2])) == KEYS[2] then
local taken = redis.call("GET", k)
if taken then
if "completed" == taken then return {err="already completed"} end
return {true, false}
if "completed" == taken then return {false, false, true} end
return {true, false, false}
else
return {false, true} end
return {false, true, false} end
end
end
return {false, false}
return {false, false, false}
"""),
)

Expand Down Expand Up @@ -406,9 +397,25 @@ def size(self, queued=True, taken=True):
"""
if not queued and not taken:
raise AttributeError("either `taken` or `queued` must be True")
# TODO: port this over from majorityredis
return raw_client_mr().LockingQueue(self._path)\
.size(queued=queued, taken=taken)

if taken and queued:

n_queued_and_taken, _ = raw_client().evalsha(
self._SHAS['lq_qsize_fast'],
len(self.SCRIPTS['lq_qsize_fast']['keys']),
self._path, self._q_lookup)
return n_queued_and_taken
else:
nqueued, ntaken, _ = raw_client().evalsha(
self._SHAS['lq_qsize_slow'],
len(self.SCRIPTS['lq_qsize_slow']['keys']),
self._path, self._q_lookup)
if queued:
return nqueued
elif taken:
return ntaken
else:
raise Exception('code error - should never get here')

def is_queued(self, value):
"""
Expand All @@ -417,9 +424,17 @@ def is_queued(self, value):
Redis will not like this operation. Use sparingly with large queues.
"""
# TODO: port this over from majorityredis
return raw_client_mr().LockingQueue(self._path)\
.is_queued(item=value)
if value == self._item:
taken, queued, completed = raw_client().evalsha(
self._SHAS['lq_is_queued_h_k'],
len(self.SCRIPTS['lq_is_queued_h_k']['keys']),
self._path, self._h_k)
else:
taken, queued, completed = raw_client().evalsha(
self._SHAS['lq_is_queued_item'],
len(self.SCRIPTS['lq_is_queued_item']['keys']),
self._path, value)
return taken or queued


def _raise_err(x, y):
Expand Down

0 comments on commit b9970dd

Please sign in to comment.