Merge pull request steemit#207 from steemit/ws-pool-timing
Add timing for ws pool
john-g-g authored Aug 7, 2018
2 parents 1bceeae + 67cba6b commit 7a8fce5
Showing 5 changed files with 71 additions and 6 deletions.
55 changes: 53 additions & 2 deletions jussi/errors.py
@@ -8,13 +8,15 @@
 from funcy.decorators import decorator
 from sanic import response
 from sanic.exceptions import RequestTimeout
+from sanic.exceptions import ServiceUnavailable
 from sanic.exceptions import SanicException
 
 from .typedefs import HTTPRequest
 from .typedefs import HTTPResponse
 from .typedefs import JrpcRequest
 from .typedefs import JrpcResponse
 from .typedefs import WebApp
+from .async_stats import fmt_timings
 
 logger = structlog.get_logger(__name__)

@@ -32,15 +34,25 @@ def setup_error_handlers(app: WebApp) -> WebApp:
     # pylint: disable=unused-variable
 
     @app.exception(RequestTimeout)
-    def handle_timeout_errors(request: HTTPRequest,
-                              exception: SanicException) -> Optional[
+    def handle_request_timeout_errors(request: HTTPRequest,
+                                      exception: SanicException) -> Optional[
             HTTPResponse]:
         """handles noisy request timeout errors"""
         # pylint: disable=unused-argument
         if not request:
             return None
         return RequestTimeoutError(http_request=request).to_sanic_response()
 
+    @app.exception(ServiceUnavailable)
+    def handle_response_timeout_errors(request: HTTPRequest,
+                                       exception: SanicException) -> Optional[
+            HTTPResponse]:
+        """handles noisy response timeout errors"""
+        # pylint: disable=unused-argument
+        if not request:
+            return None
+        return ResponseTimeoutError(http_request=request).to_sanic_response()
+
     # pylint: disable=unused-argument
     @app.exception(JsonRpcError)
     def handle_jsonrpc_error(request: HTTPRequest,
@@ -187,6 +199,30 @@ def to_dict(self) -> dict:
 
         return base_error
 
+    def timings(self) -> Optional[dict]:
+        try:
+            if self.http_request.is_single_jrpc:
+                request_timings = fmt_timings(self.http_request.timings)
+                jsonrpc_timings = fmt_timings(self.http_request.jsonrpc.timings)
+                return {
+                    'request_timings': request_timings,
+                    'jsonrpc_timings': jsonrpc_timings
+                }
+            elif self.http_request.is_batch_jrpc:
+                request_timings = fmt_timings(self.http_request.timings)
+                jsonrpc_timings = []
+                for r in self.http_request.jsonrpc:
+                    jsonrpc_timings.extend(fmt_timings(r.timings))
+                return {
+                    'request_timings': request_timings,
+                    'jsonrpc_timings': jsonrpc_timings
+                }
+            else:
+                return None
+
+        except Exception as e:
+            return None
+
     def log(self) -> None:
         if self.log_traceback and self.exception:
             self.logger.error(self.format_message(), **self.to_dict(),
@@ -248,6 +284,21 @@ class RequestTimeoutError(JsonRpcError):
     code = 1000
     message = 'Request Timeout'
 
+    def to_dict(self):
+        data = super().to_dict()
+        try:
+            timings = self.timings()
+            if timings:
+                data.update(**timings)
+        except Exception as e:
+            logger.info('error adding timing data to RequestTimeoutError', e=e)
+        return data
+
+
+class ResponseTimeoutError(JsonRpcError):
+    code = 1050
+    message = 'Response Timeout'
+
 
 class UpstreamResponseError(JsonRpcError):
     code = 1100
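Note: the new timings() helper depends on fmt_timings from jussi/async_stats, which this diff does not show. A minimal sketch of what such a formatter might look like, assuming each timing entry is a (perf_counter_value, label) tuple as appended elsewhere in this commit:

# Hypothetical sketch only -- the real fmt_timings lives in
# jussi/async_stats and is not part of this diff. Assumes `timings`
# is a list of (perf_counter_value, label) tuples.
from typing import List, Tuple


def fmt_timings(timings: List[Tuple[float, str]]) -> List[dict]:
    if not timings:
        return []
    start = timings[0][0]  # first mark is the reference point
    # Report each mark as milliseconds elapsed since the first mark.
    return [{'label': label, 'ms': round((t - start) * 1000, 3)}
            for t, label in timings]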
3 changes: 1 addition & 2 deletions jussi/handlers.py
@@ -236,9 +236,8 @@ async def fetch_http(http_request: HTTPRequest,
                 json=upstream_request,
                 headers=jrpc_request.upstream_headers,
                 timeout=jrpc_request.upstream.timeout) as resp:
-            jrpc_request.timings.append((perf(), 'fetch_http.sent'))
+            jrpc_request.timings.append((perf(), 'fetch_http.response'))
             upstream_response = await resp.json(encoding='utf-8', content_type=None)
-            jrpc_request.timings.append((perf(), 'fetch_http.response'))
 
     except (concurrent.futures.TimeoutError,
             asyncio.TimeoutError) as e:
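With this change, 'fetch_http.response' is recorded as soon as the upstream responds, before the body is decoded, and the pre-send 'fetch_http.sent' mark is dropped. For illustration only (not part of the commit), consecutive (timestamp, label) marks can be turned into per-stage durations by differencing adjacent entries:

# Illustrative only: converting the (timestamp, label) marks this
# commit appends into per-stage durations.
from time import perf_counter as perf

timings = [(perf(), 'fetch_http.enter')]
# ... awaited work would happen here ...
timings.append((perf(), 'fetch_http.response'))

durations = {
    later_label: later_t - earlier_t
    for (earlier_t, _), (later_t, later_label) in zip(timings, timings[1:])
}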
2 changes: 1 addition & 1 deletion jussi/middlewares/caching.py
@@ -33,7 +33,7 @@ async def get_response(request: HTTPRequest) -> None:
         else:
             request.timings.append((perf(), 'get_cached_response.exit'))
             return
-    request.timings.append((perf(), 'get_cached_response.await'))
+    request.timings.append((perf(), 'get_cached_response.acquire'))
     cached_response = await asyncio.wait_for(cached_response_future,
                                              timeout=cache_read_timeout)
    request.timings.append((perf(), 'get_cached_response.response'))
4 changes: 4 additions & 0 deletions jussi/request/http.py
@@ -189,3 +189,7 @@ def jussi_request_id(self) -> str:
     @property
     def amzn_trace_id(self) -> str:
         return self.headers.get('x-amzn-trace-id', '')
+
+    @property
+    def request_start_time(self) -> float:
+        return self.timings[0][0]
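The new request_start_time property returns the perf_counter() timestamp of the request's first timing mark. A usage sketch; the helper below is hypothetical, not part of the commit:

# Hypothetical helper, not in this commit: seconds elapsed since the
# request's first timing mark, using the new property.
from time import perf_counter


def elapsed_since_start(request) -> float:
    # request.request_start_time is self.timings[0][0], the first
    # perf_counter() value recorded for this request.
    return perf_counter() - request.request_start_time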
13 changes: 12 additions & 1 deletion jussi/ws/pool.py
@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 import asyncio
 
+from time import perf_counter
+
 import structlog
 # pylint: disable=no-name-in-module
 from websockets import WebSocketClientProtocol as WSConn
@@ -264,17 +266,26 @@ async def _async__init__(self):
     async def _get_new_connection(self) -> WSConn:
         # First connection attempt on this pool.
         logger.debug('spawning new ws conn')
-        return await websockets_connect(self._connect_url, loop=self._loop,
+        start = perf_counter()
+        conn = await websockets_connect(self._connect_url, loop=self._loop,
                                         **self._connect_kwargs)
+        elapsed = perf_counter() - start
+        logger.info('new ws conn', elapsed=elapsed)
+        return conn
 
     async def acquire(self, timeout: int=None) -> PoolConnectionProxy:
         async def _acquire_impl(timeout=None) -> PoolConnectionProxy:
+            start = perf_counter()
             ch = await self._queue.get()  # type: PoolConnectionHolder
             self._queue.task_done()
             try:
                 proxy = await ch.acquire()  # type: PoolConnectionProxy
+                elapsed = perf_counter() - start
+                if elapsed > 1:
+                    logger.info('acquire ws conn', elapsed=elapsed)
             except Exception:
                 self._queue.put_nowait(ch)
+
                 raise
             else:
                 # Record the timeout, as we will apply it by default
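Taken together, the pool changes time both connection setup and acquisition, but only log an acquisition when it is slow (over one second), which keeps log volume down on the hot path. A standalone sketch of the same pattern under assumed names; nothing below is from the jussi codebase:

# Standalone sketch of the timing pattern this commit applies to the
# ws pool; `queue` and the threshold name are assumptions.
import asyncio
from time import perf_counter

SLOW_ACQUIRE_SECONDS = 1  # same threshold as the diff's `elapsed > 1`


async def timed_acquire(queue: asyncio.Queue):
    start = perf_counter()
    item = await queue.get()
    queue.task_done()
    elapsed = perf_counter() - start
    if elapsed > SLOW_ACQUIRE_SECONDS:
        # jussi uses structlog's logger.info(..., elapsed=elapsed) here
        print(f'slow acquire: {elapsed:.3f}s')
    return item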
