Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
phuslu committed Sep 30, 2011
1 parent d63db04 commit 34ec61c
Showing 1 changed file with 90 additions and 33 deletions.
123 changes: 90 additions & 33 deletions redisclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,55 @@
from tornado.iostream import IOStream
from tornado.util import bytes_type

class RedisPError(Exception):
def __init__(self, message=None):
message = message or 'Unknown'
Exception.__init__(self, 'Redis %s' % message)

def encode(request):
'''print repr(encode(('SET', 'mykey', 123)))'''
assert type(request) is tuple
data = '*%d\r\n' % len(request) + ''.join(['$%d\r\n%s\r\n' % (len(str(x)), x) for x in request])
return data

def decode(data):
'''print decode('*4\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$5\r\nhello\r\n$5\r\nworld\r\n')'''
assert type(data) is bytes_type
c = data[0]
if c == '+':
return True
elif c == '-':
raise RedisPError(data[1:].rstrip())
elif c == ':':
return int(data[1:])
elif c == '$':
if data[:3] == '$-1':
return None
else:
pos = data.find('\r\n')
number = int(data[1:pos])
return data[pos+2:pos+2+number]
elif c == '*':
if data[:3] == '*-1':
return None
else:
result = []
pos = data.find('\r\n')
number = int(data[1:pos])
pos1 = pos2 = pos + 2
while number:
pos2 = data.find('\r\n', pos1)
length = int(data[pos1+1:pos2])
element = data[pos2+2:pos2+2+length]
pos1 = pos2 + length + 4
result.append(element)
number -= 1
return result
else:
raise RedisPError('Unknown Redis bulk startswith %r', c)

class AsyncRedisClient(object):
'''https://github.com/d3vz3r0/tornado-redis/blob/master/redis/redis.py'''
'''http://ordinary.iteye.com/blog/1097456'''

stream_pool = {}

Expand Down Expand Up @@ -37,7 +84,12 @@ def fetch(self, request, callback):
if not self.stream._connected:
self.stream.connect(self.address, self._on_connect)
else:
self.stream.write(_request.raw, self._on_write)
data = encode(self._request)
self.stream.write(data, self._on_write)

def _execute_callback(self):
result = decode(self._data)
self._callback(result)

def _on_connect(self):
self.stream._connected = True
Expand All @@ -48,45 +100,50 @@ def _on_write(self):
self.stream.read_util(bytes_type('\r\n'), self._on_read_first_line)

def _on_read_first_line(self, data):
bulk = data[0]
if bulk == ':':
self._callback(RedisResponse(data))
elif bulk == '$':
self._data_line_number = int(data[1:])
self._data += data
self.stream.read_util(bytes_type('\r\n'), self._on_read_line)

def _on_read_line(self, data):
self._data_line_number -= 1
if self._data_line_number:
self._data += data
self.stream.read_util('\r\n', self._on_read_line)
c = data[0]
if c in '+-:':
self._data = data
self._execute_callback()
elif c == '$':
if data[:3] == '$-1':
self._data = data
self._execute_callback()
else:
self._data = data
length = int(data[1:])
self.stream.read_bytes(length+2, self._on_read_bulk_line)
elif c == '*':
if data[:3] == '*-1':
self._data = data
self._execute_callback()
else:
self._data = data
self._multibulk_number = int(data[1:])
self.stream.read_util('\r\n', self._on_read_multibulk_linehead)

def _on_read_bulk_line(self, data):
self._data += data
self._execute_callback()

def _on_read_multibulk_linehead(self, data):
self._data += data
length = int(data[1:])
self.stream.read_bytes(length+2, self.__on_read_multibulk_linebody)

def __on_read_multibulk_linebody(self, data):
self._data += data
self._multibulk_number -= 1
if self._multibulk_number:
self.stream.read_util('\r\n', self._on_read_multibulk_linehead)
else:
self._callback(RedisResponse(self._data))
self._execute_callback()

def close(self):
self.stream_set.add(self.stream)
self.stream = None
self.stream_set = None


class RedisRequest(object):

def __init__(self, data):
self.data = data

class RedisResponse(object):

def __init__(self, data):
self.data = data

class RedisPError(Exception):
def __init__(self, code, message=None, response=None):
self.code = code
message = message or httplib.responses.get(code, "Unknown")
self.response = response
Exception.__init__(self, "Redis %d: %s" % (self.code, message))

def main():
pass

Expand Down

0 comments on commit 34ec61c

Please sign in to comment.