Skip to content

Commit

Permalink
Add autorestart for Binance data connections (QuantConnect#5827)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanoRaggi authored Aug 5, 2021
1 parent ed835fa commit 1a02fba
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 3 deletions.
3 changes: 2 additions & 1 deletion Brokerages/Binance/BinanceBrokerage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public BinanceBrokerage(string apiKey, string apiSecret, IAlgorithm algorithm, I
() => new BinanceWebSocketWrapper(null),
Subscribe,
Unsubscribe,
OnDataMessage);
OnDataMessage,
new TimeSpan(23, 45, 0));

SubscriptionManager = subscriptionManager;

Expand Down
1 change: 1 addition & 0 deletions Brokerages/Bitfinex/BitfinexBrokerage.Messaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public BitfinexBrokerage(IWebSocket websocket, IRestClient restClient, string ap
Subscribe,
Unsubscribe,
OnDataMessage,
TimeSpan.Zero,
_connectionRateLimiter);

_symbolPropertiesDatabase = SymbolPropertiesDatabase.FromDataFolder();
Expand Down
55 changes: 53 additions & 2 deletions Brokerages/BrokerageMultiWebSocketSubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace QuantConnect.Brokerages
/// <summary>
/// Handles brokerage data subscriptions with multiple websocket connections, with optional symbol weighting
/// </summary>
public class BrokerageMultiWebSocketSubscriptionManager : EventBasedDataQueueHandlerSubscriptionManager
public class BrokerageMultiWebSocketSubscriptionManager : EventBasedDataQueueHandlerSubscriptionManager, IDisposable
{
private readonly string _webSocketUrl;
private readonly int _maximumSymbolsPerWebSocket;
Expand All @@ -37,6 +37,7 @@ public class BrokerageMultiWebSocketSubscriptionManager : EventBasedDataQueueHan
private readonly Func<IWebSocket, Symbol, bool> _unsubscribeFunc;
private readonly Action<WebSocketMessage> _messageHandler;
private readonly RateGate _connectionRateLimiter;
private readonly System.Timers.Timer _reconnectTimer;

private const int ConnectionTimeout = 30000;

Expand All @@ -49,12 +50,13 @@ public class BrokerageMultiWebSocketSubscriptionManager : EventBasedDataQueueHan
/// <param name="webSocketUrl">The URL for websocket connections</param>
/// <param name="maximumSymbolsPerWebSocket">The maximum number of symbols per websocket connection</param>
/// <param name="maximumWebSocketConnections">The maximum number of websocket connections allowed (if zero, symbol weighting is disabled)</param>
/// <param name="connectionRateLimiter">The rate limiter for creating new websocket connections</param>
/// <param name="symbolWeights">A dictionary for the symbol weights</param>
/// <param name="webSocketFactory">A function which returns a new websocket instance</param>
/// <param name="subscribeFunc">A function which subscribes a symbol</param>
/// <param name="unsubscribeFunc">A function which unsubscribes a symbol</param>
/// <param name="messageHandler">The websocket message handler</param>
/// <param name="webSocketConnectionDuration">The maximum duration of the websocket connection, TimeSpan.Zero for no duration limit</param>
/// <param name="connectionRateLimiter">The rate limiter for creating new websocket connections</param>
public BrokerageMultiWebSocketSubscriptionManager(
string webSocketUrl,
int maximumSymbolsPerWebSocket,
Expand All @@ -64,6 +66,7 @@ public BrokerageMultiWebSocketSubscriptionManager(
Func<IWebSocket, Symbol, bool> subscribeFunc,
Func<IWebSocket, Symbol, bool> unsubscribeFunc,
Action<WebSocketMessage> messageHandler,
TimeSpan webSocketConnectionDuration,
RateGate connectionRateLimiter = null)
{
_webSocketUrl = webSocketUrl;
Expand All @@ -86,6 +89,40 @@ public BrokerageMultiWebSocketSubscriptionManager(
_webSocketEntries.Add(new BrokerageMultiWebSocketEntry(symbolWeights, webSocket));
}
}

// Some exchanges (e.g. Binance) require a daily restart for websocket connections
if (webSocketConnectionDuration != TimeSpan.Zero)
{
_reconnectTimer = new System.Timers.Timer
{
Interval = webSocketConnectionDuration.TotalMilliseconds
};
_reconnectTimer.Elapsed += (_, _) =>
{
Log.Trace("BrokerageMultiWebSocketSubscriptionManager(): Restarting websocket connections");

lock (_locker)
{
foreach (var entry in _webSocketEntries)
{
if (entry.WebSocket.IsOpen)
{
Task.Factory.StartNew(() =>
{
Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})");
Disconnect(entry.WebSocket);

Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})");
Connect(entry.WebSocket);
});
}
}
}
};
_reconnectTimer.Start();

Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): WebSocket connections will be restarted every: {webSocketConnectionDuration}");
}
}

/// <summary>
Expand Down Expand Up @@ -134,6 +171,15 @@ protected override bool Unsubscribe(IEnumerable<Symbol> symbols, TickType tickTy
return success;
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
_reconnectTimer?.Stop();
_reconnectTimer.DisposeSafely();
}

private BrokerageMultiWebSocketEntry GetWebSocketEntryBySymbol(Symbol symbol)
{
lock (_locker)
Expand Down Expand Up @@ -228,6 +274,11 @@ private void Connect(IWebSocket webSocket)
}
}

private void Disconnect(IWebSocket webSocket)
{
webSocket.Close();
}

private void OnOpen(object sender, EventArgs e)
{
var webSocket = (IWebSocket)sender;
Expand Down

0 comments on commit 1a02fba

Please sign in to comment.