-
Notifications
You must be signed in to change notification settings - Fork 680
/
Copy pathzq_otg.py
76 lines (66 loc) · 3.26 KB
/
zq_otg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'chenli'
import asyncio
import json
import os
import tempfile
import sys
import subprocess
from pathlib import Path
from asyncio.subprocess import DEVNULL, PIPE
from tqsdk.exceptions import TqContextManagerError
class ZqOtgContext:
    """Async context manager around the ``otg_adapter`` helper process.

    The adapter executable is located through the ``tqsdk_zq_otg`` package.
    Entering the context creates a temporary data directory; ``get_url``
    lazily launches the adapter and returns the local websocket address it
    listens on; leaving the context stops the adapter and removes the
    temporary directory.
    """

    def __init__(self, api):
        """Locate the adapter executable and prepare its environment.

        Args:
            api: object whose ``_account._account_list`` is scanned for
                accounts with the ``"tq_direct"`` feature; their type names
                are only used to build the error messages below.

        Raises:
            Exception: if ``tqsdk_zq_otg`` is not installed, or if its
                version differs from the installed ``tqsdk`` version.
        """
        acc_types = ", ".join(
            type(acc).__name__
            for acc in api._account._account_list
            if acc._account_auth.get("feature") == "tq_direct"
        )
        try:
            from tqsdk_zq_otg import __version__ as otg_version
            from tqsdk import __version__ as tqsdk_version
            from tqsdk_zq_otg import get_zq_otg_path
        except ImportError:
            raise Exception(f"使用 {acc_types} 账户需要安装 tqsdk_zq_otg 包: pip install -U tqsdk_zq_otg") from None
        if otg_version != tqsdk_version:
            raise Exception(f"使用 {acc_types} 账户需要更新 tqsdk_zq_otg 包到最新版本: pip install -U tqsdk_zq_otg")
        self._zq_otg_path = get_zq_otg_path()
        self._zq_otg_exe = str(Path(self._zq_otg_path) / "otg_adapter")
        # The child gets a copy of the current environment with
        # LD_LIBRARY_PATH replaced by the adapter's own directory.
        self._zq_otg_env = os.environ.copy()
        self._zq_otg_env["LD_LIBRARY_PATH"] = str(self._zq_otg_path)
        # Created lazily by get_url(); None until the adapter is first needed.
        self._zq_otg_proc = None

    async def __aenter__(self):
        """Create the temporary directory handed to the adapter and return self."""
        self._zq_otg_data_dir = tempfile.TemporaryDirectory()
        self._zq_otg_data_path = Path(self._zq_otg_data_dir.name)
        return self

    @staticmethod
    def _read_port(port_file):
        """Return the port stored in *port_file*, or None if it is not readable yet."""
        try:
            with open(port_file, "r", encoding="utf-8") as file:
                return json.load(file)["port"]
        except (OSError, ValueError, KeyError):
            # Missing, still being written, or not yet containing "port":
            # treat all of these as "not ready" so the caller keeps polling.
            return None

    async def get_url(self, url_info):
        """Return the local websocket URL of the adapter, starting it if needed.

        The adapter is (re)started when it has never been launched or has
        already exited. The method then polls ``port.json`` once per second,
        up to 30 times, for a non-zero port.

        Args:
            url_info: a parsed URL supporting ``_replace(scheme=, netloc=)``
                and ``geturl()``; only scheme and netloc are overridden.

        Returns:
            str: ``url_info`` rewritten to ``ws://127.0.0.1:<port>``.

        Raises:
            TqContextManagerError: if no usable port shows up in time.
        """
        # port.json is created under log_file_path.
        port_file = self._zq_otg_data_path / "port.json"
        parameters = json.dumps({
            "log_file_path": str(self._zq_otg_data_path),
            "user_file_path": str(self._zq_otg_data_path),
            "host": "127.0.0.1",
            "port": 0,
        })
        on_windows = sys.platform.startswith("win")
        if self._zq_otg_proc is not None and on_windows:
            # subprocess.Popen only refreshes returncode when poll() is called.
            self._zq_otg_proc.poll()
        if self._zq_otg_proc is None or self._zq_otg_proc.returncode is not None:
            # A port.json left behind by a previous (now dead) adapter would be
            # picked up by the polling loop below before the new process has
            # written its own port, so drop it first. Best effort: a failure
            # here must not prevent the launch.
            try:
                port_file.unlink()
            except OSError:
                pass
            args = [self._zq_otg_exe, f"--config={parameters}", "--mode=cmd"]
            if on_windows:
                self._zq_otg_proc = subprocess.Popen(
                    args, stdin=PIPE, stdout=DEVNULL, stderr=DEVNULL, env=self._zq_otg_env
                )
            else:
                self._zq_otg_proc = await asyncio.create_subprocess_exec(
                    *args, stdin=PIPE, stdout=DEVNULL, stderr=DEVNULL, env=self._zq_otg_env
                )
        for _ in range(30):
            port = self._read_port(port_file)
            # A port of 0 is not a usable address yet; keep waiting.
            if port is not None and port != 0:
                return url_info._replace(scheme="ws", netloc=f"127.0.0.1:{port}").geturl()
            await asyncio.sleep(1)
        raise TqContextManagerError("获取交易服务地址失败")

    async def __aexit__(self, exc_type, exc, tb):
        """Stop the adapter (if it was started) and delete the temporary directory.

        The temporary directory is removed even when shutting the process
        down raises, so it cannot leak.
        """
        try:
            if self._zq_otg_proc is not None:
                # Closing stdin is the shutdown request used here; presumably
                # the adapter (started with --mode=cmd) exits on EOF - confirm.
                self._zq_otg_proc.stdin.close()
                if sys.platform.startswith("win"):
                    # NOTE(review): Popen.wait() is synchronous and blocks the
                    # event loop until the adapter exits.
                    self._zq_otg_proc.wait()
                else:
                    await self._zq_otg_proc.wait()
        finally:
            self._zq_otg_data_dir.cleanup()