Add simplified cache client and tests
john-g-g committed Jun 22, 2018
1 parent fd3d78c commit e09c7cc
Showing 20 changed files with 221 additions and 166 deletions.
1 change: 1 addition & 0 deletions Pipfile
@@ -38,6 +38,7 @@ python-rapidjson = "*"
requests = "*"
yapf = "*"
funcy = "*"
vprof = "*"

[packages]
aiodns = "*"
3 changes: 2 additions & 1 deletion jussi/cache/__init__.py
@@ -8,10 +8,11 @@
import structlog

from aredis import StrictRedis
from aredis.cache import Cache


from .cache_group import CacheGroup
from ..typedefs import WebApp
from .backends.redis import Cache

logger = structlog.get_logger(__name__)

1 change: 0 additions & 1 deletion jussi/cache/backends/max_ttl.py
@@ -35,7 +35,6 @@ def __init__(self, max_ttl: int = None, max_size: int=None):
self._keys = self._cache.keys()
self._values = self._cache.values()
self._items = self._cache.items()
self.client = self # hack to allow compat with aredis cache
self._max_ttl = max_ttl or MEMORY_CACHE_MAX_TTL
self._max_size = max_size or MEMORY_CACHE_MAX_SIZE

107 changes: 107 additions & 0 deletions jussi/cache/backends/redis.py
@@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
from zlib import compress
from zlib import decompress
from typing import Dict
from typing import List
from typing import NoReturn
from typing import Optional
from typing import Tuple
from typing import TypeVar


from ujson import dumps
from ujson import loads

CacheTTLValue = TypeVar('CacheTTL', int, float, type(None))
CacheKey = str
CacheKeys = List[CacheKey]
CacheValue = TypeVar('CacheValue', int, float, str, dict)
CachePair = Tuple[CacheKey, CacheValue]
CachePairs = Dict[CacheKey, CacheValue]
CacheResultValue = TypeVar('CacheValue', int, float, str, dict)
CacheResult = Optional[CacheResultValue]
CacheResults = List[CacheResult]


class Cache:
"""cache provides basic function"""

def __init__(self, client):
self.client = client

def _pack(self, value) -> bytes:
return compress(dumps(value, ensure_ascii=False).encode('utf8'))

def _unpack(self, value: bytes) -> CacheResult:
if not value:
return None
return loads(decompress(value))

async def get(self, key: CacheKey) -> CacheResult:
res = await self.client.get(key)
if res:
return self._unpack(res)
return None

async def set(self, key: str, value, expire_time: CacheTTLValue=None) -> NoReturn:
value = self._pack(value)
await self.client.set(key, value, ex=expire_time)

async def set_many(self, data: CachePairs, expire_time: CacheTTLValue=None) -> NoReturn:
async with await self.client.pipeline() as pipeline:
for key, value in data.items():
value = self._pack(value)
await pipeline.set(key, value, expire_time)
return await pipeline.execute()

async def mget(self, keys: CacheKeys) -> CacheResults:
return [self._unpack(r) for r in await self.client.mget(keys)]

async def clear(self):
return await self.client.clear()

async def close(self):
self.client.connection_pool.disconnect()

async def delete(self, key):
await self.client.delete(key)


class AttrDict(dict):
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
self.__dict__ = self


class MockClient:
def __init__(self, cache):
self.cache = cache
self.connection_pool = AttrDict()
self.connection_pool.disconnect = lambda: None

async def execute(self):
pass

async def set(self, key, value, ex: CacheTTLValue=None) -> NoReturn:
self.cache.sets(key, value, ex)

async def get(self, key) -> CacheResult:
return self.cache.gets(key)

async def mget(self, keys) -> CacheResults:
return self.cache.mgets(keys)

async def pipeline(self):
return self

async def clear(self):
self.cache.clears()

async def delete(self, key):
self.cache.deletes(key)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
pass
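
For context, a minimal usage sketch of the new Cache wrapper (not part of this commit): it pairs the class above with an aredis StrictRedis client, the same combination jussi/cache/__init__.py now imports. The host, port, keys, and TTL values below are illustrative assumptions.

# Illustrative only -- not part of the commit. Assumes a Redis server on
# localhost:6379; keys and expire times are made up for the example.
import asyncio

from aredis import StrictRedis

from jussi.cache.backends.redis import Cache


async def main():
    cache = Cache(client=StrictRedis(host='localhost', port=6379))

    # values are ujson-serialized and zlib-compressed by Cache._pack before storage
    await cache.set('block:1000', {'block_num': 1000}, expire_time=60)
    print(await cache.get('block:1000'))            # -> {'block_num': 1000}

    # set_many writes through a pipeline; mget unpacks each hit, None for misses
    await cache.set_many({'a': 1, 'b': 2}, expire_time=30)
    print(await cache.mget(['a', 'b', 'missing']))  # -> [1, 2, None]

    await cache.close()


asyncio.get_event_loop().run_until_complete(main())
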
11 changes: 2 additions & 9 deletions jussi/cache/cache_group.py
@@ -119,7 +119,7 @@ async def mget(self, keys: CacheKeys) -> CacheResults:
missing = [
key for key, response in zip(
keys, results) if not response]
cache_results = await cache.client.mget(missing)
cache_results = await cache.mget(missing)
cache_iter = iter(cache_results)
results = [existing or next(cache_iter) for existing in results]
if all(results):
@@ -150,18 +150,11 @@ async def clear(self) -> NoReturn:

async def close(self) -> NoReturn:
for cache in self._all_caches:
cache.client.connection_pool.disconnect()
cache.close()

# jsonrpc related methods
#

async def get_jsonrpc_response(self,
request: JrpcRequest) -> Optional[JrpcResponse]:
if not isinstance(request, list):
return await self.get_single_jsonrpc_response(request)
else:
return await self.get_batch_jsonrpc_responses(request)

async def get_single_jsonrpc_response(self,
request: SingleJrpcRequest) -> Optional[SingleJrpcResponse]:
if request.upstream.ttl == TTL.NO_CACHE:
6 changes: 0 additions & 6 deletions jussi/cache/utils.py
@@ -69,12 +69,6 @@ def block_num_from_jsonrpc_response(
return None


def block_num_from_id(block_hash: str) -> int:
"""return the first 4 bytes (8 hex digits) of the block ID (the block_num)
"""
return int(str(block_hash)[:8], base=16)


def merge_cached_response(request: SingleJrpcRequest,
cached_response: CachedSingleResponse,
) -> Optional[SingleJrpcResponse]:
3 changes: 2 additions & 1 deletion jussi/middlewares/__init__.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-


from .jussi import initialize_jussi_request
from .jussi import finalize_jussi_response
from .limits import check_limits
from .caching import get_response
@@ -15,6 +15,7 @@ def setup_middlewares(app):
logger.info('setup_middlewares', when='before_server_start')

# request middleware
app.request_middleware.append(initialize_jussi_request)
app.request_middleware.append(init_stats)
app.request_middleware.append(check_limits)
app.request_middleware.append(get_response)
15 changes: 13 additions & 2 deletions jussi/middlewares/jussi.py
@@ -1,17 +1,28 @@
# -*- coding: utf-8 -*-
from typing import Optional
from reprlib import repr as _repr
from time import perf_counter as perf

import structlog

from ..errors import handle_middleware_exceptions
from ..typedefs import HTTPRequest
from ..typedefs import HTTPResponse
from ..errors import JsonRpcError

logger = structlog.get_logger('jussi')


@handle_middleware_exceptions
async def initialize_jussi_request(request: HTTPRequest) -> Optional[HTTPResponse]:
# parse jsonrpc
try:
request.jsonrpc
except JsonRpcError as e:
return e.to_sanic_response()
except Exception as e:
return JsonRpcError(http_request=request,
exception=e).to_sanic_response()


async def finalize_jussi_response(request: HTTPRequest,
response: HTTPResponse) -> None:
# pylint: disable=bare-except
6 changes: 1 addition & 5 deletions jussi/middlewares/statsd.py
@@ -13,18 +13,14 @@


async def init_stats(request: HTTPRequest) -> None:

try:
statsd_client = getattr(request.app.config, 'statsd_client', None)
if not statsd_client:
return
_ = request.jsonrpc
# statsd_client.gauge('tasks',len(Task.all_tasks()))
if request.is_single_jrpc:
statsd_client.incr('jrpc.inflight')
elif request.is_batch_jrpc and statsd_client:
_ = [statsd_client.incr('jrpc.inflight') for r in range(len(request.jsonrpc))]

_ = [statsd_client.incr('jrpc.inflight') for r in request.jsonrpc]
except BaseException as e:
logger.warning('send_stats', e=e)

2 changes: 1 addition & 1 deletion jussi/request/http.py
@@ -14,7 +14,7 @@

from jussi.empty import _empty
from jussi.request.jsonrpc import JSONRPCRequest
from jussi.request.jsonrpc import from_request as jsonrpc_from_request
from jussi.request.jsonrpc import from_http_request as jsonrpc_from_request

# pylint: enable=no-name-in-module

20 changes: 1 addition & 19 deletions jussi/request/jsonrpc.py
@@ -97,24 +97,6 @@ def upstream_id(self) -> int:
def translated(self) -> bool:
return self.original_request is not None

def log_extra(self, **kwargs) -> Optional[Dict[str, Any]]:
try:
base_extra = {
'x-amzn-trace-id': self.amzn_trace_id,
'jussi_request_id': self.jussi_request_id,
'jsonrpc_id': self.id,
'batch_index': self.batch_index,
'upstream_request_id': self.upstream_id,
'translated': self.translated,
**self.urn.to_dict(),
**self.upstream._asdict(),
}
base_extra.update(**kwargs)
return base_extra

except Exception:
return None

def __hash__(self) -> int:
return hash(self.urn)

@@ -133,7 +115,7 @@ def translate_to_appbase(request: SingleRawRequest, urn) -> dict:

# pylint: disable=no-member

def from_request(http_request, batch_index: int, request: SingleRawRequest):
def from_http_request(http_request, batch_index: int, request: SingleRawRequest):
from ..urn import from_request as urn_from_request
from ..upstream import Upstream

17 changes: 16 additions & 1 deletion tests/conftest.py
@@ -24,10 +24,13 @@
import jussi.logging_config
import jussi.middlewares
import jussi.serve
from jussi.cache.backends.max_ttl import SimplerMaxTTLMemoryCache
from jussi.cache.backends.redis import Cache
from jussi.cache.backends.redis import MockClient
from jussi.urn import URN
from jussi.empty import _empty
from jussi.upstream import _Upstreams
from jussi.request.jsonrpc import from_request as jsonrpc_from_request
from jussi.request.jsonrpc import from_http_request as jsonrpc_from_request
from jussi.request.http import HTTPRequest


@@ -1681,6 +1684,11 @@ def __init__(self, *args, **kwargs):
self.__dict__ = self


def build_mocked_cache():
mock_client = MockClient(cache=SimplerMaxTTLMemoryCache())
return Cache(client=mock_client)


def make_request(headers: dict=None, body=None, app=None, method: str='POST',
url_bytes: bytes=b'/', upstreams=TEST_UPSTREAM_CONFIG) -> HTTPRequest:
headers = headers or {'x-amzn-trace-id': '123', 'x-jussi-request-id': '123'}
@@ -1702,6 +1710,13 @@ def upstreams():
yield copy.deepcopy(_Upstreams(TEST_UPSTREAM_CONFIG, validate=False))


@pytest.fixture(scope='session')
def translate_to_appbase_upstreams():
upstreams = copy.deepcopy(_Upstreams(TEST_UPSTREAM_CONFIG, validate=False))
upstreams[0]['translate_to_appbbase'] = True
yield upstreams


@pytest.fixture(scope='function')
def app(loop):
args = jussi.serve.parse_args(args=[])
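
A hedged test sketch (not part of the commit) showing how the new build_mocked_cache helper from tests/conftest.py can exercise the async Cache API without a running Redis server. It assumes SimplerMaxTTLMemoryCache exposes the synchronous gets/sets helpers that MockClient calls, and that pytest-asyncio (or an equivalent event-loop fixture) is available in the test environment.

# Illustrative only -- the pytest-asyncio marker is an assumption.
import pytest

from tests.conftest import build_mocked_cache


@pytest.mark.asyncio
async def test_mocked_cache_roundtrip():
    # Cache backed by MockClient wrapping an in-process SimplerMaxTTLMemoryCache
    cache = build_mocked_cache()
    await cache.set('key', {'value': 1}, expire_time=60)
    assert await cache.get('key') == {'value': 1}
    assert await cache.get('missing') is None
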
12 changes: 10 additions & 2 deletions tests/profiling_tests/profile_pool.py
@@ -2,13 +2,21 @@
import asyncio
import cProfile
import uvloop

from jussi.ws.pool2 import Pool
from tests.conftest import AttrDict
from jussi.ws.pool import Pool
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

pr = cProfile.Profile()


def patch_pool(Pool):
MockedWSConn = AttrDict()
MockedWSConn.send = lambda x: None
MockedWSConn.recv = lambda x: None
Pool._get_new_connection = lambda s: MockedWSConn
return Pool


async def run(pool, count):
pr.enable()
for i in range(count):