Skip to content

Commit

Permalink
[Add] RpcClient: Added send lock.
Browse files Browse the repository at this point in the history
[Add] RcpClient: Added Heartbeat package
  • Loading branch information
nanoric committed Aug 20, 2019
1 parent 8a68548 commit a0f1bc4
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 21 deletions.
4 changes: 3 additions & 1 deletion vnpy/app/rpc_service/engine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""""""

import traceback
from typing import Optional

from vnpy.event import Event, EventEngine
from vnpy.rpc import RpcServer
Expand All @@ -24,7 +25,7 @@ def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
self.rep_address = "tcp://*:2014"
self.pub_address = "tcp://*:4102"

self.server = None
self.server: Optional[RpcServer] = None

self.init_server()
self.load_setting()
Expand Down Expand Up @@ -96,6 +97,7 @@ def stop(self):
return False

self.server.stop()
self.server.join()
self.write_log("RPC服务已停止")
return True

Expand Down
1 change: 1 addition & 0 deletions vnpy/gateway/rpc/rpc_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def query_all(self):
def close(self):
""""""
self.client.stop()
self.client.join()

def client_callback(self, topic: str, event: Event):
""""""
Expand Down
84 changes: 64 additions & 20 deletions vnpy/rpc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
import signal
import threading
import traceback
import signal
import zmq
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Any, Callable

import zmq

_ = lambda x: x

# Achieve Ctrl-c interrupt recv
from zmq.backend.cython.constants import NOBLOCK

signal.signal(signal.SIGINT, signal.SIG_DFL)

KEEP_ALIVE_TOPIC = '_keep_alive'
KEEP_ALIVE_INTERVAL = timedelta(seconds=1)
KEEP_ALIVE_TOLERANCE = timedelta(seconds=3)


class RemoteException(Exception):
"""
Expand Down Expand Up @@ -50,6 +60,8 @@ def __init__(self):
self.__active = False # RpcServer status
self.__thread = None # RpcServer thread

self._register(KEEP_ALIVE_TOPIC, lambda n: n)

def is_active(self):
""""""
return self.__active
Expand Down Expand Up @@ -82,21 +94,29 @@ def stop(self):
# Stop RpcServer status
self.__active = False

# Unbind socket address
self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)
self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT)

def join(self):
# Wait for RpcServer thread to exit
if self.__thread.isAlive():
self.__thread.join()
self.__thread = None

# Unbind socket address
self.__socket_pub.unbind(self.__socket_pub.LAST_ENDPOINT)
self.__socket_rep.unbind(self.__socket_rep.LAST_ENDPOINT)

def run(self):
"""
Run RpcServer functions
"""
start = datetime.utcnow()
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
cur = datetime.utcnow()
delta = cur - start

if delta >= KEEP_ALIVE_INTERVAL:
self.publish(KEEP_ALIVE_TOPIC, cur)

if not self.__socket_rep.poll(1000):
continue

Expand Down Expand Up @@ -127,7 +147,13 @@ def register(self, func: Callable):
"""
Register function
"""
self.__functions[func.__name__] = func
return self._register(func.__name__, func)

def _register(self, name: str, func: Callable):
"""
Register function
"""
self.__functions[name] = func


class RpcClient:
Expand All @@ -145,21 +171,27 @@ def __init__(self):
self.__socket_sub = self.__context.socket(zmq.SUB)

# Worker thread relate, used to process data pushed from server
self.__active = False # RpcClient status
self.__thread = None # RpcClient thread
self.__active = False # RpcClient status
self.__thread = None # RpcClient thread
self.__lock = threading.Lock()

self._last_received_ping: datetime = datetime.utcnow()

@lru_cache(100)
def __getattr__(self, name: str):
"""
Realize remote call function
"""

# Perform remote call task
def dorpc(*args, **kwargs):
# Generate request
req = [name, args, kwargs]

# Send request and wait for response
self.__socket_req.send_pyobj(req)
rep = self.__socket_req.recv_pyobj()
with self.__lock:
self.__socket_req.send_pyobj(req)
rep = self.__socket_req.recv_pyobj()

# Return response if successed; Trigger exception if failed
if rep[0]:
Expand Down Expand Up @@ -187,6 +219,8 @@ def start(self, req_address: str, sub_address: str):
self.__thread = threading.Thread(target=self.run)
self.__thread.start()

self._last_received_ping = datetime.utcnow()

def stop(self):
"""
Stop RpcClient
Expand All @@ -197,29 +231,39 @@ def stop(self):
# Stop RpcClient status
self.__active = False

# Close socket
self.__socket_req.close()
self.__socket_sub.close()

def join(self):
# Wait for RpcClient thread to exit
if self.__thread.isAlive():
self.__thread.join()
self.__thread = None

# Close socket
self.__socket_req.close()
self.__socket_sub.close()

def run(self):
"""
Run RpcClient function
"""
pull_tolerance = int(KEEP_ALIVE_TOLERANCE.total_seconds() * 1000)
while self.__active:
# Use poll to wait event arrival, waiting time is 1 second (1000 milliseconds)
if not self.__socket_sub.poll(1000):
if not self.__socket_sub.poll(pull_tolerance):
self._on_unexpected_disconnected()
continue

# Receive data from subscribe socket
topic, data = self.__socket_sub.recv_pyobj()
topic, data = self.__socket_sub.recv_pyobj(flags=NOBLOCK)

if topic == KEEP_ALIVE_TOPIC:
self._last_received_ping = data
else:
# Process data by callable function
self.callback(topic, data)

# Process data by callable function
self.callback(topic, data)
@staticmethod
def _on_unexpected_disconnected():
print(_("RpcServer has no response over {tolerance} seconds, please check you connection."
.format(tolerance=KEEP_ALIVE_TOLERANCE.total_seconds())))

def callback(self, topic: str, data: Any):
"""
Expand Down

0 comments on commit a0f1bc4

Please sign in to comment.