Skip to content

Commit

Permalink
- add : 添加了shutdown 机制
Browse files Browse the repository at this point in the history
  • Loading branch information
lamter committed Aug 19, 2016
1 parent 5f5a938 commit 30f8948
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 2 deletions.
90 changes: 90 additions & 0 deletions easyquant/main_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import time
from collections import OrderedDict
import dill
import threading
from threading import Thread, Lock
import signal

import easytrader
from logbook import Logger, StreamHandler
Expand Down Expand Up @@ -56,6 +58,12 @@ def __init__(self, broker=None, need_data=None, quotation_engines=None,

if type(quotation_engines) != list:
quotation_engines = [quotation_engines]
else:
types = [quo.EventType for quo in quotation_engines]
if len(types) != set(types):
types.sort()
types = ','.join([str(t) for t in types])
raise ValueError("行情引擎 EventType 重复:" + types)
self.quotation_engines = []
for quotation_engine in quotation_engines:
self.quotation_engines.append(quotation_engine(self.event_engine, self.clock_engine))
Expand All @@ -78,17 +86,37 @@ def __init__(self, broker=None, need_data=None, quotation_engines=None,
# 加载线程
self._watch_thread = Thread(target=self._load_strategy, name="MainEngine.watch_reload_strategy")

# shutdown 函数
self.before_shutdown = [] # 关闭引擎前的 shutdown
self.main_shutdown = [] # 引擎自身要执行的 shutdown
self.after_shutdown = [] # 关闭引擎后的 shutdown
self.shutdown_signals = [
signal.SIGQUIT, # quit 信号
signal.SIGINT, # 键盘信号
signal.SIGHUP, # nohup 命令
signal.SIGTERM, # kill 命令
]

for s in self.shutdown_signals:
# 捕获退出信号后的要调用的,唯一的 shutdown 接口
signal.signal(s, self._shutdown)

self.log.info('启动主引擎')

def start(self):
"""启动主引擎"""
self.event_engine.start()
self._add_main_shutdown(self.event_engine.stop)

if self.broker == 'gf':
self.log.warn("sleep 10s 等待 gf 账户加载")
time.sleep(10)
for quotation_engine in self.quotation_engines:
quotation_engine.start()
self._add_main_shutdown(quotation_engine.stop)

self.clock_engine.start()
self._add_main_shutdown(self.clock_engine.stop)

def load(self, names, strategy_file):
with self.lock:
Expand Down Expand Up @@ -185,3 +213,65 @@ def get_strategy(self, name):
if strategy.name == name:
return strategy
return None

def get_quotation(self, eventype):
"""
:param name:
:return:
"""
for quo in self.quotation_engines:
if quo.EventType == eventype:
return quo
else:
return None

def add_before_shutdown(self, shutdown):

if not hasattr(shutdown, "__call__"):
n = shutdown.__name__ if hasattr(shutdown, "__name__") else str(shutdown)
raise ValueError("%s 不是可调用对象 " % n)

self.before_shutdown.append(shutdown)

def add_after_shutdown(self, shutdown):
if not hasattr(shutdown, "__call__"):
n = shutdown.__name__ if hasattr(shutdown, "__name__") else str(shutdown)
raise ValueError("%s 不是可调用对象 " % n)

self.after_shutdown.append(shutdown)

def _add_main_shutdown(self, shutdown):
if not hasattr(shutdown, "__call__"):
n = shutdown.__name__ if hasattr(shutdown, "__name__") else str(shutdown)
raise ValueError("%s 不是可调用对象 " % n)

self.main_shutdown.append(shutdown)

def _shutdown(self, sig, frame):
"""
关闭进程前的处理
:return:
"""
# 所有 shutdown 前的触发点
for st in self.before_shutdown:
st()

# 引擎自身的 shutdown
for st in self.main_shutdown:
st()

# 等待所有线程关闭, 直到只留下主线程
c = threading.active_count()
while threading.active_count() == c:
time.sleep(2)

# 调用策略的 shutdown
for s in self.strategy_list:
s.shutdown()

# 所有 shutdown 后的触发点
for st in self.after_shutdown:
st()

# 退出
sys.exit(1)
16 changes: 14 additions & 2 deletions easyquant/push_engine/base_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def push_quotation(self):
try:
response_data = self.fetch_quotation()
except aiohttp.errors.ServerDisconnectedError:
time.sleep(self.PushInterval)
self.wait()
continue
event = Event(event_type=self.EventType, data=response_data)
self.event_engine.put(event)
time.sleep(self.PushInterval)
self.wait()

def fetch_quotation(self):
# return your quotation
Expand All @@ -49,3 +49,15 @@ def fetch_quotation(self):
def init(self):
# do something init
pass

def wait(self):
interval = self.PushInterval
if interval < 1:
time.sleep(interval)
return
else:
time.sleep(self.PushInterval - int(interval))
interval = int(interval)
while interval > 0 and self.is_active:
time.sleep(1)
interval -= 1
7 changes: 7 additions & 0 deletions easyquant/strategy/strategyTemplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,10 @@ def log_handler(self):
:return: log_handler or None
"""
return None

def shutdown(self):
"""
关闭进程前调用该函数
:return:
"""
pass
8 changes: 8 additions & 0 deletions strategies/策略1_Demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from easyquant import DefaultLogHandler
from easyquant import StrategyTemplate


class Strategy(StrategyTemplate):
name = '测试策略1'

Expand Down Expand Up @@ -83,3 +84,10 @@ def clock(self, event):
def log_handler(self):
"""自定义 log 记录方式"""
return DefaultLogHandler(self.name, log_type='file', filepath='demo1.log')

def shutdown(self):
"""
关闭进程前的调用
:return:
"""
self.log.info("假装在关闭前保存了策略数据")

0 comments on commit 30f8948

Please sign in to comment.