Skip to content

Commit

Permalink
BitfinexBrokerage updates (QuantConnect#5787)
Browse files Browse the repository at this point in the history
* Update BinanceBrokerage to handle more than 512 symbols

* Address review

- fetch symbol weights only if required
- remove code duplication

* Add rate limiting for new connections

* Update WebSocketMessage to include the websocket instance

* Handle resubscriptions on reconnect

* Address review

* Address review

* Remove unnecessary locking

* WebSocketClientWrapper updates

- remove allocation of receive buffer on each message
- add missing lock in Close method
- log message data when message type is Close
- fix race condition after unexpected websocket close

* Set WebSocketClientWrapper task to LongRunning

* Add missing check in GetHistory

* Fix exceptions with Binance downloader

- closes QuantConnect#5794

* Update Bitfinex symbols in symbol properties database

* Update BitfinexBrokerage to use BrokerageMultiWebSocketSubscriptionManager

* Address review

* Remove unnecessary locking

* Remove old channels on resubscription
  • Loading branch information
StefanoRaggi authored Jul 28, 2021
1 parent e823dfd commit 0c4e577
Show file tree
Hide file tree
Showing 13 changed files with 1,231 additions and 920 deletions.
34 changes: 33 additions & 1 deletion Brokerages/Binance/BinanceBrokerage.Messaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public partial class BinanceBrokerage
/// </summary>
protected readonly object TickLocker = new object();

private void OnMessageImpl(WebSocketMessage e)
private void OnUserMessage(WebSocketMessage e)
{
try
{
Expand Down Expand Up @@ -65,7 +65,39 @@ private void OnMessageImpl(WebSocketMessage e)
OnFillOrder(upd);
}
break;
}
}
}
catch (Exception exception)
{
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Error, -1, $"Parsing wss message failed. Data: {e.Message} Exception: {exception}"));
throw;
}
}

private void OnDataMessage(WebSocketMessage e)
{
try
{
var obj = JObject.Parse(e.Message);

var objError = obj["error"];
if (objError != null)
{
var error = objError.ToObject<ErrorMessage>();
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Error, error.Code, error.Message));
return;
}

var objData = obj;

var objEventType = objData["e"];
if (objEventType != null)
{
var eventType = objEventType.ToObject<string>();

switch (eventType)
{
case "trade":
var trade = objData.ToObject<Trade>();
EmitTradeTick(
Expand Down
128 changes: 86 additions & 42 deletions Brokerages/Binance/BinanceBrokerage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
using System.Linq;
using System.Threading;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using QuantConnect.Configuration;
using QuantConnect.Util;
using Timer = System.Timers.Timer;

Expand Down Expand Up @@ -52,6 +54,8 @@ public partial class BinanceBrokerage : BaseWebsocketsBrokerage, IDataQueueHandl
private readonly BinanceRestApiClient _apiClient;
private readonly BrokerageConcurrentMessageHandler<WebSocketMessage> _messageHandler;

private const int MaximumSymbolsPerConnection = 512;

/// <summary>
/// Constructor for brokerage
/// </summary>
Expand All @@ -66,15 +70,20 @@ public BinanceBrokerage(string apiKey, string apiSecret, IAlgorithm algorithm, I
_job = job;
_algorithm = algorithm;
_aggregator = aggregator;
_messageHandler = new BrokerageConcurrentMessageHandler<WebSocketMessage>(OnMessageImpl);
_messageHandler = new BrokerageConcurrentMessageHandler<WebSocketMessage>(OnUserMessage);

var subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager();
subscriptionManager.SubscribeImpl += (s, t) =>
{
Subscribe(s);
return true;
};
subscriptionManager.UnsubscribeImpl += (s, t) => Unsubscribe(s);
var maximumWebSocketConnections = Config.GetInt("binance-maximum-websocket-connections");
var symbolWeights = maximumWebSocketConnections > 0 ? FetchSymbolWeights() : null;

var subscriptionManager = new BrokerageMultiWebSocketSubscriptionManager(
WebSocketBaseUrl,
MaximumSymbolsPerConnection,
maximumWebSocketConnections,
symbolWeights,
() => new BinanceWebSocketWrapper(null),
Subscribe,
Unsubscribe,
OnDataMessage);

SubscriptionManager = subscriptionManager;

Expand Down Expand Up @@ -398,51 +407,55 @@ public override void Dispose()
}

/// <summary>
/// Subscribes to the requested symbols (using an individual streaming channel)
/// Not used
/// </summary>
/// <param name="symbols">The list of symbols to subscribe</param>
public override void Subscribe(IEnumerable<Symbol> symbols)
{
foreach (var symbol in symbols)
{
Send(WebSocket,
new
// NOP
}

/// <summary>
/// Subscribes to the requested symbol (using an individual streaming channel)
/// </summary>
/// <param name="webSocket">The websocket instance</param>
/// <param name="symbol">The symbol to subscribe</param>
private bool Subscribe(IWebSocket webSocket, Symbol symbol)
{
Send(webSocket,
new
{
method = "SUBSCRIBE",
@params = new[]
{
method = "SUBSCRIBE",
@params = new[]
{
$"{symbol.Value.ToLowerInvariant()}@trade",
$"{symbol.Value.ToLowerInvariant()}@bookTicker"
},
id = GetNextRequestId()
}
);
}
$"{symbol.Value.ToLowerInvariant()}@trade",
$"{symbol.Value.ToLowerInvariant()}@bookTicker"
},
id = GetNextRequestId()
}
);

return true;
}

/// <summary>
/// Ends current subscriptions
/// Ends current subscription
/// </summary>
private bool Unsubscribe(IEnumerable<Symbol> symbols)
/// <param name="webSocket">The websocket instance</param>
/// <param name="symbol">The symbol to unsubscribe</param>
private bool Unsubscribe(IWebSocket webSocket, Symbol symbol)
{
if (WebSocket.IsOpen)
{
foreach (var symbol in symbols)
Send(webSocket,
new
{
Send(WebSocket,
new
{
method = "UNSUBSCRIBE",
@params = new[]
{
$"{symbol.Value.ToLowerInvariant()}@trade",
$"{symbol.Value.ToLowerInvariant()}@bookTicker"
},
id = GetNextRequestId()
}
);
method = "UNSUBSCRIBE",
@params = new[]
{
$"{symbol.Value.ToLowerInvariant()}@trade",
$"{symbol.Value.ToLowerInvariant()}@bookTicker"
},
id = GetNextRequestId()
}
}
);

return true;
}
Expand Down Expand Up @@ -485,5 +498,36 @@ private void OnOrderSubmit(BinanceOrderSubmitEventArgs e)
CachedOrderIDs.TryAdd(order.Id, order);
}
}

