From 5173aca41366142ae3117fe16f680c5bb5a8ede5 Mon Sep 17 00:00:00 2001 From: JaysonAlbert Date: Tue, 7 Nov 2017 12:59:33 +0800 Subject: [PATCH 01/15] =?UTF-8?q?=E6=8A=8A=E6=B5=81=E9=87=8F=E7=BB=9F?= =?UTF-8?q?=E8=AE=A1=E5=B0=81=E8=A3=85=E5=88=B0send=E5=92=8Crecv=E4=B8=AD?= =?UTF-8?q?=EF=BC=8C=E4=B8=8B=E4=B8=80=E6=AD=A5=E6=89=93=E7=AE=97=E6=8A=8A?= =?UTF-8?q?send=EF=BC=8Crecv=E6=94=B9=E6=88=90asyncio=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E8=AF=95=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytdx/base_socket_client.py | 38 ++++++++++++++++++++++++++----------- pytdx/parser/base.py | 12 ------------ 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/pytdx/base_socket_client.py b/pytdx/base_socket_client.py index de9cf3c..13bb5d1 100644 --- a/pytdx/base_socket_client.py +++ b/pytdx/base_socket_client.py @@ -16,18 +16,16 @@ from pytdx.log import DEBUG, log from pytdx.errors import TdxConnectionError, TdxFunctionCallError -import threading,datetime +import threading, datetime import time from pytdx.heartbeat import HqHeartBeatThread import functools from pytdx.parser.raw_parser import RawParser - CONNECT_TIMEOUT = 5.000 RECV_HEADER_LEN = 0x10 DEFAULT_HEARTBEAT_INTERVAL = 10.0 - """ In [7]: 0x7e Out[7]: 126 @@ -44,6 +42,8 @@ In [8]: (len(body)-2)/126 Out[8]: 64.0 """ + + def update_last_ack_time(func): @functools.wraps(func) def wrapper(self, *args, **kw): @@ -81,6 +81,7 @@ def wrapper(self, *args, **kw): 如果raise_exception=False 返回None """ return ret + return wrapper @@ -96,6 +97,7 @@ class DefaultRetryStrategy(RetryStrategy): 返回下次重试的间隔时间, 单位为秒,我们会使用 time.sleep在这里同步等待之后进行重新connect,然后再重新发起 源请求,直到gen结束。 """ + @classmethod def gen(cls): # 默认重试4次 ... 时间间隔如下 @@ -107,6 +109,7 @@ class TrafficStatSocket(socket.socket): """ 实现支持流量统计的socket类 """ + def __init__(self, sock, mode): super(TrafficStatSocket, self).__init__(sock, mode) # 流量统计相关 @@ -119,8 +122,24 @@ def __init__(self, sock, mode): self.last_api_send_bytes = 0 # 最近的一次api调用的发送字节数 self.last_api_recv_bytes = 0 # 最近一次api调用的接收字节数 -class BaseSocketClient(object): + def send(self, data, flags=None): + nsended = super(TrafficStatSocket, self).send(data, flags) + if self.first_pkg_send_time is None: + self.first_pkg_send_time = datetime.datetime.now() + self.send_pkg_num += 1 + self.send_pkg_bytes += nsended + self.last_api_send_bytes += nsended + return nsended + + def recv(self, buffersize, flags=None): + head_buf = super(TrafficStatSocket, self).recv(buffersize, flags) + self.recv_pkg_num += 1 + self.recv_pkg_bytes += buffersize + self.last_api_recv_bytes += buffersize + return head_buf + +class BaseSocketClient(object): def __init__(self, multithread=False, heartbeat=False, auto_retry=False, raise_exception=False): self.need_setup = True if multithread or heartbeat: @@ -128,25 +147,23 @@ def __init__(self, multithread=False, heartbeat=False, auto_retry=False, raise_e else: self.lock = None - self.client = None self.heartbeat = heartbeat self.heartbeat_thread = None self.stop_event = None - self.heartbeat_interval = DEFAULT_HEARTBEAT_INTERVAL # 默认10秒一个心跳包 + self.heartbeat_interval = DEFAULT_HEARTBEAT_INTERVAL # 默认10秒一个心跳包 self.last_ack_time = time.time() self.last_transaction_failed = False self.ip = None self.port = None # 是否重试 - self.auto_retry=auto_retry + self.auto_retry = auto_retry # 可以覆盖这个属性,使用新的重试策略 self.retry_strategy = DefaultRetryStrategy() # 是否在函数调用出错的时候抛出异常 self.raise_exception = raise_exception - def connect(self, ip='101.227.73.20', port=7709): """ @@ -187,7 +204,7 @@ def connect(self, ip='101.227.73.20', port=7709): def disconnect(self): if self.heartbeat_thread and \ - self.heartbeat_thread.is_alive(): + self.heartbeat_thread.is_alive(): self.stop_event.set() if self.client: @@ -209,7 +226,6 @@ def close(self): """ self.disconnect() - def get_traffic_stats(self): """ 获取流量统计的信息 @@ -257,6 +273,6 @@ def to_df(self, v): if isinstance(v, list): return pd.DataFrame(data=v) elif isinstance(v, dict): - return pd.DataFrame(data=[v,]) + return pd.DataFrame(data=[v, ]) else: return pd.DataFrame(data=[{'value': v}]) diff --git a/pytdx/parser/base.py b/pytdx/parser/base.py index 67a5312..22aa1dc 100644 --- a/pytdx/parser/base.py +++ b/pytdx/parser/base.py @@ -78,13 +78,6 @@ def _call_api(self): nsended = self.client.send(self.send_pkg) - self.client.send_pkg_num += 1 - self.client.send_pkg_bytes += nsended - self.client.last_api_send_bytes = nsended - - if self.client.first_pkg_send_time is None: - self.client.first_pkg_send_time = datetime.datetime.now() - if DEBUG: log.debug("send package:" + str(self.send_pkg)) if nsended != len(self.send_pkg): @@ -95,8 +88,6 @@ def _call_api(self): if DEBUG: log.debug("recv head_buf:" + str(head_buf) + " |len is :" + str(len(head_buf))) if len(head_buf) == self.rsp_header_len: - self.client.recv_pkg_num += 1 - self.client.recv_pkg_bytes += self.rsp_header_len _, _, _, zipsize, unzipsize = struct.unpack(" Date: Tue, 7 Nov 2017 16:14:05 +0800 Subject: [PATCH 02/15] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=E4=BF=A1=E6=81=AFbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytdx/base_socket_client.py | 8 ++++++-- pytdx/parser/base.py | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pytdx/base_socket_client.py b/pytdx/base_socket_client.py index 13bb5d1..899bc81 100644 --- a/pytdx/base_socket_client.py +++ b/pytdx/base_socket_client.py @@ -128,16 +128,20 @@ def send(self, data, flags=None): self.first_pkg_send_time = datetime.datetime.now() self.send_pkg_num += 1 self.send_pkg_bytes += nsended - self.last_api_send_bytes += nsended return nsended def recv(self, buffersize, flags=None): head_buf = super(TrafficStatSocket, self).recv(buffersize, flags) self.recv_pkg_num += 1 self.recv_pkg_bytes += buffersize - self.last_api_recv_bytes += buffersize return head_buf + def set_last_api_sent(self,num): + self.last_api_recv_bytes = num + + def set_last_api_received(self,num): + self.last_api_recv_bytes = num + class BaseSocketClient(object): def __init__(self, multithread=False, heartbeat=False, auto_retry=False, raise_exception=False): diff --git a/pytdx/parser/base.py b/pytdx/parser/base.py index 66354c2..77bb906 100644 --- a/pytdx/parser/base.py +++ b/pytdx/parser/base.py @@ -84,6 +84,7 @@ def _call_api(self): raise SendPkgNotReady("send pkg not ready") nsended = self.client.send(self.send_pkg) + self.client.set_last_api_sent(nsended) if DEBUG: log.debug("send package:" + str(self.send_pkg)) @@ -109,7 +110,7 @@ def _call_api(self): if not(buf) or len_buf == 0 or len(body_buf) == zipsize: break - + self.client.set_last_api_received(last_api_recv_bytes) if len(buf) == 0: log.debug("接收数据体失败服务器断开连接") raise ResponseRecvFails("接收数据体失败服务器断开连接") From ab5b7a17d02042ad501b7a6bc876b2ba3c9e70a7 Mon Sep 17 00:00:00 2001 From: JaysonAlbert Date: Tue, 7 Nov 2017 16:20:42 +0800 Subject: [PATCH 03/15] send, recv bug --- pytdx/base_socket_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pytdx/base_socket_client.py b/pytdx/base_socket_client.py index 899bc81..0bda4a5 100644 --- a/pytdx/base_socket_client.py +++ b/pytdx/base_socket_client.py @@ -123,7 +123,7 @@ def __init__(self, sock, mode): self.last_api_recv_bytes = 0 # 最近一次api调用的接收字节数 def send(self, data, flags=None): - nsended = super(TrafficStatSocket, self).send(data, flags) + nsended = super(TrafficStatSocket, self).send(data) if self.first_pkg_send_time is None: self.first_pkg_send_time = datetime.datetime.now() self.send_pkg_num += 1 @@ -131,7 +131,7 @@ def send(self, data, flags=None): return nsended def recv(self, buffersize, flags=None): - head_buf = super(TrafficStatSocket, self).recv(buffersize, flags) + head_buf = super(TrafficStatSocket, self).recv(buffersize) self.recv_pkg_num += 1 self.recv_pkg_bytes += buffersize return head_buf From f4a4e9493630f466902b5e64e496dedad3a78ab8 Mon Sep 17 00:00:00 2001 From: RainX Date: Wed, 8 Nov 2017 09:18:43 +0800 Subject: [PATCH 04/15] add reflect version to asynclize --- pytdx/async/__init__.py | 0 pytdx/async/reflection.py | 71 +++++++++++++++++++++++++++++++++++++++ pytdx/parser/base.py | 1 + 3 files changed, 72 insertions(+) create mode 100644 pytdx/async/__init__.py create mode 100644 pytdx/async/reflection.py diff --git a/pytdx/async/__init__.py b/pytdx/async/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py new file mode 100644 index 0000000..1c83d2a --- /dev/null +++ b/pytdx/async/reflection.py @@ -0,0 +1,71 @@ +#coding: utf-8 + +import six +if six.PY2: + raise NotImplementedError("I am only working for Python3") + +from pytdx.parser.base import BaseParser +import asyncio +from pytdx.parser.base import SendPkgNotReady, SendRequestPkgFails, ResponseRecvFails +from pytdx.log import DEBUG, log +import struct +import zlib + + +def make_async_parser(parser: BaseParser): + """ + 通过反射,重新绑定Parser 的 call_api 和 _call_api 方法 + :param parser: + :return: + """ + + @asyncio.coroutine + def call_api(self): + if self.lock: + with self.lock: + log.debug("sending thread lock api call") + result = yield from self._call_api() + else: + result = yield from self._call_api() + return result + + @asyncio.coroutine + def _call_api(self): + if not self.send_pkg: + SendPkgNotReady("send pkg not ready") + + self.client.send(self.send_pkg) + head_buf = yield from self.client.recv(self.rsp_header_len) + if len(head_buf) == self.rsp_header_len: + _, _, _, zipsize, unzipsize = struct.unpack(" Date: Wed, 8 Nov 2017 10:24:44 +0800 Subject: [PATCH 05/15] add reflaction interface --- pytdx/async/async_base_socket_client.py | 49 +++++++++++++++++++++++++ pytdx/async/reflection.py | 12 ++++++ 2 files changed, 61 insertions(+) create mode 100644 pytdx/async/async_base_socket_client.py diff --git a/pytdx/async/async_base_socket_client.py b/pytdx/async/async_base_socket_client.py new file mode 100644 index 0000000..2395352 --- /dev/null +++ b/pytdx/async/async_base_socket_client.py @@ -0,0 +1,49 @@ +#coding: utf-8 + +import datetime +import asyncio + + +class AsyncTrafficStatSocket(object): + """ + 实现支持流量统计的socket类 + """ + + def __init__(self, reader, writer, loop): + super(AsyncTrafficStatSocket, self).__init__() + # 流量统计相关 + self.send_pkg_num = 0 # 发送次数 + self.recv_pkg_num = 0 # 接收次数 + self.send_pkg_bytes = 0 # 发送字节 + self.recv_pkg_bytes = 0 # 接收字节数 + self.first_pkg_send_time = None # 第一个数据包发送时间 + + self.last_api_send_bytes = 0 # 最近的一次api调用的发送字节数 + self.last_api_recv_bytes = 0 # 最近一次api调用的接收字节数 + self.reader = reader + self.writer = writer + self.loop = loop + + @asyncio.coroutine + def send(self, data, flags=None): + nsended = len(data) + self.writer.write(data) + yield self.writer.drain() + if self.first_pkg_send_time is None: + self.first_pkg_send_time = datetime.datetime.now() + self.send_pkg_num += 1 + self.send_pkg_bytes += nsended + return nsended + + @asyncio.coroutine + def recv(self, buffersize, flags=None): + head_buf = yield from self.reader.read(buffersize) + self.recv_pkg_num += 1 + self.recv_pkg_bytes += buffersize + return head_buf + + def set_last_api_sent(self,num): + self.last_api_recv_bytes = num + + def set_last_api_received(self,num): + self.last_api_recv_bytes = num \ No newline at end of file diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py index 1c83d2a..05ef602 100644 --- a/pytdx/async/reflection.py +++ b/pytdx/async/reflection.py @@ -10,6 +10,8 @@ from pytdx.log import DEBUG, log import struct import zlib +from pytdx.async.async_base_socket_client import AsyncTrafficStatSocket +from pytdx.base_socket_client import BaseSocketClient def make_async_parser(parser: BaseParser): @@ -69,3 +71,13 @@ def _call_api(self): return parser +if __name__ == '__main__': + + from pytdx.parser.get_security_quotes import GetSecurityQuotesCmd + import pprint + cmd = GetSecurityQuotesCmd(client=None, lock=None) + + async_cmd = make_async_parser(cmd) + + pprint.pprint(async_cmd) + From 43cdd94e69629558db24dfb0c567dd2b6f4de257 Mon Sep 17 00:00:00 2001 From: RainX Date: Wed, 8 Nov 2017 10:35:58 +0800 Subject: [PATCH 06/15] remove useless ref --- pytdx/async/reflection.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py index 05ef602..62c074d 100644 --- a/pytdx/async/reflection.py +++ b/pytdx/async/reflection.py @@ -10,9 +10,6 @@ from pytdx.log import DEBUG, log import struct import zlib -from pytdx.async.async_base_socket_client import AsyncTrafficStatSocket -from pytdx.base_socket_client import BaseSocketClient - def make_async_parser(parser: BaseParser): """ From 54b087c82fc1fbc525fb3b3323d91b8b038133e8 Mon Sep 17 00:00:00 2001 From: JaysonAlbert Date: Tue, 2 Jan 2018 11:03:32 +0800 Subject: [PATCH 07/15] add connection pool for async, and fix bugs. --- pytdx/async/async_base_socket_client.py | 24 ++- pytdx/async/hq.py | 225 ++++++++++++++++++++++++ pytdx/async/pool.py | 51 ++++++ pytdx/async/reflection.py | 63 +++++-- pytdx/parser/base.py | 2 +- pytdx/parser/setup_commands.py | 10 +- 6 files changed, 352 insertions(+), 23 deletions(-) create mode 100644 pytdx/async/hq.py create mode 100644 pytdx/async/pool.py diff --git a/pytdx/async/async_base_socket_client.py b/pytdx/async/async_base_socket_client.py index 2395352..70a9eb7 100644 --- a/pytdx/async/async_base_socket_client.py +++ b/pytdx/async/async_base_socket_client.py @@ -9,7 +9,7 @@ class AsyncTrafficStatSocket(object): 实现支持流量统计的socket类 """ - def __init__(self, reader, writer, loop): + def __init__(self, ip, port, loop): super(AsyncTrafficStatSocket, self).__init__() # 流量统计相关 self.send_pkg_num = 0 # 发送次数 @@ -20,15 +20,29 @@ def __init__(self, reader, writer, loop): self.last_api_send_bytes = 0 # 最近的一次api调用的发送字节数 self.last_api_recv_bytes = 0 # 最近一次api调用的接收字节数 - self.reader = reader - self.writer = writer + self.reader = None + self.writer = None + self.ip = ip + self.port = port self.loop = loop + self.connected = False + + async def connect(self): + self.reader, self.writer = await asyncio.open_connection(self.ip, self.port, loop=self.loop) + self.connected = True + return self + + def disconnect(self): + self.writer.close() + @asyncio.coroutine def send(self, data, flags=None): + if not (self.reader and self.writer): + yield from self.connect() nsended = len(data) self.writer.write(data) - yield self.writer.drain() + # yield from self.writer.drain() if self.first_pkg_send_time is None: self.first_pkg_send_time = datetime.datetime.now() self.send_pkg_num += 1 @@ -37,6 +51,8 @@ def send(self, data, flags=None): @asyncio.coroutine def recv(self, buffersize, flags=None): + if not (self.reader and self.writer): + yield from self.connect() head_buf = yield from self.reader.read(buffersize) self.recv_pkg_num += 1 self.recv_pkg_bytes += buffersize diff --git a/pytdx/async/hq.py b/pytdx/async/hq.py new file mode 100644 index 0000000..40e11eb --- /dev/null +++ b/pytdx/async/hq.py @@ -0,0 +1,225 @@ +from pytdx.async.pool import ConnectionPool +from pytdx.async.reflection import make_async_parser +import timeit +import random +import pandas as pd +from pytdx.base_socket_client import update_last_ack_time + +from pytdx.parser.get_block_info import (GetBlockInfo, GetBlockInfoMeta, + get_and_parse_block_info) +from pytdx.parser.get_company_info_category import GetCompanyInfoCategory +from pytdx.parser.get_company_info_content import GetCompanyInfoContent +from pytdx.parser.get_finance_info import GetFinanceInfo +from pytdx.parser.get_history_minute_time_data import GetHistoryMinuteTimeData +from pytdx.parser.get_history_transaction_data import GetHistoryTransactionData +from pytdx.parser.get_index_bars import GetIndexBarsCmd +from pytdx.parser.get_minute_time_data import GetMinuteTimeData +from pytdx.parser.get_security_bars import GetSecurityBarsCmd +from pytdx.parser.get_security_count import GetSecurityCountCmd +from pytdx.parser.get_security_list import GetSecurityList +from pytdx.parser.get_security_quotes import GetSecurityQuotesCmd +from pytdx.parser.get_transaction_data import GetTransactionData +from pytdx.parser.get_xdxr_info import GetXdXrInfo +from pytdx.parser.setup_commands import SetupCmd1, SetupCmd2, SetupCmd3 + +from functools import wraps + + +def exec_command(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + connection = self.pool.get_connection() + + if not connection.connected: + yield from make_async_parser(SetupCmd1, connection).call_api() + + yield from make_async_parser(SetupCmd2, connection).call_api() + + yield from make_async_parser(SetupCmd3, connection).call_api() + + data = yield from func(self, *args, **kwargs, connection=connection) + self.pool.release(connection) + return data + + return wrapper + + +class ATdxHq_API(): + + def __init__(self, ip='101.227.73.20', port=7709, auto_retry=False, raise_exception=True): + self.pool = ConnectionPool(ip=ip, port=port) + self.auto_retry = auto_retry + self.raise_exception = raise_exception + connection = None + + # Notice:,如果一个股票当天停牌,那天的K线还是能取到,成交量为0 + @update_last_ack_time + @exec_command + def get_security_bars(self, category, market, code, start, count, connection=None): + cmd = make_async_parser(GetSecurityBarsCmd, connection) + cmd.setParams(category, market, code, start, count) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_index_bars(self, category, market, code, start, count, connection=None): + cmd = make_async_parser(GetIndexBarsCmd, connection) + cmd.setParams(category, market, code, start, count) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_security_quotes(self, all_stock, code=None, connection=None): + """ + 支持三种形式的参数 + get_security_quotes(market, code ) + get_security_quotes((market, code)) + get_security_quotes([(market1, code1), (market2, code2)] ) + :param all_stock (market, code) 的数组 + :param code{optional} code to query + :return: + """ + + if code is not None: + all_stock = [(all_stock, code)] + elif (isinstance(all_stock, list) or isinstance(all_stock, tuple)) \ + and len(all_stock) == 2 and type(all_stock[0]) is int: + all_stock = [all_stock] + + cmd = make_async_parser(GetSecurityQuotesCmd, connection) + cmd.setParams(all_stock) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_security_count(self, market, connection=None): + cmd = make_async_parser(GetSecurityCountCmd, connection) + cmd.setParams(market) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_security_list(self, market, start, connection=None): + cmd = make_async_parser(GetSecurityList, connection) + cmd.setParams(market, start) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_minute_time_data(self, market, code, connection=None): + cmd = make_async_parser(GetMinuteTimeData, connection) + cmd.setParams(market, code) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_history_minute_time_data(self, market, code, date, connection=None): + cmd = make_async_parser(GetHistoryMinuteTimeData, connection) + cmd.setParams(market, code, date) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_transaction_data(self, market, code, start, count, connection=None): + cmd = make_async_parser(GetTransactionData, connection) + cmd.setParams(market, code, start, count) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_history_transaction_data(self, market, code, start, count, date, connection=None): + cmd = make_async_parser(GetHistoryTransactionData, connection) + cmd.setParams(market, code, start, count, date) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_company_info_category(self, market, code, connection=None): + cmd = make_async_parser(GetCompanyInfoCategory, connection) + cmd.setParams(market, code) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_company_info_content(self, market, code, filename, start, length, connection=None): + cmd = make_async_parser(GetCompanyInfoContent, connection) + cmd.setParams(market, code, filename, start, length) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_xdxr_info(self, market, code, connection=None): + cmd = make_async_parser(GetXdXrInfo, connection) + cmd.setParams(market, code) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_finance_info(self, market, code, connection=None): + cmd = make_async_parser(GetFinanceInfo, connection) + cmd.setParams(market, code) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_block_info_meta(self, blockfile, connection=None): + cmd = make_async_parser(GetBlockInfoMeta, connection) + cmd.setParams(blockfile) + return cmd.call_api() + + @update_last_ack_time + @exec_command + def get_block_info(self, blockfile, start, size, connection=None): + cmd = make_async_parser(GetBlockInfo, connection) + cmd.setParams(blockfile, start, size) + return cmd.call_api() + + def get_and_parse_block_info(self, blockfile): + return get_and_parse_block_info(self, blockfile) + + @update_last_ack_time + @exec_command + def do_heartbeat(self): + return self.get_security_count(random.randint(0, 1), connection=None) + + def run_until_complete(self, *args, **kwargs): + return self.pool.run_until_complete(*args, **kwargs) + + @update_last_ack_time + @exec_command + def get_k_data(self, code, start_date, end_date, connection=None): + # 具体详情参见 https://github.com/rainx/pytdx/issues/5 + # 具体详情参见 https://github.com/rainx/pytdx/issues/21 + def __select_market_code(code): + code = str(code) + if code[0] in ['5', '6', '9'] or code[:3] in ["009", "126", "110", "201", "202", "203", "204"]: + return 1 + return 0 + + # 新版一劳永逸偷懒写法zzz + market_code = 1 if str(code)[0] == '6' else 0 + # https://github.com/rainx/pytdx/issues/33 + # 0 - 深圳, 1 - 上海 + + data = pd.concat([self.to_df(self.get_security_bars(9, __select_market_code( + code), code, (9 - i) * 800, 800)) for i in range(10)], axis=0) + + data = data.assign(date=data['datetime'].apply(lambda x: str(x)[0:10])).assign(code=str(code)) \ + .set_index('date', drop=False, inplace=False) \ + .drop(['year', 'month', 'day', 'hour', 'minute', 'datetime'], axis=1)[start_date:end_date] + return data.assign(date=data['date'].apply(lambda x: str(x)[0:10])) + + +if __name__ == '__main__': + import asyncio + + + def main(): + api = ATdxHq_API(ip='218.108.98.244') + + res = [api.get_security_bars(8, 0, '000001', 0, 80) for i in range(100)] + + api.run_until_complete(asyncio.wait(res)) + + + print(timeit.timeit(main, number=1)) diff --git a/pytdx/async/pool.py b/pytdx/async/pool.py new file mode 100644 index 0000000..5daee58 --- /dev/null +++ b/pytdx/async/pool.py @@ -0,0 +1,51 @@ +import os +import asyncio +from itertools import chain +from .async_base_socket_client import AsyncTrafficStatSocket + +import os + + +class ConnectionPool(object): + + def __init__(self, ip, port, max_connections=None, loop=None): + + self.pid = os.getpid() + self.max_connections = max_connections or 2 ** 31 + + self.loop = loop or asyncio.get_event_loop() + self.ip = ip + self.port = port + self._available_connections = [] + self.created_connect = 0 + self._in_use_connections = set() + + def get_connection(self): + try: + connection = self._available_connections.pop() + except IndexError: + connection = self.make_connection() + + self._in_use_connections.add(connection) + return connection + + def make_connection(self): + if self.created_connect >= self.max_connections: + raise ConnectionError("Too many connections") + + self.created_connect += 1 + return AsyncTrafficStatSocket(self.ip, self.port, self.loop) + + def release(self, connection): + self._in_use_connections.remove(connection) + self._available_connections.append(connection) + + def disconnect(self): + "Disconnects all connections in the pool" + all_conns = chain(self._available_connections, + self._in_use_connections) + for connection in all_conns: + connection.disconnect() + + def run_until_complete(self,*args,**kwargs): + return self.loop.run_until_complete(*args,**kwargs) diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py index 62c074d..5daee8a 100644 --- a/pytdx/async/reflection.py +++ b/pytdx/async/reflection.py @@ -1,17 +1,26 @@ -#coding: utf-8 +# coding: utf-8 import six + if six.PY2: raise NotImplementedError("I am only working for Python3") from pytdx.parser.base import BaseParser import asyncio from pytdx.parser.base import SendPkgNotReady, SendRequestPkgFails, ResponseRecvFails +from pytdx.parser.setup_commands import ( + SetupCmd1, + SetupCmd2, + SetupCmd3 +) from pytdx.log import DEBUG, log import struct import zlib +from pytdx.async.pool import ConnectionPool +import timeit + -def make_async_parser(parser: BaseParser): +def make_async_parser(parser, connection): """ 通过反射,重新绑定Parser 的 call_api 和 _call_api 方法 :param parser: @@ -33,14 +42,14 @@ def _call_api(self): if not self.send_pkg: SendPkgNotReady("send pkg not ready") - self.client.send(self.send_pkg) - head_buf = yield from self.client.recv(self.rsp_header_len) + yield from connection.send(self.send_pkg) + head_buf = yield from connection.recv(self.rsp_header_len) if len(head_buf) == self.rsp_header_len: _, _, _, zipsize, unzipsize = struct.unpack(" Date: Tue, 2 Jan 2018 14:31:30 +0800 Subject: [PATCH 08/15] change asyncio.coroutine to async def --- pytdx/async/async_base_socket_client.py | 12 +++++------- pytdx/async/hq.py | 10 +++++----- pytdx/async/reflection.py | 26 ++++++++++++------------- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/pytdx/async/async_base_socket_client.py b/pytdx/async/async_base_socket_client.py index 70a9eb7..25ae66d 100644 --- a/pytdx/async/async_base_socket_client.py +++ b/pytdx/async/async_base_socket_client.py @@ -36,10 +36,9 @@ async def connect(self): def disconnect(self): self.writer.close() - @asyncio.coroutine - def send(self, data, flags=None): + async def send(self, data, flags=None): if not (self.reader and self.writer): - yield from self.connect() + await self.connect() nsended = len(data) self.writer.write(data) # yield from self.writer.drain() @@ -49,11 +48,10 @@ def send(self, data, flags=None): self.send_pkg_bytes += nsended return nsended - @asyncio.coroutine - def recv(self, buffersize, flags=None): + async def recv(self, buffersize, flags=None): if not (self.reader and self.writer): - yield from self.connect() - head_buf = yield from self.reader.read(buffersize) + await self.connect() + head_buf = await self.reader.read(buffersize) self.recv_pkg_num += 1 self.recv_pkg_bytes += buffersize return head_buf diff --git a/pytdx/async/hq.py b/pytdx/async/hq.py index 40e11eb..62ae868 100644 --- a/pytdx/async/hq.py +++ b/pytdx/async/hq.py @@ -27,17 +27,17 @@ def exec_command(func): @wraps(func) - def wrapper(self, *args, **kwargs): + async def wrapper(self, *args, **kwargs): connection = self.pool.get_connection() if not connection.connected: - yield from make_async_parser(SetupCmd1, connection).call_api() + await make_async_parser(SetupCmd1, connection).call_api() - yield from make_async_parser(SetupCmd2, connection).call_api() + await make_async_parser(SetupCmd2, connection).call_api() - yield from make_async_parser(SetupCmd3, connection).call_api() + await make_async_parser(SetupCmd3, connection).call_api() - data = yield from func(self, *args, **kwargs, connection=connection) + data = await func(self, *args, **kwargs, connection=connection) self.pool.release(connection) return data diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py index 5daee8a..aedb29d 100644 --- a/pytdx/async/reflection.py +++ b/pytdx/async/reflection.py @@ -27,29 +27,27 @@ def make_async_parser(parser, connection): :return: """ - @asyncio.coroutine - def call_api(self): + async def call_api(self): if self.lock: with self.lock: log.debug("sending thread lock api call") - result = yield from self._call_api() + result = await self._call_api() else: - result = yield from self._call_api() + result = await self._call_api() return result - @asyncio.coroutine - def _call_api(self): + async def _call_api(self): if not self.send_pkg: SendPkgNotReady("send pkg not ready") - yield from connection.send(self.send_pkg) - head_buf = yield from connection.recv(self.rsp_header_len) + await connection.send(self.send_pkg) + head_buf = await connection.recv(self.rsp_header_len) if len(head_buf) == self.rsp_header_len: _, _, _, zipsize, unzipsize = struct.unpack(" Date: Wed, 3 Jan 2018 11:47:08 +0800 Subject: [PATCH 09/15] limit connection numbers, asyncio.sleep(0.1) --- pytdx/async/hq.py | 2 +- pytdx/async/pool.py | 19 ++++++++++--------- pytdx/async/reflection.py | 11 ++++++----- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/pytdx/async/hq.py b/pytdx/async/hq.py index 62ae868..f15c5cb 100644 --- a/pytdx/async/hq.py +++ b/pytdx/async/hq.py @@ -28,7 +28,7 @@ def exec_command(func): @wraps(func) async def wrapper(self, *args, **kwargs): - connection = self.pool.get_connection() + connection = await self.pool.get_connection() if not connection.connected: await make_async_parser(SetupCmd1, connection).call_api() diff --git a/pytdx/async/pool.py b/pytdx/async/pool.py index 5daee58..7b76906 100644 --- a/pytdx/async/pool.py +++ b/pytdx/async/pool.py @@ -2,16 +2,16 @@ import asyncio from itertools import chain from .async_base_socket_client import AsyncTrafficStatSocket - +import pandas as pd import os class ConnectionPool(object): - def __init__(self, ip, port, max_connections=None, loop=None): + def __init__(self, ip, port, max_connections=300, loop=None): self.pid = os.getpid() - self.max_connections = max_connections or 2 ** 31 + self.max_connections = max_connections self.loop = loop or asyncio.get_event_loop() self.ip = ip @@ -20,8 +20,12 @@ def __init__(self, ip, port, max_connections=None, loop=None): self.created_connect = 0 self._in_use_connections = set() - def get_connection(self): + async def get_connection(self): try: + if self.created_connect >= self.max_connections: + # if self.created_connect > self.max_connections: + while len(self._available_connections) == 0: + await asyncio.sleep(0.2) connection = self._available_connections.pop() except IndexError: connection = self.make_connection() @@ -30,9 +34,6 @@ def get_connection(self): return connection def make_connection(self): - if self.created_connect >= self.max_connections: - raise ConnectionError("Too many connections") - self.created_connect += 1 return AsyncTrafficStatSocket(self.ip, self.port, self.loop) @@ -47,5 +48,5 @@ def disconnect(self): for connection in all_conns: connection.disconnect() - def run_until_complete(self,*args,**kwargs): - return self.loop.run_until_complete(*args,**kwargs) + def run_until_complete(self, *args, **kwargs): + return self.loop.run_until_complete(*args, **kwargs) diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py index aedb29d..8d5121d 100644 --- a/pytdx/async/reflection.py +++ b/pytdx/async/reflection.py @@ -80,12 +80,12 @@ async def _call_api(self): from pytdx.hq import TdxHq_API import pprint + def time_async(): pool = ConnectionPool(ip='101.227.73.20', port=7709) - async def exec_command(pool,cmd): - + async def exec_command(pool, cmd): connection = pool.get_connection() if not connection.connected: @@ -95,7 +95,6 @@ async def exec_command(pool,cmd): await make_async_parser(SetupCmd3, connection).call_api() - async_cmd = make_async_parser(cmd, connection) async_cmd.setParams(8, 0, '000001', 0, 80) @@ -105,9 +104,10 @@ async def exec_command(pool,cmd): return data - res = [exec_command(pool,GetSecurityBarsCmd) for i in range(100)] + res = [exec_command(pool, GetSecurityBarsCmd) for i in range(100)] pool.run_until_complete(asyncio.wait(res)) + def time_orig(): api = TdxHq_API() api.connect(ip='218.108.98.244', port=7709) @@ -115,5 +115,6 @@ def time_orig(): for i in range(100): api.get_security_bars(8, 0, '000001', 0, 80) + # print(timeit.timeit(time_async, number=1)) - print(timeit.timeit(time_orig, number=1)) \ No newline at end of file + print(timeit.timeit(time_orig, number=1)) From 645167ffc079cf1168ff39c6f806bc566055f585 Mon Sep 17 00:00:00 2001 From: JaysonAlbert Date: Fri, 5 Jan 2018 10:07:53 +0800 Subject: [PATCH 10/15] setattr of object instead of class --- pytdx/async/reflection.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py index 8d5121d..610972c 100644 --- a/pytdx/async/reflection.py +++ b/pytdx/async/reflection.py @@ -1,6 +1,7 @@ # coding: utf-8 import six +from functools import partial if six.PY2: raise NotImplementedError("I am only working for Python3") @@ -69,10 +70,12 @@ async def _call_api(self): return self.parseResponse(body_buf) - setattr(parser, "call_api", call_api) - setattr(parser, "_call_api", _call_api) + cmd = parser(None, None) - return parser(None, None) + setattr(cmd, "call_api", partial(call_api,cmd)) + setattr(cmd, "_call_api", partial(_call_api, cmd)) + + return cmd if __name__ == '__main__': From e17e6b536ec224dbd3b93b7de8d837891a671cb5 Mon Sep 17 00:00:00 2001 From: JaysonAlbert Date: Fri, 5 Jan 2018 13:00:03 +0800 Subject: [PATCH 11/15] bugfix: get_event_loop() --> new_event_loop() --- pytdx/async/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytdx/async/pool.py b/pytdx/async/pool.py index 7b76906..c8891b0 100644 --- a/pytdx/async/pool.py +++ b/pytdx/async/pool.py @@ -13,7 +13,7 @@ def __init__(self, ip, port, max_connections=300, loop=None): self.pid = os.getpid() self.max_connections = max_connections - self.loop = loop or asyncio.get_event_loop() + self.loop = loop or asyncio.new_event_loop() self.ip = ip self.port = port self._available_connections = [] From 2dde062f9464d9eadf9d30e8d5a94c83b679088c Mon Sep 17 00:00:00 2001 From: JaysonAlbert Date: Mon, 8 Jan 2018 11:51:08 +0800 Subject: [PATCH 12/15] =?UTF-8?q?=E4=BD=BF=E7=94=A8thread=20executor?= =?UTF-8?q?=E4=BB=A3=E7=90=86parseResponse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytdx/async/async_base_socket_client.py | 4 +- pytdx/async/hq.py | 1 - pytdx/async/pool.py | 3 +- pytdx/async/reflection.py | 55 ++++--------------------- pytdx/helper.py | 7 ++-- 5 files changed, 18 insertions(+), 52 deletions(-) diff --git a/pytdx/async/async_base_socket_client.py b/pytdx/async/async_base_socket_client.py index 25ae66d..0c5db6c 100644 --- a/pytdx/async/async_base_socket_client.py +++ b/pytdx/async/async_base_socket_client.py @@ -9,7 +9,7 @@ class AsyncTrafficStatSocket(object): 实现支持流量统计的socket类 """ - def __init__(self, ip, port, loop): + def __init__(self, ip, port, loop, pool): super(AsyncTrafficStatSocket, self).__init__() # 流量统计相关 self.send_pkg_num = 0 # 发送次数 @@ -26,6 +26,8 @@ def __init__(self, ip, port, loop): self.port = port self.loop = loop + self.pool = pool + self.connected = False async def connect(self): diff --git a/pytdx/async/hq.py b/pytdx/async/hq.py index f15c5cb..d22d8e2 100644 --- a/pytdx/async/hq.py +++ b/pytdx/async/hq.py @@ -38,7 +38,6 @@ async def wrapper(self, *args, **kwargs): await make_async_parser(SetupCmd3, connection).call_api() data = await func(self, *args, **kwargs, connection=connection) - self.pool.release(connection) return data return wrapper diff --git a/pytdx/async/pool.py b/pytdx/async/pool.py index c8891b0..7986623 100644 --- a/pytdx/async/pool.py +++ b/pytdx/async/pool.py @@ -14,6 +14,7 @@ def __init__(self, ip, port, max_connections=300, loop=None): self.max_connections = max_connections self.loop = loop or asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) self.ip = ip self.port = port self._available_connections = [] @@ -35,7 +36,7 @@ async def get_connection(self): def make_connection(self): self.created_connect += 1 - return AsyncTrafficStatSocket(self.ip, self.port, self.loop) + return AsyncTrafficStatSocket(self.ip, self.port, self.loop, self) def release(self, connection): self._in_use_connections.remove(connection) diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py index 610972c..4458d28 100644 --- a/pytdx/async/reflection.py +++ b/pytdx/async/reflection.py @@ -19,6 +19,11 @@ import zlib from pytdx.async.pool import ConnectionPool import timeit +import asyncio + +from concurrent.futures import ThreadPoolExecutor + +executor = ThreadPoolExecutor(8) def make_async_parser(parser, connection): @@ -54,6 +59,8 @@ async def _call_api(self): if not (buf) or len_buf == 0 or len(body_buf) == zipsize: break + connection.pool.release(connection) + if len(buf) == 0: log.debug("接收数据体失败服务器断开连接") raise ResponseRecvFails("接收数据体失败服务器断开连接") @@ -68,7 +75,8 @@ async def _call_api(self): log.debug("recv body: ") log.debug(body_buf) - return self.parseResponse(body_buf) + # return self.parseResponse(body_buf) + return await asyncio.get_event_loop().run_in_executor(executor, self.parseResponse, body_buf) cmd = parser(None, None) @@ -76,48 +84,3 @@ async def _call_api(self): setattr(cmd, "_call_api", partial(_call_api, cmd)) return cmd - - -if __name__ == '__main__': - from pytdx.parser.get_security_bars import GetSecurityBarsCmd - from pytdx.hq import TdxHq_API - import pprint - - - def time_async(): - - pool = ConnectionPool(ip='101.227.73.20', port=7709) - - async def exec_command(pool, cmd): - connection = pool.get_connection() - - if not connection.connected: - await make_async_parser(SetupCmd1, connection).call_api() - - await make_async_parser(SetupCmd2, connection).call_api() - - await make_async_parser(SetupCmd3, connection).call_api() - - async_cmd = make_async_parser(cmd, connection) - - async_cmd.setParams(8, 0, '000001', 0, 80) - - data = await async_cmd.call_api() - pool.release(connection) - - return data - - res = [exec_command(pool, GetSecurityBarsCmd) for i in range(100)] - pool.run_until_complete(asyncio.wait(res)) - - - def time_orig(): - api = TdxHq_API() - api.connect(ip='218.108.98.244', port=7709) - - for i in range(100): - api.get_security_bars(8, 0, '000001', 0, 80) - - - # print(timeit.timeit(time_async, number=1)) - print(timeit.timeit(time_orig, number=1)) diff --git a/pytdx/helper.py b/pytdx/helper.py index 275c25c..775d846 100644 --- a/pytdx/helper.py +++ b/pytdx/helper.py @@ -121,12 +121,13 @@ def get_time(buffer, pos): return hour, minute, pos -def indexbytes(data, pos): - if six.PY2: +if six.PY2: + def indexbytes(data, pos): if type(data) is bytearray: return data[pos] else: return six.indexbytes(data, pos) - else: +else: + def indexbytes(data, pos): return data[pos] From 71cdb1de18d2db30a209b89f86b4a5cb66ac7104 Mon Sep 17 00:00:00 2001 From: JaysonAlbert Date: Thu, 18 Jan 2018 13:41:37 +0800 Subject: [PATCH 13/15] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BD=91=E7=BB=9C?= =?UTF-8?q?=E4=B8=8D=E7=A8=B3=E5=AE=9A=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytdx/async/hq.py | 25 +++++++++++++++++++------ pytdx/async/reflection.py | 4 ++-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/pytdx/async/hq.py b/pytdx/async/hq.py index d22d8e2..18b3276 100644 --- a/pytdx/async/hq.py +++ b/pytdx/async/hq.py @@ -20,9 +20,21 @@ from pytdx.parser.get_security_quotes import GetSecurityQuotesCmd from pytdx.parser.get_transaction_data import GetTransactionData from pytdx.parser.get_xdxr_info import GetXdXrInfo -from pytdx.parser.setup_commands import SetupCmd1, SetupCmd2, SetupCmd3 from functools import wraps +import struct + +async def receive_all(send_pkg, connection): + await connection.send(send_pkg) + head_buf = await connection.recv(0x10) + if len(head_buf) == 0x10: + _, _, _, zipsize, unzipsize = struct.unpack(" Date: Thu, 18 Jan 2018 14:48:30 +0800 Subject: [PATCH 14/15] =?UTF-8?q?=E5=87=8F=E5=B0=91=E5=86=85=E5=AD=98?= =?UTF-8?q?=E5=BC=80=E9=94=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytdx/async/pool.py | 2 +- pytdx/async/reflection.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pytdx/async/pool.py b/pytdx/async/pool.py index 7986623..4acda58 100644 --- a/pytdx/async/pool.py +++ b/pytdx/async/pool.py @@ -8,7 +8,7 @@ class ConnectionPool(object): - def __init__(self, ip, port, max_connections=300, loop=None): + def __init__(self, ip, port, max_connections=100, loop=None): self.pid = os.getpid() self.max_connections = max_connections diff --git a/pytdx/async/reflection.py b/pytdx/async/reflection.py index d901326..983e7ee 100644 --- a/pytdx/async/reflection.py +++ b/pytdx/async/reflection.py @@ -75,7 +75,6 @@ async def _call_api(self): log.debug("recv body: ") log.debug(body_buf) - # return self.parseResponse(body_buf) return await asyncio.get_event_loop().run_in_executor(executor, self.parseResponse, body_buf) cmd = parser(None, None) From 7d59d9be10202883d78c4667ef08ced5d83bde87 Mon Sep 17 00:00:00 2001 From: RainX Date: Mon, 22 Jan 2018 10:15:30 +0800 Subject: [PATCH 15/15] add utf-8 declare --- pytdx/async/hq.py | 2 ++ pytdx/async/pool.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pytdx/async/hq.py b/pytdx/async/hq.py index 18b3276..032d6c4 100644 --- a/pytdx/async/hq.py +++ b/pytdx/async/hq.py @@ -1,3 +1,5 @@ +# coding: utf-8 + from pytdx.async.pool import ConnectionPool from pytdx.async.reflection import make_async_parser import timeit diff --git a/pytdx/async/pool.py b/pytdx/async/pool.py index 4acda58..19b11db 100644 --- a/pytdx/async/pool.py +++ b/pytdx/async/pool.py @@ -1,3 +1,5 @@ +# coding: utf-8 + import os import asyncio from itertools import chain