Skip to content

Commit

Permalink
add heartbeat pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
rainx committed Jul 25, 2017
2 parents 0644fd6 + 9d95fb6 commit a5fe6f9
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 167 deletions.
9 changes: 8 additions & 1 deletion ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
1.13
* 增加心跳包heartbeat参数,自动创建心跳包线程
* 将HqAPI和 ExHqAPI部分逻辑放到BaseSocketClient里

1.12
* pr #13 简化用户输入 https://github.com/rainx/pytdx/pull/13

1.11
------
* 追加exhq get_minute_time_data 接口
Expand Down Expand Up @@ -57,4 +64,4 @@

1.0
------
* 初始版本
* 初始版本
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,14 @@ get_k_data('000001','2017-07-03','2017-07-10')
api = TdxHq_API(multithread=True)
```

### 心跳包

由于长时间不与服务器交互,服务器将关闭连接,所以我们实现了心跳包的机制,可以通过
```python
api = TdxHq_API(heartbeat=True)
```
设置心跳包,程序会启动一个心跳包发送线程,在空闲状态下隔一段时间发送一个心跳包,注意,打开heartbeat=True选项的同时会自动打开multithread=True

### 调试模式

如果您需要调试本代码,监控传输过程中的数据包传输情况,可以使用调试模式,使用方法是设定环境变量 TDX_DEBUG 为 1 如
Expand Down
141 changes: 141 additions & 0 deletions pytdx/base_socket_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# coding=utf-8

#
# Just for practising
#


import os
import socket
import sys
import pandas as pd

if __name__ == '__main__':
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.realpath(__file__))))

from pytdx.log import DEBUG, log

import threading,datetime
import time
from pytdx.heartbeat import HqHeartBeatThread
import functools


CONNECT_TIMEOUT = 5.000
RECV_HEADER_LEN = 0x10
DEFAULT_HEARTBEAT_INTERVAL = 10.0


"""
In [7]: 0x7e
Out[7]: 126
In [5]: len(body)
Out[5]: 8066
In [6]: len(body)/126
Out[6]: 64.01587301587301
In [7]: len(body)%126
Out[7]: 2
In [8]: (len(body)-2)/126
Out[8]: 64.0
"""
def update_last_ack_time(func):
@functools.wraps(func)
def wrapper(self, *args, **kw):
self.last_ack_time = time.time()
log.debug("last ack time update to " + str(self.last_ack_time))
try:
ret = func(self, *args, **kw)
except Exception as e:
self.last_transaction_failed = True
ret = None
raise e
finally:
return ret
return wrapper


class BaseSocketClient(object):

def __init__(self, multithread=False, heartbeat=False):
self.need_setup = True
if multithread or heartbeat:
self.lock = threading.Lock()
else:
self.lock = None

self.client = None
self.heartbeat = heartbeat
self.heartbeat_thread = None
self.stop_event = None
self.heartbeat_interval = DEFAULT_HEARTBEAT_INTERVAL # 默认10秒一个心跳包
self.last_ack_time = time.time()
self.last_transaction_failed = False

def connect(self, ip='101.227.73.20', port=7709):
"""
:param ip: 服务器ip 地址
:param port: 服务器端口
:return: 是否连接成功 True/False
"""

self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.settimeout(CONNECT_TIMEOUT)
log.debug("connecting to server : %s on port :%d" % (ip, port))
try:
self.client.connect((ip, port))
except socket.timeout as e:
print(str(e))
log.debug("connection expired")
return False
log.debug("connected!")

if self.need_setup:
self.setup()

self.stop_event = threading.Event()
self.heartbeat_thread = HqHeartBeatThread(self, self.stop_event, self.heartbeat_interval)
self.heartbeat_thread.start()
return self

def disconnect(self):

if self.heartbeat_thread and \
self.heartbeat_thread.is_alive():
self.stop_event.set()

if self.client:
log.debug("disconnecting")
try:
self.client.shutdown(socket.SHUT_RDWR)
self.client.close()
self.client = None
except Exception as e:
log.debug(str(e))
log.debug("disconnected")

def close(self):
"""
disconnect的别名,为了支持 with closing(obj): 语法
:return:
"""
self.disconnect()


def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def to_df(self, v):
if isinstance(v, list):
return pd.DataFrame(data=v)
elif isinstance(v, dict):
return pd.DataFrame(data=[v,])
else:
return pd.DataFrame(data=[{'value': v}])
68 changes: 9 additions & 59 deletions pytdx/exhq.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
from pytdx.params import TDXParams

import threading,datetime
from pytdx.base_socket_client import BaseSocketClient, update_last_ack_time

CONNECT_TIMEOUT = 5.000
RECV_HEADER_LEN = 0x10

"""
In [7]: 0x7e
Expand All @@ -45,87 +44,38 @@
Out[8]: 64.0
"""

class TdxExHq_API(object):

def __init__(self, multithread=False):
self.need_setup = True
if multithread:
self.lock = threading.Lock()
else:
self.lock = None

def connect(self, ip, port):
"""
:param ip: 服务器ip 地址
:param port: 服务器端口
:return: 是否连接成功 True/False
"""

self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.settimeout(CONNECT_TIMEOUT)
log.debug("connecting to server : %s on port :%d" % (ip, port))
try:
self.client.connect((ip, port))
except socket.timeout as e:
print(str(e))
log.debug("connection expired")
return False
log.debug("connected!")

if self.need_setup:
self.setup()

return self

def disconnect(self):
if self.client:
log.debug("disconnecting")
try:
self.client.shutdown(socket.SHUT_RDWR)
self.client.close()
except Exception as e:
log.debug(str(e))
log.debug("disconnected")

def close(self):
"""
disconnect的别名,为了支持 with closing(obj): 语法
:return:
"""
self.disconnect()


def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class TdxExHq_API(BaseSocketClient):

def setup(self):
ExSetupCmd1(self.client).call_api()


### API LIST

@update_last_ack_time
def get_markets(self):
cmd = GetMarkets(self.client)
return cmd.call_api()

@update_last_ack_time
def get_instrument_count(self):
cmd = GetInstrumentCount(self.client)
return cmd.call_api()

@update_last_ack_time
def get_instrument_quote(self, market, code):
cmd = GetInstrumentQuote(self.client)
cmd.setParams(market, code)
return cmd.call_api()

@update_last_ack_time
def get_minute_time_data(self, market, code):
cmd = GetMinuteTimeData(self.client)
cmd.setParams(market, code)
return cmd.call_api()

def do_heartbeat(self):
self.get_instrument_count()

if __name__ == '__main__':
import pprint

Expand Down
2 changes: 1 addition & 1 deletion pytdx/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def run(self):
if self.client and (time.time() - self.api.last_ack_time > self.heartbeat_interval):
try:
# 发送一个获取股票数量的包作为心跳包
self.api.get_security_count(random.randint(0, 1))
self.api.do_heartbeat()
except Exception as e:
log.debug(str(e))

Expand Down
Loading

0 comments on commit a5fe6f9

Please sign in to comment.