Skip to content

Commit

Permalink
Merge branch 'v3.8' of https://github.com/FutunnOpen/futu-api into v3.8
Browse files Browse the repository at this point in the history
# Conflicts:
#	futu/common/constant.py
  • Loading branch information
dream committed Mar 5, 2019
2 parents f8e9ecb + e0a7698 commit 5547d67
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 22 deletions.
8 changes: 7 additions & 1 deletion futu/common/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -1617,4 +1617,10 @@ def load_dic(self):
self.UN_AGREE_DISCLAIMER: GetGlobalState_pb2.ProgramStatusType_UnAgreeDisclaimer,
self.READY: GetGlobalState_pb2.ProgramStatusType_Ready,
self.FORCE_LOGOUT: GetGlobalState_pb2.ProgramStatusType_ForceLogout
}
}

class ContextStatus:
START = 'START'
CONNECTING = 'CONNECTING'
READY = 'READY'
CLOSED = 'CLOSED'
54 changes: 34 additions & 20 deletions futu/common/open_context_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@
from futu.common.conn_mng import FutuConnMng
from futu.common.network_manager import NetManager
from .err import Err
from .constant import ContextStatus
from .callback_executor import CallbackExecutor, CallbackItem
from .ft_logger import *

_SyncReqRet = namedtuple('_SyncReqRet', ('ret', 'msg'))

class ContextStatus:
Start = 0
Connecting = 1
Ready = 2
Closed = 3

class OpenContextBase(object):
"""Base class for set context"""
Expand All @@ -40,14 +36,15 @@ def __init__(self, host, port, async_enable, is_encrypt=None):
self._net_mgr = NetManager.default()
self._handler_ctx = HandlerContext(self._is_proc_run)
self._lock = RLock()
self._status = ContextStatus.Start
self._status = ContextStatus.START
self._proc_run = True
self._sync_req_ret = None
self._sync_conn_id = 0
self._conn_id = 0
self._keep_alive_interval = 10
self._last_keep_alive_time = datetime.now()
self._reconnect_timer = None
self._sync_query_connect_timeout = None
self._keep_alive_fail_count = 0
self._is_encrypt = is_encrypt
if self.is_encrypt():
Expand All @@ -57,7 +54,7 @@ def __init__(self, host, port, async_enable, is_encrypt=None):

while True:
with self._lock:
if self._status == ContextStatus.Ready:
if self._status == ContextStatus.READY or self._status == ContextStatus.CLOSED:
break
sleep(0.02)

Expand All @@ -72,6 +69,15 @@ def get_login_user_id(self):
def __del__(self):
self._close()

@property
def status(self):
with self._lock:
return self._status

def set_sync_query_connect_timeout(self, timeout):
with self._lock:
self._sync_query_connect_timeout = timeout

