Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async #1

Merged
merged 26 commits into from
Aug 13, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5173aca
把流量统计封装到send和recv中,下一步打算把send,recv改成asyncio版本试试
JaysonAlbert Nov 7, 2017
be8935d
Merge branch 'master' of https://github.com/rainx/pytdx
JaysonAlbert Nov 7, 2017
d415066
修复统计信息bug
JaysonAlbert Nov 7, 2017
ab5b7a1
send, recv bug
JaysonAlbert Nov 7, 2017
51fa319
Merge pull request #97 from JaysonAlbert/master
rainx Nov 8, 2017
f4a4e94
add reflect version to asynclize
rainx Nov 8, 2017
b4068ac
add reflaction interface
rainx Nov 8, 2017
43cdd94
remove useless ref
rainx Nov 8, 2017
54b087c
add connection pool for async, and fix bugs.
JaysonAlbert Jan 2, 2018
583f241
Merge pull request #123 from JaysonAlbert/async
rainx Jan 2, 2018
209a103
change asyncio.coroutine to async def
JaysonAlbert Jan 2, 2018
a826fb6
limit connection numbers, asyncio.sleep(0.1)
JaysonAlbert Jan 3, 2018
26e362c
Merge pull request #124 from JaysonAlbert/async
rainx Jan 4, 2018
645167f
setattr of object instead of class
JaysonAlbert Jan 5, 2018
008b5f1
Merge pull request #128 from JaysonAlbert/async
rainx Jan 5, 2018
e17e6b5
bugfix: get_event_loop() --> new_event_loop()
JaysonAlbert Jan 5, 2018
f82e2d0
Merge pull request #129 from JaysonAlbert/async
rainx Jan 8, 2018
2dde062
使用thread executor代理parseResponse
JaysonAlbert Jan 8, 2018
e95e5d6
Merge pull request #130 from JaysonAlbert/async
rainx Jan 9, 2018
71cdb1d
修复网络不稳定问题
JaysonAlbert Jan 18, 2018
7a809ec
减少内存开销
JaysonAlbert Jan 18, 2018
0f287b0
Merge branch 'master' into async
JaysonAlbert Jan 20, 2018
5984602
Merge pull request #140 from JaysonAlbert/async
rainx Jan 22, 2018
bd242b0
Merge branch 'master' into async
rainx Jan 22, 2018
7d59d9b
add utf-8 declare
rainx Jan 22, 2018
d63759a
Merge branch 'master' into async
rainx Jan 22, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions pytdx/async/async_base_socket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ async def connect(self):
def disconnect(self):
self.writer.close()

@asyncio.coroutine
def send(self, data, flags=None):
async def send(self, data, flags=None):
if not (self.reader and self.writer):
yield from self.connect()
await self.connect()
nsended = len(data)
self.writer.write(data)
# yield from self.writer.drain()
Expand All @@ -49,11 +48,10 @@ def send(self, data, flags=None):
self.send_pkg_bytes += nsended
return nsended

@asyncio.coroutine
def recv(self, buffersize, flags=None):
async def recv(self, buffersize, flags=None):
if not (self.reader and self.writer):
yield from self.connect()
head_buf = yield from self.reader.read(buffersize)
await self.connect()
head_buf = await self.reader.read(buffersize)
self.recv_pkg_num += 1
self.recv_pkg_bytes += buffersize
return head_buf
Expand Down
12 changes: 6 additions & 6 deletions pytdx/async/hq.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@

