-
Notifications
You must be signed in to change notification settings - Fork 680
/
Copy pathbaseApi.py
155 lines (137 loc) · 6.32 KB
/
baseApi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'mayanqiong'
import asyncio
import functools
import sys
import time
from asyncio import Future
from typing import Optional, Coroutine
class TqBaseApi(object):
"""
天勤基类,处理 EventLoop 相关的调用
"""
def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
"""
创建天勤接口实例
Args:
loop(asyncio.AbstractEventLoop): [可选] 使用指定的 IOLoop, 默认创建一个新的.
"""
self._loop = asyncio.SelectorEventLoop() if loop is None else loop # 创建一个新的 ioloop, 避免和其他框架/环境产生干扰
self._event_rev, self._check_rev = 0, 0
self._wait_idle_list = [] # 所有等待 loop idle 的 Future
self._wait_timeout = False # wait_update 是否触发超时
self._tasks = set() # 由api维护的所有根task,不包含子task,子task由其父task维护
self._exceptions = [] # 由api维护的所有task抛出的例外
# 回测需要行情和交易 lockstep, 而 asyncio 没有将内部的 _ready 队列暴露出来,
# 因此 monkey patch call_soon 函数用来判断是否有任务等待执行
self._loop.call_soon = functools.partial(self._call_soon, self._loop.call_soon)
# Windows系统下asyncio不支持KeyboardInterrupt的临时补丁
if sys.platform.startswith("win"):
self._create_task(self._windows_patch())
def _create_task(self, coro: Coroutine, _caller_api: bool = False) -> asyncio.Task:
task = self._loop.create_task(coro)
py_ver = sys.version_info
current_task = asyncio.Task.current_task(loop=self._loop) if (py_ver.major == 3 and py_ver.minor < 7) else asyncio.current_task(loop=self._loop)
if current_task is None or _caller_api: # 由 api 创建的 task,需要 api 主动管理
self._tasks.add(task)
task.add_done_callback(self._on_task_done)
return task
async def _cancel_tasks(self, *tasks):
# 目前的 task 退出流程无法处理在 finally 中被 cancel 的情况,
# 例如: twap _run 中调用 _insert_order 并且刚好时间段结束执行到 finally 时整个 api 被 close 触发 CancelError
if len(tasks) == 0:
return
task = tasks[0]
other_tasks = tasks[1:]
try:
await self._cancel_task(task)
finally:
if tasks:
await self._cancel_tasks(*other_tasks)
async def _cancel_task(self, task):
exception = None
task.cancel()
# 如果 task 已经 done,可以调用 cancel 但是 cancelled 不会变成 True
while not task.done():
try:
await asyncio.shield(task) # task 不会再被 cancel
except asyncio.CancelledError as ex:
if not task.cancelled():
exception = ex
await asyncio.sleep(0) # Give callbacks a chance to run
if exception:
raise exception from None
def _call_soon(self, org_call_soon, callback, *args, **kargs):
"""ioloop.call_soon的补丁, 用来追踪是否有任务完成并等待执行"""
self._event_rev += 1
return org_call_soon(callback, *args, **kargs)
def _run_once(self):
"""执行 ioloop 直到 ioloop.stop 被调用"""
if not self._exceptions:
self._loop.run_forever()
if self._exceptions:
raise self._exceptions.pop(0)
def _run_until_idle(self, async_run=False):
"""执行 ioloop 直到没有待执行任务
async_run is True 会从 _wait_idle_list 中取出等待的异步任务,保证同步代码优先于异步代码执行,
只有被 _run_until_task_done 调用(即 api 等待 fetch_msg)时,async_run 会为 True
"""
while self._check_rev != self._event_rev:
check_handle = self._loop.call_soon(self._check_event, self._event_rev + 1)
try:
self._run_once()
finally:
check_handle.cancel()
if len(self._wait_idle_list) > 0 and async_run:
f = self._wait_idle_list.pop(0) # 取出 list 中的第一个 Future
f.set_result(None) # f 返回
async def _wait_until_idle(self):
"""等待 ioloop 执行到空闲时,才从网络连接处收数据包,在 TqConnect 类中使用"""
f = Future()
self._wait_idle_list.append(f)
self._loop.stop()
await f
def _run_until_task_done(self, task: asyncio.Task, deadline=None):
try:
self._wait_timeout = False
if deadline is not None:
deadline_handle = self._loop.call_later(max(0, deadline - time.time()), self._set_wait_timeout)
while not self._wait_timeout and not task.done():
if len(self._wait_idle_list) == 0:
self._run_once()
else:
self._run_until_idle(async_run=True)
finally:
if deadline is not None:
deadline_handle.cancel()
task.cancel()
def _check_event(self, rev):
self._check_rev = rev
self._loop.stop()
def _set_wait_timeout(self):
self._wait_timeout = True
self._loop.stop()
def _on_task_done(self, task):
"""当由 api 维护的 task 执行完成后取出运行中遇到的例外并停止 ioloop"""
try:
exception = task.exception()
if exception:
self._exceptions.append(exception)
except asyncio.CancelledError:
pass
finally:
self._tasks.remove(task)
self._loop.stop()
async def _windows_patch(self):
"""Windows系统下asyncio不支持KeyboardInterrupt的临时补丁, 详见 https://bugs.python.org/issue23057"""
while True:
await asyncio.sleep(1)
def _close(self) -> None:
self._run_until_idle(async_run=False) # 由于有的处于 ready 状态 task 可能需要报撤单, 因此一直运行到没有 ready 状态的 task
for task in self._tasks:
task.cancel()
while self._tasks: # 等待 task 执行完成
self._run_once()
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
self._loop.close()