Skip to content

Commit b91f1d1

Browse files
committed
添加易盛历史行情模块
1 parent 5049fc4 commit b91f1d1

26 files changed

+972
-5
lines changed

QuantBox.XAPI/Enum.cs

+2
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public enum ApiType : byte
9292
MarketData = 2,
9393
Level2 = 4,
9494
QuoteRequest = 8,
95+
HistoricalData = 16,
96+
Instrument = 32,
9597
};
9698

9799
public enum DataLevelType : byte

QuantBox.XAPI/Struct.cs

+4
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public struct ServerInfoField
4646
/// </summary>
4747
public int TopicId;
4848
/// <summary>
49+
/// 端口号
50+
/// </summary>
51+
public int Port;
52+
/// <summary>
4953
/// 流恢复
5054
/// </summary>
5155
public ResumeType MarketDataTopicResumeType;

QuantBox_CTP_Trade/main.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ void* __stdcall XRequest(char type, void* pApi1, void* pApi2, double double1, do
1414
switch (rt)
1515
{
1616
case GetApiType:
17-
return (void*)(ApiType::Trade);
17+
return (void*)(ApiType::Trade | ApiType::Instrument);
1818
case GetApiVersion:
1919
return "0.2";
2020
case GetApiName:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
#include "stdafx.h"
2+
#include "HistoricalDataApi.h"
3+
4+
#include "../include/QueueEnum.h"
5+
#include "../include/QueueHeader.h"
6+
7+
#include "../include/ApiHeader.h"
8+
#include "../include/ApiStruct.h"
9+
10+
#include "../include/toolkit.h"
11+
12+
#include <cstring>
13+
#include <assert.h>
14+
15+
CHistoricalDataApi::CHistoricalDataApi(void)
16+
{
17+
m_pApi = nullptr;
18+
m_msgQueue = nullptr;
19+
m_lRequestID = 0;
20+
21+
m_hThread = nullptr;
22+
m_bRunning = false;
23+
}
24+
25+
26+
CHistoricalDataApi::~CHistoricalDataApi(void)
27+
{
28+
Disconnect();
29+
}
30+
31+
void CHistoricalDataApi::StartThread()
32+
{
33+
if (nullptr == m_hThread)
34+
{
35+
m_bRunning = true;
36+
m_hThread = new thread(ProcessThread, this);
37+
}
38+
}
39+
40+
void CHistoricalDataApi::StopThread()
41+
{
42+
m_bRunning = false;
43+
if (m_hThread)
44+
{
45+
m_hThread->join();
46+
delete m_hThread;
47+
m_hThread = nullptr;
48+
}
49+
}
50+
51+
void CHistoricalDataApi::Register(void* pMsgQueue)
52+
{
53+
m_msgQueue = pMsgQueue;
54+
}
55+
56+
void CHistoricalDataApi::Connect(const string& szPath,
57+
ServerInfoField* pServerInfo,
58+
UserInfoField* pUserInfo)
59+
{
60+
m_szPath = szPath;
61+
memcpy(&m_ServerInfo, pServerInfo, sizeof(ServerInfoField));
62+
memcpy(&m_UserInfo, pUserInfo, sizeof(UserInfoField));
63+
64+
m_pApi = CreateEsunnyQuotClient(this);
65+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Initialized, 0, nullptr, 0, nullptr, 0, nullptr, 0);
66+
67+
if (m_pApi)
68+
{
69+
// 停止已有线程,并清理
70+
StopThread();
71+
ReleaseRequestListBuf();
72+
ReleaseRequestMapBuf();
73+
74+
SRequest* pRequest = MakeRequestBuf(E_Init);
75+
if (pRequest)
76+
{
77+
AddToSendQueue(pRequest);
78+
}
79+
}
80+
}
81+
82+
void CHistoricalDataApi::Disconnect()
83+
{
84+
// 如果队列中有请求包,在后面又进行了Release,又回过头来发送,可能导致当了
85+
StopThread();
86+
87+
if (m_pApi)
88+
{
89+
m_pApi->DisConnect();
90+
DelEsunnyQuotClient(m_pApi);
91+
m_pApi = nullptr;
92+
93+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Disconnected, 0, nullptr, 0, nullptr, 0, nullptr, 0);
94+
}
95+
96+
m_lRequestID = 0;//由于线程已经停止,没有必要用原子操作了
97+
98+
ReleaseRequestListBuf();
99+
ReleaseRequestMapBuf();
100+
}
101+
102+
CHistoricalDataApi::SRequest* CHistoricalDataApi::MakeRequestBuf(RequestType type)
103+
{
104+
SRequest *pRequest = new SRequest;
105+
if (nullptr == pRequest)
106+
return nullptr;
107+
108+
memset(pRequest, 0, sizeof(SRequest));
109+
pRequest->type = type;
110+
switch (type)
111+
{
112+
case E_Init:
113+
pRequest->pBuf = nullptr;
114+
break;
115+
default:
116+
assert(false);
117+
break;
118+
}
119+
return pRequest;
120+
}
121+
122+
void CHistoricalDataApi::ReleaseRequestListBuf()
123+
{
124+
lock_guard<mutex> cl(m_csList);
125+
while (!m_reqList.empty())
126+
{
127+
SRequest * pRequest = m_reqList.front();
128+
delete pRequest;
129+
m_reqList.pop_front();
130+
}
131+
}
132+
133+
void CHistoricalDataApi::ReleaseRequestMapBuf()
134+
{
135+
lock_guard<mutex> cl(m_csMap);
136+
for (map<int, SRequest*>::iterator it = m_reqMap.begin(); it != m_reqMap.end(); ++it)
137+
{
138+
delete (*it).second;
139+
}
140+
m_reqMap.clear();
141+
}
142+
143+
void CHistoricalDataApi::ReleaseRequestMapBuf(int nRequestID)
144+
{
145+
lock_guard<mutex> cl(m_csMap);
146+
map<int, SRequest*>::iterator it = m_reqMap.find(nRequestID);
147+
if (it != m_reqMap.end())
148+
{
149+
delete it->second;
150+
m_reqMap.erase(nRequestID);
151+
}
152+
}
153+
154+
void CHistoricalDataApi::AddRequestMapBuf(int nRequestID, SRequest* pRequest)
155+
{
156+
if (nullptr == pRequest)
157+
return;
158+
159+
lock_guard<mutex> cl(m_csMap);
160+
map<int, SRequest*>::iterator it = m_reqMap.find(nRequestID);
161+
if (it != m_reqMap.end())
162+
{
163+
SRequest* p = it->second;
164+
if (pRequest != p)//如果实际上指的是同一内存,不再插入
165+
{
166+
delete p;
167+
m_reqMap[nRequestID] = pRequest;
168+
}
169+
}
170+
}
171+
172+
void CHistoricalDataApi::AddToSendQueue(SRequest * pRequest)
173+
{
174+
if (nullptr == pRequest)
175+
return;
176+
177+
lock_guard<mutex> cl(m_csList);
178+
bool bFind = false;
179+
//目前不去除相同类型的请求,即没有对大量同类型请求进行优化
180+
//for (list<SRequest*>::iterator it = m_reqList.begin();it!= m_reqList.end();++it)
181+
//{
182+
// if (pRequest->type == (*it)->type)
183+
// {
184+
// bFind = true;
185+
// break;
186+
// }
187+
//}
188+
189+
if (!bFind)
190+
m_reqList.push_back(pRequest);
191+
192+
if (!m_reqList.empty())
193+
{
194+
StartThread();
195+
}
196+
}
197+
198+
199+
200+
void CHistoricalDataApi::RunInThread()
201+
{
202+
int iRet = 0;
203+
204+
while (!m_reqList.empty() && m_bRunning)
205+
{
206+
SRequest * pRequest = m_reqList.front();
207+
int lRequest = ++m_lRequestID;// 这个地方是否会出现原子操作的问题呢?
208+
switch (pRequest->type)
209+
{
210+
case E_Init:
211+
iRet = ReqInit();
212+
if (iRet != 0 && m_bRunning)
213+
this_thread::sleep_for(chrono::milliseconds(1000 * 20));
214+
break;
215+
default:
216+
assert(false);
217+
break;
218+
}
219+
220+
if (0 == iRet)
221+
{
222+
//返回成功,填加到已发送池
223+
m_nSleep = 1;
224+
AddRequestMapBuf(lRequest, pRequest);
225+
226+
lock_guard<mutex> cl(m_csList);
227+
m_reqList.pop_front();
228+
}
229+
else
230+
{
231+
//失败,按4的幂进行延时,但不超过1s
232+
m_nSleep *= 4;
233+
m_nSleep %= 1023;
234+
}
235+
this_thread::sleep_for(chrono::milliseconds(m_nSleep));
236+
}
237+
238+
// 清理线程
239+
m_hThread = nullptr;
240+
m_bRunning = false;
241+
}
242+
243+
int CHistoricalDataApi::ReqInit()
244+
{
245+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Connecting, 0, nullptr, 0, nullptr, 0, nullptr, 0);
246+
//初始化连接
247+
int iRet = m_pApi->Connect(m_ServerInfo.Address, m_ServerInfo.Port);
248+
if (0 == iRet)
249+
{
250+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Connected, 0, nullptr, 0, nullptr, 0, nullptr, 0);
251+
iRet = m_pApi->Login(m_UserInfo.UserID, m_UserInfo.Password);
252+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Logining, 0, nullptr, 0, nullptr, 0, nullptr, 0);
253+
}
254+
else
255+
{
256+
RspUserLoginField field = { 0 };
257+
field.ErrorID = iRet;
258+
strcpy(field.ErrorMsg, "连接超时");
259+
260+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Disconnected, 0, &field, sizeof(RspUserLoginField), nullptr, 0, nullptr, 0);
261+
262+
return iRet;
263+
}
264+
return iRet;
265+
}
266+
267+
int __cdecl CHistoricalDataApi::OnRspLogin(int err, const char *errtext)
268+
{
269+
RspUserLoginField field = { 0 };
270+
field.ErrorID = err;
271+
strncpy(field.ErrorMsg, errtext, sizeof(ErrorMsgType));
272+
273+
if (err == 0)
274+
{
275+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Logined, 0, &field, sizeof(RspUserLoginField), nullptr, 0, nullptr, 0);
276+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Done, 0, nullptr, 0, nullptr, 0, nullptr, 0);
277+
}
278+
else
279+
{
280+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Disconnected, 0, &field, sizeof(RspUserLoginField), nullptr, 0, nullptr, 0);
281+
}
282+
283+
return 0;
284+
}
285+
286+
int __cdecl CHistoricalDataApi::OnChannelLost(int err, const char *errtext)
287+
{
288+
RspUserLoginField field = { 0 };
289+
field.ErrorID = err;
290+
strncpy(field.ErrorMsg, errtext, sizeof(ErrorMsgType));
291+
292+
XRespone(ResponeType::OnConnectionStatus, m_msgQueue, this, ConnectionStatus::Disconnected, 0, &field, sizeof(RspUserLoginField), nullptr, 0, nullptr, 0);
293+
294+
return 0;
295+
}
296+
int __cdecl CHistoricalDataApi::OnStkQuot(struct STKDATA *pData)
297+
{
298+
return 0;
299+
}
300+
301+
int __cdecl CHistoricalDataApi::OnRspHistoryQuot(struct STKHISDATA *pHisData)
302+
{
303+
return 0;
304+
}
305+
306+
int __cdecl CHistoricalDataApi::OnRspTraceData(struct STKTRACEDATA *pTraceData)
307+
{
308+
for (size_t i = 0; i < pTraceData->nCount; i++)
309+
{
310+
STOCKTRACEDATA item = pTraceData->TraceData[i];
311+
312+
DepthMarketDataField marketData = { 0 };
313+
314+
}
315+
return 0;
316+
}
317+
318+
int __cdecl CHistoricalDataApi::OnRspMarketInfo(struct MarketInfo *pMarketInfo, int bLast)
319+
{
320+
for (size_t i = 0; i < pMarketInfo->stocknum; i++)
321+
{
322+
StockInfo item = pMarketInfo->stockdata[i];
323+
324+
InstrumentField field = { 0 };
325+
326+
strcpy(field.InstrumentID, item.szCode);
327+
strcpy(field.ExchangeID, pMarketInfo->Market);
328+
329+
strcpy(field.Symbol, item.szCode);
330+
331+
strcpy(field.InstrumentName, item.szName);
332+
field.Type = InstrumentType::Future;
333+
334+
XRespone(ResponeType::OnRspQryInstrument, m_msgQueue, this, i >= pMarketInfo->stocknum -1, 0, &field, sizeof(InstrumentField), nullptr, 0, nullptr, 0);
335+
}
336+
337+
return 0;
338+
}

0 commit comments

Comments
 (0)