def exec_command(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
connection = self.pool.get_connection()
async def wrapper(self, *args, **kwargs):
connection = await self.pool.get_connection()

if not connection.connected:
yield from make_async_parser(SetupCmd1, connection).call_api()
await make_async_parser(SetupCmd1, connection).call_api()

yield from make_async_parser(SetupCmd2, connection).call_api()
await make_async_parser(SetupCmd2, connection).call_api()

yield from make_async_parser(SetupCmd3, connection).call_api()
await make_async_parser(SetupCmd3, connection).call_api()

data = yield from func(self, *args, **kwargs, connection=connection)
data = await func(self, *args, **kwargs, connection=connection)
self.pool.release(connection)
return data

Expand Down
19 changes: 10 additions & 9 deletions pytdx/async/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
import asyncio
from itertools import chain
from .async_base_socket_client import AsyncTrafficStatSocket

import pandas as pd
import os


class ConnectionPool(object):

def __init__(self, ip, port, max_connections=None, loop=None):
def __init__(self, ip, port, max_connections=300, loop=None):

self.pid = os.getpid()
self.max_connections = max_connections or 2 ** 31
self.max_connections = max_connections

self.loop = loop or asyncio.get_event_loop()
self.ip = ip
Expand All @@ -20,8 +20,12 @@ def __init__(self, ip, port, max_connections=None, loop=None):
self.created_connect = 0
self._in_use_connections = set()

def get_connection(self):
async def get_connection(self):
try:
if self.created_connect >= self.max_connections:
# if self.created_connect > self.max_connections:
while len(self._available_connections) == 0:
await asyncio.sleep(0.2)
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
Expand All @@ -30,9 +34,6 @@ def get_connection(self):
return connection

def make_connection(self):
if self.created_connect >= self.max_connections:
raise ConnectionError("Too many connections")

self.created_connect += 1
return AsyncTrafficStatSocket(self.ip, self.port, self.loop)

Expand All @@ -47,5 +48,5 @@ def disconnect(self):
for connection in all_conns:
connection.disconnect()

def run_until_complete(self,*args,**kwargs):
return self.loop.run_until_complete(*args,**kwargs)
def run_until_complete(self, *args, **kwargs):
return self.loop.run_until_complete(*args, **kwargs)
35 changes: 17 additions & 18 deletions pytdx/async/reflection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,27 @@ def make_async_parser(parser, connection):
:return:
"""

@asyncio.coroutine
def call_api(self):
async def call_api(self):
if self.lock:
with self.lock:
log.debug("sending thread lock api call")
result = yield from self._call_api()
result = await self._call_api()
else:
result = yield from self._call_api()
result = await self._call_api()
return result

@asyncio.coroutine
def _call_api(self):
async def _call_api(self):
if not self.send_pkg:
SendPkgNotReady("send pkg not ready")

yield from connection.send(self.send_pkg)
head_buf = yield from connection.recv(self.rsp_header_len)
await connection.send(self.send_pkg)
head_buf = await connection.recv(self.rsp_header_len)
if len(head_buf) == self.rsp_header_len:
_, _, _, zipsize, unzipsize = struct.unpack("<IIIHH", head_buf)
body_buf = bytearray()

while True:
buf = yield from connection.recv(zipsize)
buf = await connection.recv(zipsize)
len_buf = len(buf)
body_buf.extend(buf)
if not (buf) or len_buf == 0 or len(body_buf) == zipsize:
Expand Down Expand Up @@ -82,40 +80,41 @@ def _call_api(self):
from pytdx.hq import TdxHq_API
import pprint


def time_async():

pool = ConnectionPool(ip='101.227.73.20', port=7709)

def exec_command(pool,cmd):

async def exec_command(pool, cmd):
connection = pool.get_connection()

if not connection.connected:
yield from make_async_parser(SetupCmd1, connection).call_api()

yield from make_async_parser(SetupCmd2, connection).call_api()
await make_async_parser(SetupCmd1, connection).call_api()

yield from make_async_parser(SetupCmd3, connection).call_api()
await make_async_parser(SetupCmd2, connection).call_api()

await make_async_parser(SetupCmd3, connection).call_api()

async_cmd = make_async_parser(cmd, connection)

async_cmd.setParams(8, 0, '000001', 0, 80)

data = yield from async_cmd.call_api()
data = await async_cmd.call_api()
pool.release(connection)

return data

res = [exec_command(pool,GetSecurityBarsCmd) for i in range(100)]
res = [exec_command(pool, GetSecurityBarsCmd) for i in range(100)]
pool.run_until_complete(asyncio.wait(res))


def time_orig():
api = TdxHq_API()
api.connect(ip='218.108.98.244', port=7709)

for i in range(100):
api.get_security_bars(8, 0, '000001', 0, 80)


# print(timeit.timeit(time_async, number=1))
print(timeit.timeit(time_orig, number=1))
print(timeit.timeit(time_orig, number=1))