forked from elastic/rally
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_connection.py
71 lines (63 loc) · 2.54 KB
/
async_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import asyncio
import aiohttp
import elasticsearch
class RawClientResponse(aiohttp.ClientResponse):
    """
    Response class whose ``text()`` hands back the body as a bytes object (instead of a str) to avoid
    decoding overhead.
    """

    async def text(self, encoding=None, errors="strict"):
        """
        Return the response payload without decoding it.

        If ``self._body`` has not been populated yet, ``self.read()`` is awaited first. The ``encoding`` and
        ``errors`` arguments are accepted but ignored.
        """
        if self._body is not None:
            return self._body
        await self.read()
        return self._body
class AIOHttpConnection(elasticsearch.AIOHttpConnection):
    """
    Connection that creates its own ``aiohttp.ClientSession`` using ``RawClientResponse`` as response class,
    optional aiohttp trace configs and a configurable ``enable_cleanup_closed`` connector setting.
    """

    def __init__(self,
                 host="localhost",
                 port=None,
                 http_auth=None,
                 use_ssl=False,
                 ssl_assert_fingerprint=None,
                 headers=None,
                 ssl_context=None,
                 http_compress=None,
                 cloud_id=None,
                 api_key=None,
                 opaque_id=None,
                 loop=None,
                 trace_config=None,
                 **kwargs,):
        """
        Forward the connection settings to the base class and remember the session-related extras.

        :param trace_config: Optional trace config for the client session. If it is falsy, no trace configs
                             are handed to the session.
        :param kwargs: Passed on to the base class unchanged. Two keys are additionally read here:
                       ``max_connections`` (the larger of this value and 256 is passed as ``maxsize``) and
                       ``enable_cleanup_closed`` (defaults to ``False``, later handed to the TCP connector).

        All remaining parameters are passed as-is to the base class constructor.
        """
        super().__init__(host=host,
                         port=port,
                         http_auth=http_auth,
                         use_ssl=use_ssl,
                         ssl_assert_fingerprint=ssl_assert_fingerprint,
                         # provided to the base class via `maxsize` to keep base class state consistent despite Rally
                         # calling the attribute differently.
                         maxsize=max(256, kwargs.get("max_connections", 0)),
                         headers=headers,
                         ssl_context=ssl_context,
                         http_compress=http_compress,
                         cloud_id=cloud_id,
                         api_key=api_key,
                         opaque_id=opaque_id,
                         loop=loop,
                         **kwargs,)

        # the session expects a list of trace configs (or None if tracing is not requested)
        self._trace_configs = [trace_config] if trace_config else None
        self._enable_cleanup_closed = kwargs.get("enable_cleanup_closed", False)

    async def _create_aiohttp_session(self):
        """
        Create the ``aiohttp.ClientSession`` and store it in ``self.session``.

        Falls back to the currently running event loop if ``self.loop`` is not set. ``self._limit`` and
        ``self._ssl_context`` are not assigned in this class; presumably the base class constructor
        provides them.
        """
        if self.loop is None:
            self.loop = asyncio.get_running_loop()

        connector = aiohttp.TCPConnector(
            limit=self._limit,
            use_dns_cache=True,
            # `ssl` replaces the deprecated `ssl_context` keyword of `TCPConnector`; it accepts the same
            # `ssl.SSLContext` (or `None`) without triggering a deprecation warning.
            ssl=self._ssl_context,
            enable_cleanup_closed=self._enable_cleanup_closed,
        )
        self.session = aiohttp.ClientSession(
            headers=self.headers,
            auto_decompress=True,
            loop=self.loop,
            cookie_jar=aiohttp.DummyCookieJar(),
            response_class=RawClientResponse,
            connector=connector,
            trace_configs=self._trace_configs,
        )