/// <summary>
/// Returns the weights for each symbol (the weight value is the count of trades in the last 24 hours)
/// </summary>
private static Dictionary<Symbol, int> FetchSymbolWeights()
{
var dict = new Dictionary<Symbol, int>();

try
{
const string url = "https://api.binance.com/api/v3/ticker/24hr";
var json = url.DownloadData();

foreach (var row in JArray.Parse(json))
{
var ticker = row["symbol"].ToObject<string>();
var count = row["count"].ToObject<int>();

var symbol = Symbol.Create(ticker, SecurityType.Crypto, Market.Binance);

dict.Add(symbol, count);
}
}
catch (Exception exception)
{
Log.Error(exception);
throw;
}

return dict;
}
}
}
36 changes: 20 additions & 16 deletions Brokerages/Binance/BinanceRestApiClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
Expand Down Expand Up @@ -317,12 +317,18 @@ public bool CancelOrder(Order order)
var klines = JsonConvert.DeserializeObject<object[][]>(response.Content)
.Select(entries => new Messages.Kline(entries))
.ToList();
if (klines.Count > 0)
{
startMs = klines.Last().OpenTime + resolutionInMs;

startMs = klines.Last().OpenTime + resolutionInMs;

foreach (var kline in klines)
foreach (var kline in klines)
{
yield return kline;
}
}
else
{
yield return kline;
startMs += resolutionInMs;
}
}
}
Expand Down Expand Up @@ -355,19 +361,17 @@ public bool SessionKeepAlive()
/// </summary>
public void StopSession()
{
if (string.IsNullOrEmpty(SessionId))
if (!string.IsNullOrEmpty(SessionId))
{
throw new Exception("BinanceBrokerage:UserStream. listenKey wasn't allocated or has been refused.");
var request = new RestRequest(UserDataStreamEndpoint, Method.DELETE);
request.AddHeader(KeyHeader, ApiKey);
request.AddParameter(
"application/x-www-form-urlencoded",
Encoding.UTF8.GetBytes($"listenKey={SessionId}"),
ParameterType.RequestBody
);
ExecuteRestRequest(request);
}

var request = new RestRequest(UserDataStreamEndpoint, Method.DELETE);
request.AddHeader(KeyHeader, ApiKey);
request.AddParameter(
"application/x-www-form-urlencoded",
Encoding.UTF8.GetBytes($"listenKey={SessionId}"),
ParameterType.RequestBody
);
ExecuteRestRequest(request);
}

/// <summary>
Expand Down
Loading

0 comments on commit 0c4e577

Please sign in to comment.