Skip to content

Commit

Permalink
Merge pull request steemit#198 from steemit/websocket-timeout-handling
Browse files Browse the repository at this point in the history
Terminate websocket connection on any error
  • Loading branch information
john-g-g authored Jul 18, 2018
2 parents 17276ec + 0e0ba99 commit 5d43dae
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 19 deletions.
5 changes: 0 additions & 5 deletions jussi/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,6 @@ def to_dict(self) -> dict:
'jrpc_request_id': self.jrpc_request_id,
'jussi_request_id': self.jussi_request_id
}
if self.jsonrpc_request:
try:
base_error.update(self.jsonrpc_request.log_extra())
except Exception as e:
logger.warning('JussiInteralError jsonrpc_request serialization error', e=e)

try:
base_error.update(**self.kwargs)
Expand Down
15 changes: 14 additions & 1 deletion jussi/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,25 @@ async def fetch_ws(http_request: HTTPRequest,
except (concurrent.futures.TimeoutError,
concurrent.futures.CancelledError,
asyncio.TimeoutError) as e:
try:
conn.terminate()
except NameError:
pass
except Exception as e:
logger.error('error while closing connection', e=e)

raise RequestTimeoutError(http_request=http_request,
jrpc_request=jrpc_request,
exception=e,
upstream_request=upstream_request,
tasks_count=len(asyncio.tasks.Task.all_tasks()))
except AssertionError as e:
try:
conn.terminate()
except NameError:
pass
except Exception as e:
logger.error('error while closing connection', e=e)
raise UpstreamResponseError(http_request=http_request,
jrpc_request=jrpc_request,
exception=e,
Expand All @@ -182,6 +194,8 @@ async def fetch_ws(http_request: HTTPRequest,
conn.terminate()
except NameError:
pass
except Exception as e:
logger.error('error while closing connection', e=e)
try:
response = upstream_response
except NameError:
Expand All @@ -200,7 +214,6 @@ async def fetch_http(http_request: HTTPRequest,
session = http_request.app.config.aiohttp['session']
upstream_request = jrpc_request.to_upstream_request(as_json=False)
try:

async with session.post(jrpc_request.upstream.url,
json=upstream_request,
headers=jrpc_request.upstream_headers,
Expand Down
21 changes: 11 additions & 10 deletions jussi/ws/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ def __getattr__(self, attr):
# Proxy all unresolved attributes to the wrapped Connection object.
return getattr(self._con, attr)

def send(self, *args, **kwargs):
def send(self, *args, **kwargs) -> None:
return self._con.send(*args, **kwargs)

def recv(self, *args, **kwargs):
def recv(self, *args, **kwargs) -> bytes:
return self._con.recv(*args, **kwargs)

def terminate(self) -> None:
self._holder.terminate()


class PoolConnectionHolder:
__slots__ = ('_con',
Expand Down Expand Up @@ -111,7 +114,7 @@ async def acquire(self) -> PoolConnectionProxy:
self._proxy = PoolConnectionProxy(self, self._con)
return self._proxy

async def release(self, timeout):
async def release(self, timeout: int=None):
if self._in_use is None:
raise ValueError(
'PoolConnectionHolder.release() called on '
Expand Down Expand Up @@ -175,11 +178,9 @@ def _release(self):

class Pool:
"""A connection pool.
Connection pool can be used to manage a set of connections to the database.
Connection pool can be used to manage a set of connections to an upstream.
Connections are first acquired from the pool, then used, and then released
back to the pool. Once a connection is released, it's reset to close all
open cursors and other resources *except* prepared statements.
Pools are created by calling :func:`~asyncpg.pool.create_pool`.
back to the pool.
"""

__slots__ = ('_queue',
Expand Down Expand Up @@ -292,7 +293,7 @@ async def _acquire_impl(timeout=None) -> PoolConnectionProxy:
_acquire_impl(), timeout=timeout, loop=self._loop)

async def release(self, connection: PoolConnectionProxy, *, timeout: int=None):
"""Release a database connection back to the pool.
"""Release a connection back to the pool.
"""
if connection._con is None:
# Already released, do nothing.
Expand All @@ -319,8 +320,8 @@ async def close(self):
:meth:`Pool.terminate() <pool.Pool.terminate>`.
It is advisable to use :func:`python:asyncio.wait_for` to set
a timeout.
.. versionchanged:: 0.16.0
``close()`` now waits until all pool connections are released
now waits until all pool connections are released
before closing them and the pool. Errors raised in ``close()``
will cause immediate pool termination.
"""
Expand Down
3 changes: 0 additions & 3 deletions tests/test_jussi_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
])
async def test_upstream_error_responses(mocker, mocked_app_test_cli, jsonrpc_request,
expected):

mocked_ws_conn, test_cli = mocked_app_test_cli
mocked_ws_conn.recv.return_value = json.dumps(expected)
response = await test_cli.post('/', json=jsonrpc_request, headers={'x-jussi-request-id': str(jsonrpc_request['id'])})
Expand All @@ -97,8 +96,6 @@ async def test_upstream_error_responses(mocker, mocked_app_test_cli, jsonrpc_req
async def test_content_encoding(mocker, mocked_app_test_cli, jsonrpc_request,
expected):
mocked_ws_conn, test_cli = mocked_app_test_cli

mocked_ws_conn, test_cli = mocked_app_test_cli
mocked_ws_conn.recv.return_value = ujson.dumps(
{'id': 1, 'jsonrpc': '2.0', 'result': 'ignore'}).encode()
response = await test_cli.post('/', json=jsonrpc_request, headers={'x-jussi-request-id': str(jsonrpc_request['id'])})
Expand Down

0 comments on commit 5d43dae

Please sign in to comment.