From 4bcfddd26d01cd6a4886c06b79c7ea26cec17252 Mon Sep 17 00:00:00 2001 From: 51bitquant Date: Sun, 3 Jul 2022 23:31:03 +0800 Subject: [PATCH] add tv --- howtrader/app/cta_strategy/engine.py | 2 +- howtrader/rpc/__init__.py | 2 - howtrader/rpc/client.py | 169 --------------------------- howtrader/rpc/common.py | 10 -- howtrader/rpc/server.py | 139 ---------------------- howtrader/trader/event.py | 4 +- howtrader/trader/setting.py | 2 + howtrader/trader/ui/qt.py | 2 +- main_window.py | 44 ++++++- requirements.txt | 1 + setup.py | 3 +- 11 files changed, 47 insertions(+), 331 deletions(-) delete mode 100644 howtrader/rpc/__init__.py delete mode 100644 howtrader/rpc/client.py delete mode 100644 howtrader/rpc/common.py delete mode 100644 howtrader/rpc/server.py diff --git a/howtrader/app/cta_strategy/engine.py b/howtrader/app/cta_strategy/engine.py index 5c544cb..cef8b1f 100644 --- a/howtrader/app/cta_strategy/engine.py +++ b/howtrader/app/cta_strategy/engine.py @@ -823,8 +823,8 @@ def sync_strategy_data(self, strategy: CtaTemplate) -> None: data.pop("trading") self.sync_strategy_data_lock.acquire() self.strategy_data[strategy.strategy_name] = data - self.sync_strategy_data_lock.release() save_json(self.data_filename, self.strategy_data) + self.sync_strategy_data_lock.release() def get_all_strategy_class_names(self) -> list: """ diff --git a/howtrader/rpc/__init__.py b/howtrader/rpc/__init__.py deleted file mode 100644 index bc5511d..0000000 --- a/howtrader/rpc/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .client import RpcClient -from .server import RpcServer \ No newline at end of file diff --git a/howtrader/rpc/client.py b/howtrader/rpc/client.py deleted file mode 100644 index cdd18bc..0000000 --- a/howtrader/rpc/client.py +++ /dev/null @@ -1,169 +0,0 @@ -import threading -from datetime import datetime -from functools import lru_cache -from typing import Any - -import zmq -from zmq.constants import NOBLOCK - -from .common import HEARTBEAT_TOPIC, HEARTBEAT_TOLERANCE - - -class RemoteException(Exception): - """ - RPC remote exception - """ - - def __init__(self, value: Any) -> None: - """ - Constructor - """ - self._value = value - - def __str__(self) -> str: - """ - Output error message - """ - return self._value - - -class RpcClient: - """""" - - def __init__(self) -> None: - """Constructor""" - # zmq port related - self._context: zmq.Context = zmq.Context() - - # Request socket (Request–reply pattern) - self._socket_req: zmq.Socket = self._context.socket(zmq.REQ) - - # Subscribe socket (Publish–subscribe pattern) - self._socket_sub: zmq.Socket = self._context.socket(zmq.SUB) - - # Worker thread relate, used to process data pushed from server - self._active: bool = False # RpcClient status - self._thread: threading.Thread = None # RpcClient thread - self._lock: threading.Lock = threading.Lock() - - self._last_received_ping: datetime = datetime.utcnow() - - @lru_cache(100) - def __getattr__(self, name: str) -> Any: - """ - Realize remote call function - """ - - # Perform remote call task - def dorpc(*args, **kwargs): - # Get timeout value from kwargs, default value is 30 seconds - if "timeout" in kwargs: - timeout = kwargs.pop("timeout") - else: - timeout = 30000 - - # Generate request - req: list = [name, args, kwargs] - - # Send request and wait for response - with self._lock: - self._socket_req.send_pyobj(req) - - # Timeout reached without any data - n: int = self._socket_req.poll(timeout) - if not n: - msg: str = f"Timeout of {timeout}ms reached for {req}" - raise RemoteException(msg) - - rep = self._socket_req.recv_pyobj() - - # Return response if successed; Trigger exception if failed - if rep[0]: - return rep[1] - else: - raise RemoteException(rep[1]) - - return dorpc - - def start( - self, - req_address: str, - sub_address: str - ) -> None: - """ - Start RpcClient - """ - if self._active: - return - - # Connect zmq port - self._socket_req.connect(req_address) - self._socket_sub.connect(sub_address) - - # Start RpcClient status - self._active = True - - # Start RpcClient thread - self._thread = threading.Thread(target=self.run) - self._thread.start() - - self._last_received_ping = datetime.utcnow() - - def stop(self) -> None: - """ - Stop RpcClient - """ - if not self._active: - return - - # Stop RpcClient status - self._active = False - - def join(self) -> None: - # Wait for RpcClient thread to exit - if self._thread and self._thread.is_alive(): - self._thread.join() - self._thread = None - - def run(self) -> None: - """ - Run RpcClient function - """ - pull_tolerance: int = HEARTBEAT_TOLERANCE * 1000 - - while self._active: - if not self._socket_sub.poll(pull_tolerance): - self.on_disconnected() - continue - - # Receive data from subscribe socket - topic, data = self._socket_sub.recv_pyobj(flags=NOBLOCK) - - if topic == HEARTBEAT_TOPIC: - self._last_received_ping = data - else: - # Process data by callable function - self.callback(topic, data) - - # Close socket - self._socket_req.close() - self._socket_sub.close() - - def callback(self, topic: str, data: Any) -> None: - """ - Callable function - """ - raise NotImplementedError - - def subscribe_topic(self, topic: str) -> None: - """ - Subscribe data - """ - self._socket_sub.setsockopt_string(zmq.SUBSCRIBE, topic) - - def on_disconnected(self): - """ - Callback when heartbeat is lost. - """ - msg: str = f"RpcServer has no response over {HEARTBEAT_TOLERANCE} seconds, please check you connection." - print(msg) \ No newline at end of file diff --git a/howtrader/rpc/common.py b/howtrader/rpc/common.py deleted file mode 100644 index 0bcc78e..0000000 --- a/howtrader/rpc/common.py +++ /dev/null @@ -1,10 +0,0 @@ -import signal - - -# Achieve Ctrl-c interrupt recv -signal.signal(signal.SIGINT, signal.SIG_DFL) - - -HEARTBEAT_TOPIC = "heartbeat" -HEARTBEAT_INTERVAL = 10 -HEARTBEAT_TOLERANCE = 30 \ No newline at end of file diff --git a/howtrader/rpc/server.py b/howtrader/rpc/server.py deleted file mode 100644 index 79800f7..0000000 --- a/howtrader/rpc/server.py +++ /dev/null @@ -1,139 +0,0 @@ -import threading -import traceback -from time import time -from typing import Any, Callable, Dict - -import zmq - -from .common import HEARTBEAT_TOPIC, HEARTBEAT_INTERVAL - - -class RpcServer: - """""" - - def __init__(self) -> None: - """ - Constructor - """ - # Save functions dict: key is function name, value is function object - self._functions: Dict[str, Callable] = {} - - # Zmq port related - self._context: zmq.Context = zmq.Context() - - # Reply socket (Request–reply pattern) - self._socket_rep: zmq.Socket = self._context.socket(zmq.REP) - - # Publish socket (Publish–subscribe pattern) - self._socket_pub: zmq.Socket = self._context.socket(zmq.PUB) - - # Worker thread related - self._active: bool = False # RpcServer status - self._thread: threading.Thread = None # RpcServer thread - self._lock: threading.Lock = threading.Lock() - - # Heartbeat related - self._heartbeat_at: int = None - - def is_active(self) -> bool: - """""" - return self._active - - def start( - self, - rep_address: str, - pub_address: str, - ) -> None: - """ - Start RpcServer - """ - if self._active: - return - - # Bind socket address - self._socket_rep.bind(rep_address) - self._socket_pub.bind(pub_address) - - # Start RpcServer status - self._active = True - - # Start RpcServer thread - self._thread = threading.Thread(target=self.run) - self._thread.start() - - # Init heartbeat publish timestamp - self._heartbeat_at = time() + HEARTBEAT_INTERVAL - - def stop(self) -> None: - """ - Stop RpcServer - """ - if not self._active: - return - - # Stop RpcServer status - self._active = False - - def join(self) -> None: - # Wait for RpcServer thread to exit - if self._thread and self._thread.is_alive(): - self._thread.join() - self._thread = None - - def run(self) -> None: - """ - Run RpcServer functions - """ - while self._active: - # Poll response socket for 1 second - n: int = self._socket_rep.poll(1000) - self.check_heartbeat() - - if not n: - continue - - # Receive request data from Reply socket - req = self._socket_rep.recv_pyobj() - - # Get function name and parameters - name, args, kwargs = req - - # Try to get and execute callable function object; capture exception information if it fails - try: - func: Callable = self._functions[name] - r: Any = func(*args, **kwargs) - rep: list = [True, r] - except Exception as e: # noqa - rep: list = [False, traceback.format_exc()] - - # send callable response by Reply socket - self._socket_rep.send_pyobj(rep) - - # Unbind socket address - self._socket_pub.unbind(self._socket_pub.LAST_ENDPOINT) - self._socket_rep.unbind(self._socket_rep.LAST_ENDPOINT) - - def publish(self, topic: str, data: Any) -> None: - """ - Publish data - """ - with self._lock: - self._socket_pub.send_pyobj([topic, data]) - - def register(self, func: Callable) -> None: - """ - Register function - """ - self._functions[func.__name__] = func - - def check_heartbeat(self) -> None: - """ - Check whether it is required to send heartbeat. - """ - now: float = time() - if now >= self._heartbeat_at: - # Publish heartbeat - self.publish(HEARTBEAT_TOPIC, now) - - # Update timestamp of next publish - self._heartbeat_at = now + HEARTBEAT_INTERVAL \ No newline at end of file diff --git a/howtrader/trader/event.py b/howtrader/trader/event.py index e8b58fc..79ab705 100644 --- a/howtrader/trader/event.py +++ b/howtrader/trader/event.py @@ -12,4 +12,6 @@ EVENT_QUOTE = "eQuote." EVENT_CONTRACT = "eContract." EVENT_LOG = "eLog" -EVENT_TRADINGVIEW = 'eTradingView' +EVENT_TV_SIGNAL = 'eTVSignal' +EVENT_TV_LOG = 'eTVLog' +EVENT_TV_STRATEGY = "eTVStrategy" diff --git a/howtrader/trader/setting.py b/howtrader/trader/setting.py index 586990d..02309f1 100644 --- a/howtrader/trader/setting.py +++ b/howtrader/trader/setting.py @@ -26,6 +26,8 @@ "email.receiver": "", "update_interval": 600, + "passphrase": "howtrader", # tv passphrase + "port": 9999, # tv server port "datafeed.name": "", "datafeed.username": "", diff --git a/howtrader/trader/ui/qt.py b/howtrader/trader/ui/qt.py index b8f2282..2aad669 100644 --- a/howtrader/trader/ui/qt.py +++ b/howtrader/trader/ui/qt.py @@ -101,7 +101,7 @@ def init_ui(self) -> None: def show_exception(self, msg: str) -> None: """""" self.msg_edit.setText(msg) - self.show() + # self.show() def _copy_text(self) -> None: """""" diff --git a/main_window.py b/main_window.py index b906e9a..ead4c1f 100644 --- a/main_window.py +++ b/main_window.py @@ -1,9 +1,9 @@ -from howtrader.event import EventEngine - +from howtrader.event import EventEngine, Event +from howtrader.trader.event import EVENT_TV_SIGNAL from howtrader.trader.engine import MainEngine from howtrader.trader.ui import MainWindow, create_qapp - +from howtrader.trader.setting import SETTINGS from howtrader.gateway.binance import BinanceUsdtGateway, BinanceSpotGateway, BinanceInverseGateway from howtrader.app.cta_strategy import CtaStrategyApp @@ -12,19 +12,47 @@ from howtrader.app.algo_trading import AlgoTradingApp from howtrader.app.risk_manager import RiskManagerApp from howtrader.app.spread_trading import SpreadTradingApp +from howtrader.app.tradingview import TradingViewApp +from threading import Thread +import json +from flask import Flask, request + +# create global event_engine +event_engine: EventEngine = EventEngine() +passphrase = SETTINGS.get("passphrase", "") +port = SETTINGS.get("port", 9999) + +app = Flask(__name__) + +@app.route('/', methods=['GET']) +def welcome(): + return "Hi, this is tv server!" + +@app.route('/webhook', methods=['POST']) +def webhook(): + try: + data = json.loads(request.data) + if data.get('passphrase', None) != passphrase: + return {"status": "failure", "msg": "passphrase is incorrect"} + event:Event = Event(type=EVENT_TV_SIGNAL, data=data) + event_engine.put(event) + return {"status": "success", "msg": ""} + except Exception as error: + return {"status": "error", "msg": str(error)} + +def start_tv_server(): + app.run(host="127.0.0.1", port=port) def main(): """""" qapp = create_qapp() - - event_engine = EventEngine() - main_engine = MainEngine(event_engine) main_engine.add_gateway(BinanceSpotGateway) main_engine.add_gateway(BinanceUsdtGateway) main_engine.add_gateway(BinanceInverseGateway) main_engine.add_app(CtaStrategyApp) + main_engine.add_app(TradingViewApp) main_engine.add_app(DataManagerApp) main_engine.add_app(AlgoTradingApp) main_engine.add_app(DataRecorderApp) @@ -34,6 +62,10 @@ def main(): main_window = MainWindow(main_engine, event_engine) main_window.showMaximized() + t1 = Thread(target=start_tv_server) + t1.daemon = True + t1.start() + qapp.exec() diff --git a/requirements.txt b/requirements.txt index a81ba8f..436251f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ simplejson requests aiohttp +flask pytz peewee pymongo diff --git a/setup.py b/setup.py index 220cd7b..a2e4a92 100644 --- a/setup.py +++ b/setup.py @@ -24,9 +24,8 @@ def get_install_requires(): install_requires = [ - "six", "simplejson", - "wheel", + "flask", "PySide6", "pyqtgraph", "qdarkstyle",