22
22
using System . Collections . Generic ;
23
23
using System . Globalization ;
24
24
using System . Linq ;
25
+ using System . Net ;
25
26
using System . Threading . Tasks ;
26
27
using System . Threading ;
27
28
using RestSharp ;
28
29
using System . Text . RegularExpressions ;
30
+ using QuantConnect . Configuration ;
29
31
using QuantConnect . Logging ;
30
32
using QuantConnect . Orders . Fees ;
31
33
using QuantConnect . Securities ;
@@ -46,8 +48,6 @@ public partial class GDAXBrokerage
46
48
private const string SymbolMatching = "ETH|LTC|BTC|BCH|XRP|EOS|XLM|ETC|ZRX" ;
47
49
private readonly IAlgorithm _algorithm ;
48
50
private readonly CancellationTokenSource _canceller = new CancellationTokenSource ( ) ;
49
- private readonly ConcurrentQueue < WebSocketMessage > _messageBuffer = new ConcurrentQueue < WebSocketMessage > ( ) ;
50
- private volatile bool _streamLocked ;
51
51
private readonly ConcurrentDictionary < Symbol , DefaultOrderBook > _orderBooks = new ConcurrentDictionary < Symbol , DefaultOrderBook > ( ) ;
52
52
private readonly bool _isDataQueueHandler ;
53
53
protected readonly IDataAggregator _aggregator ;
@@ -60,12 +60,19 @@ internal enum GdaxEndpointType { Public, Private }
60
60
61
61
private readonly IPriceProvider _priceProvider ;
62
62
63
+ private readonly CancellationTokenSource _ctsFillMonitor = new CancellationTokenSource ( ) ;
64
+ private readonly Task _fillMonitorTask ;
65
+ private readonly AutoResetEvent _fillMonitorResetEvent = new AutoResetEvent ( false ) ;
66
+ private readonly int _fillMonitorTimeout = Config . GetInt ( "gdax-fill-monitor-timeout" , 500 ) ;
67
+ private readonly ConcurrentDictionary < string , Order > _pendingOrders = new ConcurrentDictionary < string , Order > ( ) ;
68
+ private long _lastEmittedFillTradeId ;
69
+
63
70
#endregion
64
71
65
72
/// <summary>
66
73
/// The list of websocket channels to subscribe
67
74
/// </summary>
68
- protected virtual string [ ] ChannelNames { get ; } = { "heartbeat" , "user" } ;
75
+ protected virtual string [ ] ChannelNames { get ; } = { "heartbeat" } ;
69
76
70
77
/// <summary>
71
78
/// Constructor for brokerage
@@ -89,50 +96,9 @@ public GDAXBrokerage(string wssUrl, IWebSocket websocket, IRestClient restClient
89
96
_priceProvider = priceProvider ;
90
97
_aggregator = aggregator ;
91
98
92
- WebSocket . Open += ( sender , args ) =>
93
- {
94
- var tickers = new [ ]
95
- {
96
- "LTCUSD" , "LTCEUR" , "LTCBTC" ,
97
- "BTCUSD" , "BTCEUR" , "BTCGBP" ,
98
- "ETHBTC" , "ETHUSD" , "ETHEUR" ,
99
- "BCHBTC" , "BCHUSD" , "BCHEUR" ,
100
- "XRPUSD" , "XRPEUR" , "XRPBTC" ,
101
- "EOSUSD" , "EOSEUR" , "EOSBTC" ,
102
- "XLMUSD" , "XLMEUR" , "XLMBTC" ,
103
- "ETCUSD" , "ETCEUR" , "ETCBTC" ,
104
- "ZRXUSD" , "ZRXEUR" , "ZRXBTC" ,
105
- } ;
106
- Subscribe ( tickers . Select ( ticker => Symbol . Create ( ticker , SecurityType . Crypto , Market . GDAX ) ) ) ;
107
- } ;
108
-
109
99
_isDataQueueHandler = this is GDAXDataQueueHandler ;
110
- }
111
-
112
- /// <summary>
113
- /// Lock the streaming processing while we're sending orders as sometimes they fill before the REST call returns.
114
- /// </summary>
115
- public void LockStream ( )
116
- {
117
- Log . Trace ( "GDAXBrokerage.Messaging.LockStream(): Locking Stream" ) ;
118
- _streamLocked = true ;
119
- }
120
100
121
- /// <summary>
122
- /// Unlock stream and process all backed up messages.
123
- /// </summary>
124
- public void UnlockStream ( )
125
- {
126
- Log . Trace ( "GDAXBrokerage.Messaging.UnlockStream(): Processing Backlog..." ) ;
127
- while ( _messageBuffer . Any ( ) )
128
- {
129
- WebSocketMessage e ;
130
- _messageBuffer . TryDequeue ( out e ) ;
131
- OnMessageImpl ( this , e ) ;
132
- }
133
- Log . Trace ( "GDAXBrokerage.Messaging.UnlockStream(): Stream Unlocked." ) ;
134
- // Once dequeued in order; unlock stream.
135
- _streamLocked = false ;
101
+ _fillMonitorTask = Task . Factory . StartNew ( FillMonitorAction , _ctsFillMonitor . Token ) ;
136
102
}
137
103
138
104
/// <summary>
@@ -141,31 +107,6 @@ public void UnlockStream()
141
107
/// <param name="sender"></param>
142
108
/// <param name="e"></param>
143
109
public override void OnMessage ( object sender , WebSocketMessage e )
144
- {
145
- // Verify if we're allowed to handle the streaming packet yet; while we're placing an order we delay the
146
- // stream processing a touch.
147
- try
148
- {
149
- if ( _streamLocked )
150
- {
151
- _messageBuffer . Enqueue ( e ) ;
152
- return ;
153
- }
154
- }
155
- catch ( Exception err )
156
- {
157
- Log . Error ( err ) ;
158
- }
159
-
160
- OnMessageImpl ( sender , e ) ;
161
- }
162
-
163
- /// <summary>
164
- /// Implementation of the OnMessage event
165
- /// </summary>
166
- /// <param name="sender"></param>
167
- /// <param name="e"></param>
168
- private void OnMessageImpl ( object sender , WebSocketMessage e )
169
110
{
170
111
try
171
112
{
@@ -327,90 +268,55 @@ private void OnMatch(string data)
327
268
// deserialize the current match (trade) message
328
269
var message = JsonConvert . DeserializeObject < Messages . Matched > ( data , JsonSettings ) ;
329
270
330
- if ( string . IsNullOrEmpty ( message . UserId ) )
331
- {
332
- // message received from the "matches" channel
333
- if ( _isDataQueueHandler )
334
- {
335
- EmitTradeTick ( message ) ;
336
- }
337
- return ;
338
- }
339
-
340
- // message received from the "user" channel, this trade is ours
341
-
342
- // check the list of currently active orders, if the current trade is ours we are either a maker or a taker
343
- var currentOrder = CachedOrderIDs
344
- . FirstOrDefault ( o => o . Value . BrokerId . Contains ( message . MakerOrderId ) || o . Value . BrokerId . Contains ( message . TakerOrderId ) ) ;
345
-
346
- if ( currentOrder . Value == null )
271
+ // message received from the "matches" channel
272
+ if ( _isDataQueueHandler )
347
273
{
348
- // should never happen, log just in case
349
- Log . Error ( $ "GDAXBrokerage.OrderMatch(): Unexpected match: { message . ProductId } { data } ") ;
350
- return ;
274
+ EmitTradeTick ( message ) ;
351
275
}
276
+ }
352
277
353
- Log . Trace ( $ "GDAXBrokerage.OrderMatch(): Match: { message . ProductId } { data } " ) ;
354
-
355
- var order = currentOrder . Value ;
278
+ private void EmitFillOrderEvent ( Messages . Fill fill , Order order )
279
+ {
280
+ var symbol = ConvertProductId ( fill . ProductId ) ;
356
281
357
282
if ( ! FillSplit . ContainsKey ( order . Id ) )
358
283
{
359
284
FillSplit [ order . Id ] = new GDAXFill ( order ) ;
360
285
}
361
286
362
287
var split = FillSplit [ order . Id ] ;
363
- split . Add ( message ) ;
364
-
365
- var symbol = ConvertProductId ( message . ProductId ) ;
288
+ split . Add ( fill ) ;
366
289
367
290
// is this the total order at once? Is this the last split fill?
368
- var isFinalFill = Math . Abs ( message . Size ) == Math . Abs ( order . Quantity ) || Math . Abs ( split . OrderQuantity ) == Math . Abs ( split . TotalQuantity ) ;
369
-
370
- EmitFillOrderEvent ( message , symbol , split , isFinalFill ) ;
371
- }
372
-
373
- private void EmitFillOrderEvent ( Messages . Matched message , Symbol symbol , GDAXFill split , bool isFinalFill )
374
- {
375
- var order = split . Order ;
291
+ var isFinalFill = Math . Abs ( fill . Size ) == Math . Abs ( order . Quantity ) || Math . Abs ( split . OrderQuantity ) == Math . Abs ( split . TotalQuantity ) ;
376
292
377
293
var status = isFinalFill ? OrderStatus . Filled : OrderStatus . PartiallyFilled ;
378
294
379
- OrderDirection direction ;
380
- // Messages are always from the perspective of the market maker. Flip direction if executed as a taker.
381
- if ( order . BrokerId [ 0 ] == message . TakerOrderId )
382
- {
383
- direction = message . Side == "sell" ? OrderDirection . Buy : OrderDirection . Sell ;
384
- }
385
- else
386
- {
387
- direction = message . Side == "sell" ? OrderDirection . Sell : OrderDirection . Buy ;
388
- }
295
+ var direction = fill . Side == "sell" ? OrderDirection . Sell : OrderDirection . Buy ;
389
296
390
- var fillPrice = message . Price ;
391
- var fillQuantity = direction == OrderDirection . Sell ? - message . Size : message . Size ;
392
- var isMaker = order . BrokerId [ 0 ] == message . MakerOrderId ;
297
+ var fillPrice = fill . Price ;
298
+ var fillQuantity = direction == OrderDirection . Sell ? - fill . Size : fill . Size ;
393
299
394
300
var currency = order . PriceCurrency == string . Empty
395
301
? _algorithm . Securities [ symbol ] . SymbolProperties . QuoteCurrency
396
302
: order . PriceCurrency ;
397
303
398
- var orderFee = new OrderFee ( new CashAmount (
399
- GetFillFee ( _algorithm . UtcTime , fillPrice , fillQuantity , isMaker ) ,
400
- currency ) ) ;
304
+ var orderFee = new OrderFee ( new CashAmount ( fill . Fee , currency ) ) ;
401
305
402
306
var orderEvent = new OrderEvent
403
307
(
404
- order . Id , symbol , message . Time , status ,
308
+ order . Id , symbol , fill . CreatedAt , status ,
405
309
direction , fillPrice , fillQuantity ,
406
- orderFee , $ "GDAX Match Event { direction } "
310
+ orderFee , $ "GDAX Fill Event { direction } "
407
311
) ;
408
312
409
313
// when the order is completely filled, we no longer need it in the active order list
410
314
if ( orderEvent . Status == OrderStatus . Filled )
411
315
{
412
316
Order outOrder ;
413
317
CachedOrderIDs . TryRemove ( order . Id , out outOrder ) ;
318
+
319
+ _pendingOrders . TryRemove ( fill . OrderId , out outOrder ) ;
414
320
}
415
321
416
322
OnOrderEvent ( orderEvent ) ;
@@ -551,7 +457,8 @@ public void PollTick(Symbol symbol)
551
457
{
552
458
Value = rate ,
553
459
Time = DateTime . UtcNow ,
554
- Symbol = symbol
460
+ Symbol = symbol ,
461
+ TickType = TickType . Quote
555
462
} ;
556
463
_aggregator . Update ( latest ) ;
557
464
@@ -598,14 +505,58 @@ public void Unsubscribe(IEnumerable<Symbol> symbols)
598
505
}
599
506
}
600
507
601
- /// <summary>
602
- /// Returns the fee paid for a total or partial order fill
603
- /// </summary>
604
- public static decimal GetFillFee ( DateTime utcTime , decimal fillPrice , decimal fillQuantity , bool isMaker )
508
+ private void FillMonitorAction ( )
605
509
{
606
- var feePercentage = GDAXFeeModel . GetFeePercentage ( utcTime , isMaker ) ;
510
+ Log . Trace ( "GDAXBrokerage.FillMonitorAction(): task started" ) ;
511
+
512
+ try
513
+ {
514
+ foreach ( var order in GetOpenOrders ( ) )
515
+ {
516
+ _pendingOrders . TryAdd ( order . BrokerId . First ( ) , order ) ;
517
+ }
518
+
519
+ while ( ! _ctsFillMonitor . IsCancellationRequested )
520
+ {
521
+ _fillMonitorResetEvent . WaitOne ( TimeSpan . FromMilliseconds ( _fillMonitorTimeout ) , _ctsFillMonitor . Token ) ;
522
+
523
+ foreach ( var kvp in _pendingOrders )
524
+ {
525
+ var orderId = kvp . Key ;
526
+ var order = kvp . Value ;
527
+
528
+ var request = new RestRequest ( $ "/fills?order_id={ orderId } ", Method . GET ) ;
529
+ GetAuthenticationToken ( request ) ;
530
+
531
+ var response = ExecuteRestRequest ( request , GdaxEndpointType . Private ) ;
532
+
533
+ if ( response . StatusCode != HttpStatusCode . OK )
534
+ {
535
+ throw new Exception ( $ "GDAXBrokerage.FillMonitorAction(): request failed: [{ ( int ) response . StatusCode } ] { response . StatusDescription } , Content: { response . Content } , ErrorMessage: { response . ErrorMessage } ") ;
536
+ }
537
+
538
+ var fills = JsonConvert . DeserializeObject < List < Messages . Fill > > ( response . Content ) ;
539
+ foreach ( var fill in fills . OrderBy ( x => x . TradeId ) )
540
+ {
541
+ if ( fill . TradeId <= _lastEmittedFillTradeId )
542
+ {
543
+ continue ;
544
+ }
545
+
546
+ EmitFillOrderEvent ( fill , order ) ;
547
+
548
+ _lastEmittedFillTradeId = fill . TradeId ;
549
+ }
550
+
551
+ }
552
+ }
553
+ }
554
+ catch ( Exception exception )
555
+ {
556
+ OnMessage ( new BrokerageMessageEvent ( BrokerageMessageType . Error , - 1 , exception . Message ) ) ;
557
+ }
607
558
608
- return fillPrice * Math . Abs ( fillQuantity ) * feePercentage ;
559
+ Log . Trace ( "GDAXBrokerage.FillMonitorAction(): task ended" ) ;
609
560
}
610
561
}
611
562
}
0 commit comments