@abstractmethod
def close(self):
"""
Expand All @@ -89,9 +95,9 @@ def on_api_socket_reconnected(self):

def _close(self):
with self._lock:
if self._status == ContextStatus.Closed:
if self._status == ContextStatus.CLOSED:
return
self._status = ContextStatus.Closed
self._status = ContextStatus.CLOSED
net_mgr = self._net_mgr
conn_id = self._conn_id
self._conn_id = 0
Expand Down Expand Up @@ -167,12 +173,20 @@ def _get_sync_query_processor(self, pack_func, unpack_func, is_create_socket=Tru

def sync_query_processor(**kargs):
"""sync query processor"""
start_time = datetime.now()
while True:
with self._lock:
if self._status == ContextStatus.Ready:
if self._status == ContextStatus.READY:
net_mgr = self._net_mgr
conn_id = self._conn_id
break
elif self._status == ContextStatus.CLOSED:
return RET_ERROR, Err.ConnectionClosed.text, None

if self._sync_query_connect_timeout is not None:
elapsed_time = datetime.now() - start_time
if elapsed_time.total_seconds() >= self._sync_query_connect_timeout:
return RET_ERROR, Err.Timeout.text, None
sleep(0.01)

try:
Expand All @@ -199,7 +213,7 @@ def _send_async_req(self, req_str):
conn_id = 0
net_mgr = None
with self._lock:
if self._status != ContextStatus.Ready:
if self._status != ContextStatus.READY:
return RET_ERROR, 'Context closed or not ready'
conn_id = self._conn_id
net_mgr = self._net_mgr
Expand All @@ -212,7 +226,7 @@ def _socket_reconnect_and_wait_ready(self):
"""
logger.info("Start connecting: host={}; port={};".format(self.__host, self.__port))
with self._lock:
self._status = ContextStatus.Connecting
self._status = ContextStatus.CONNECTING
# logger.info("try connecting: host={}; port={};".format(self.__host, self.__port))
ret, msg, conn_id = self._net_mgr.connect((self.__host, self.__port), self, 5, self.is_encrypt())
if ret == RET_OK:
Expand All @@ -225,7 +239,7 @@ def _socket_reconnect_and_wait_ready(self):
with self._lock:
if self._sync_req_ret is not None:
if self._sync_req_ret.ret == RET_OK:
self._status = ContextStatus.Ready
self._status = ContextStatus.READY
else:
ret, msg = self._sync_req_ret.ret, self._sync_req_ret.msg
self._sync_req_ret = None
Expand Down Expand Up @@ -319,15 +333,15 @@ def on_connected(self, conn_id):
def on_error(self, conn_id, err):
logger.warning('Connect error: conn_id={0}; err={1};'.format(conn_id, err))
with self._lock:
if self._status != ContextStatus.Connecting:
if self._status != ContextStatus.CONNECTING:
self._wait_reconnect()
else:
self._sync_req_ret = _SyncReqRet(RET_ERROR, str(err))

def on_closed(self, conn_id):
logger.warning('Connect closed: conn_id={0}'.format(conn_id))
with self._lock:
if self._status != ContextStatus.Connecting:
if self._status != ContextStatus.CONNECTING:
self._wait_reconnect()
else:
self._sync_req_ret = _SyncReqRet(RET_ERROR, 'Connection closed')
Expand All @@ -348,7 +362,7 @@ def on_packet(self, conn_id, proto_info, ret_code, msg, rsp_pb):

def on_activate(self, conn_id, now):
with self._lock:
if self._status != ContextStatus.Ready:
if self._status != ContextStatus.READY:
return
time_elapsed = now - self._last_keep_alive_time
if time_elapsed.total_seconds() < self._keep_alive_interval:
Expand All @@ -367,7 +381,7 @@ def on_activate(self, conn_id, now):

def packet_callback(self, proto_id, rsp_pb):
with self._lock:
if self._status != ContextStatus.Ready:
if self._status != ContextStatus.READY:
return

handler_ctx = self._handler_ctx
Expand Down Expand Up @@ -414,14 +428,14 @@ def _wait_reconnect(self):
net_mgr = None
conn_id = 0
with self._lock:
if self._status == ContextStatus.Closed or self._reconnect_timer is not None:
if self._status == ContextStatus.CLOSED or self._reconnect_timer is not None:
return
logger.info('Wait reconnect in {} seconds: host={}; port={};'.format(wait_reconnect_interval,
self.__host,
self.__port))
net_mgr = self._net_mgr
conn_id = self._conn_id
self._status = ContextStatus.Connecting
self._status = ContextStatus.CONNECTING
self._sync_conn_id = 0
self._conn_id = 0
self._keep_alive_fail_count = 0
Expand All @@ -434,7 +448,7 @@ def _reconnect(self):
with self._lock:
self._reconnect_timer.cancel()
self._reconnect_timer = None
if self._status != ContextStatus.Connecting:
if self._status != ContextStatus.CONNECTING:
return

self._socket_reconnect_and_wait_ready()
Expand Down
2 changes: 1 addition & 1 deletion futu/quote/open_quote_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def on_api_socket_reconnected(self):
# 重定阅失败,重连
if ret_code != RET_OK:
logger.error("reconnect subscribe error, close connect and retry!!")
self._status = ContextStatus.Start
self._status = ContextStatus.START
self._wait_reconnect()
return ret_code, ret_msg

Expand Down

0 comments on commit 5547d67

Please sign in to comment.