Skip to content

Commit

Permalink
Websocket-refactor for websocket message data as text and binary (Qua…
Browse files Browse the repository at this point in the history
…ntConnect#5833)

* websocket-refactor for websocket message  as text and binary

Initial commit

websocket support for text and binary messageData

code style fixes

test fix

* delete un-used file

* zerodha websocket textMessage parse

* messageData class abscration

* Empty

Co-authored-by: Martin-Molinero <[email protected]>
  • Loading branch information
rjra2611 and Martin-Molinero authored Aug 10, 2021
1 parent 32fdb51 commit 7881aeb
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 375 deletions.
8 changes: 6 additions & 2 deletions Brokerages/Binance/BinanceBrokerage.Messaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ public partial class BinanceBrokerage
/// </summary>
protected readonly object TickLocker = new object();

private void OnUserMessage(WebSocketMessage e)
private void OnUserMessage(WebSocketMessage webSocketMessage)
{
var e = (WebSocketClientWrapper.TextMessage)webSocketMessage.Data;

try
{
var obj = JObject.Parse(e.Message);
Expand Down Expand Up @@ -75,8 +77,10 @@ private void OnUserMessage(WebSocketMessage e)
}
}

private void OnDataMessage(WebSocketMessage e)
private void OnDataMessage(WebSocketMessage webSocketMessage)
{
var e = (WebSocketClientWrapper.TextMessage)webSocketMessage.Data;

try
{
var obj = JObject.Parse(e.Message);
Expand Down
5 changes: 3 additions & 2 deletions Brokerages/Bitfinex/BitfinexBrokerage.DataQueueHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ private bool UnsubscribeChannel(IWebSocket webSocket, BitfinexWebSocketChannels
return true;
}

private void OnDataMessage(WebSocketMessage e)
private void OnDataMessage(WebSocketMessage webSocketMessage)
{
var webSocket = (BitfinexWebSocketWrapper)e.WebSocket;
var webSocket = (BitfinexWebSocketWrapper)webSocketMessage.WebSocket;
var e = (WebSocketClientWrapper.TextMessage)webSocketMessage.Data;

try
{
Expand Down
4 changes: 3 additions & 1 deletion Brokerages/Bitfinex/BitfinexBrokerage.Messaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,10 @@ private long GetNextClientOrderId()
/// Implementation of the OnMessage event
/// </summary>
/// <param name="e"></param>
private void OnMessageImpl(WebSocketMessage e)
private void OnMessageImpl(WebSocketMessage webSocketMessage)
{
var e = (WebSocketClientWrapper.TextMessage)webSocketMessage.Data;

try
{
var token = JToken.Parse(e.Message);
Expand Down
4 changes: 3 additions & 1 deletion Brokerages/GDAX/GDAXBrokerage.Messaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ public GDAXBrokerage(string wssUrl, IWebSocket websocket, IRestClient restClient
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
public override void OnMessage(object sender, WebSocketMessage e)
public override void OnMessage(object sender, WebSocketMessage webSocketMessage)
{
var e = (WebSocketClientWrapper.TextMessage)webSocketMessage.Data;

try
{
var raw = JsonConvert.DeserializeObject<Messages.BaseMessage>(e.Message, JsonSettings);
Expand Down
5 changes: 3 additions & 2 deletions Brokerages/Tradier/TradierBrokerage.DataQueueHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
Expand Down Expand Up @@ -190,8 +190,9 @@ private void SendSubscribeMessage(List<string> tickers)
_webSocketClient.Send(json);
}

private void OnMessage(object sender, WebSocketMessage e)
private void OnMessage(object sender, WebSocketMessage webSocketMessage)
{
var e = (WebSocketClientWrapper.TextMessage)webSocketMessage.Data;
var obj = JObject.Parse(e.Message);
JToken error;
if (obj.TryGetValue("error", out error))
Expand Down
81 changes: 72 additions & 9 deletions Brokerages/WebSocketClientWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,12 @@ private void HandleConnection()
{
var messageData = ReceiveMessage(_client, connectionCts.Token, receiveBuffer);

if (messageData.MessageType == WebSocketMessageType.Close)
if (messageData == null)
{
Log.Trace($"WebSocketClientWrapper.HandleConnection({_url}): WebSocketMessageType.Close - Data: {messageData.Data}");
break;
}

OnMessage(new WebSocketMessage(this, messageData.Data));
OnMessage(new WebSocketMessage(this, messageData));
}
}
catch (OperationCanceledException) { }
Expand Down Expand Up @@ -269,18 +268,82 @@ private MessageData ReceiveMessage(
}
while (!result.EndOfMessage);

return new MessageData
if (result.MessageType == WebSocketMessageType.Binary)
{
Data = Encoding.UTF8.GetString(ms.GetBuffer(), 0 , (int)ms.Length),
MessageType = result.MessageType
};
return new BinaryMessage
{
Data = ms.ToArray(),
Count = result.Count,
};
}
else if (result.MessageType == WebSocketMessageType.Text)
{
return new TextMessage
{
Message = Encoding.UTF8.GetString(ms.GetBuffer(), 0 , (int)ms.Length),
};
}
else if (result.MessageType == WebSocketMessageType.Close)
{
Log.Trace($"WebSocketClientWrapper.HandleConnection({_url}): WebSocketMessageType.Close - Data: {Encoding.UTF8.GetString(ms.GetBuffer(), 0 , (int)ms.Length)}");
return null;
}
}
return null;
}

private class MessageData
/// <summary>
/// Defines a message of websocket data
/// </summary>
public abstract class MessageData
{
public string Data { get; set; }
/// <summary>
/// Type of message
/// </summary>
public WebSocketMessageType MessageType { get; set; }
}

/// <summary>
/// Defines a text-Type message of websocket data
/// </summary>
public class TextMessage : MessageData
{
/// <summary>
/// Data contained in message
/// </summary>
public string Message { get; set; }

/// <summary>
/// Constructs default instance of the TextMessage
/// </summary>
public TextMessage()
{
MessageType = WebSocketMessageType.Text;
}
}

/// <summary>
/// Defines a byte-Type message of websocket data
/// </summary>
public class BinaryMessage : MessageData
{
/// <summary>
/// Data contained in message
/// </summary>
public byte[] Data { get; set; }

/// <summary>
/// Count of message
/// </summary>
public int Count { get; set; }

/// <summary>
/// Constructs default instance of the BinaryMessage
/// </summary>
public BinaryMessage()
{
MessageType = WebSocketMessageType.Binary;
}
}
}
}
8 changes: 4 additions & 4 deletions Brokerages/WebSocketMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ public class WebSocketMessage
/// <summary>
/// Gets the raw message data as text
/// </summary>
public string Message { get; }
public WebSocketClientWrapper.MessageData Data { get; }

/// <summary>
/// Initializes a new instance of the <see cref="WebSocketMessage"/> class
/// </summary>
/// <param name="webSocket">The sender websocket instance</param>
/// <param name="message">The message</param>
public WebSocketMessage(IWebSocket webSocket, string message)
/// <param name="data">The message data</param>
public WebSocketMessage(IWebSocket webSocket, WebSocketClientWrapper.MessageData data)
{
WebSocket = webSocket;
Message = message;
Data = data;
}
}
}
44 changes: 25 additions & 19 deletions Brokerages/Zerodha/ZerodhaBrokerage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public partial class ZerodhaBrokerage : Brokerage, IDataQueueHandler
/// <summary>
/// The websockets client instance
/// </summary>
protected ZerodhaWebSocketClientWrapper WebSocket;
protected WebSocketClientWrapper WebSocket;

/// <summary>
/// standard json parsing settings
Expand Down Expand Up @@ -96,7 +96,7 @@ public partial class ZerodhaBrokerage : Brokerage, IDataQueueHandler
private readonly string _wssUrl = "wss://ws.kite.trade/";

private readonly string _tradingSegment;
private readonly BrokerageConcurrentMessageHandler<MessageData> _messageHandler;
private readonly BrokerageConcurrentMessageHandler<WebSocketClientWrapper.MessageData> _messageHandler;
private readonly string _zerodhaProductType;

private DateTime _lastTradeTickTime;
Expand Down Expand Up @@ -127,8 +127,8 @@ public ZerodhaBrokerage(string tradingSegment, string zerodhaProductType, string
_apiKey = apiKey;
_accessToken = apiSecret;
_securityProvider = securityProvider;
_messageHandler = new BrokerageConcurrentMessageHandler<MessageData>(OnMessageImpl);
WebSocket = new ZerodhaWebSocketClientWrapper();
_messageHandler = new BrokerageConcurrentMessageHandler<WebSocketClientWrapper.MessageData>(OnMessageImpl);
WebSocket = new WebSocketClientWrapper();
_wssUrl += string.Format(CultureInfo.InvariantCulture, "?api_key={0}&access_token={1}", _apiKey, _accessToken);
WebSocket.Initialize(_wssUrl);
WebSocket.Message += OnMessage;
Expand Down Expand Up @@ -1001,21 +1001,22 @@ private void OnError(object sender, WebSocketError e)
Log.Error($"ZerodhaBrokerage.OnError(): Message: {e.Message} Exception: {e.Exception}");
}

private void OnMessage(object sender, MessageData e)
private void OnMessage(object sender, WebSocketMessage webSocketMessage)
{
_messageHandler.HandleNewMessage(e);
_messageHandler.HandleNewMessage(webSocketMessage.Data);
}

/// <summary>
/// Implementation of the OnMessage event
/// </summary>
/// <param name="e"></param>
private void OnMessageImpl(MessageData e)
private void OnMessageImpl(WebSocketClientWrapper.MessageData message)
{
try
{
if (e.MessageType == WebSocketMessageType.Binary)
if (message.MessageType == WebSocketMessageType.Binary)
{
var e = (WebSocketClientWrapper.BinaryMessage)message;
if (e.Count > 1)
{
int offset = 0;
Expand Down Expand Up @@ -1058,30 +1059,35 @@ private void OnMessageImpl(MessageData e)
}
}
}
else if (e.MessageType == WebSocketMessageType.Text)
else if (message.MessageType == WebSocketMessageType.Text)
{
string message = Encoding.UTF8.GetString(e.Data.Take(e.Count).ToArray());
var e = (WebSocketClientWrapper.TextMessage)message;

JObject messageDict = Utils.JsonDeserialize(message);
JObject messageDict = Utils.JsonDeserialize(e.Message);
if ((string)messageDict["type"] == "order")
{
OnOrderUpdate(new Messages.Order(messageDict["data"]));
}
else if ((string)messageDict["type"] == "error")
{
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Error, -1, $"Zerodha WSS Error. Data: {e.Data} Exception: {messageDict["data"]}"));

OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Error, -1, $"Zerodha WSS Error. Data: {e.Message} Exception: {messageDict["data"]}"));
}
}
else if (e.MessageType == WebSocketMessageType.Close)
{
WebSocket.Close();
}
}
catch (Exception exception)
{
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Error, -1, $"Parsing wss message failed. Data: {e.Data} Exception: {exception}"));
throw;
if (message.MessageType == WebSocketMessageType.Binary)
{
var e = (WebSocketClientWrapper.BinaryMessage)message;
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Error, -1, $"Parsing wss message failed. Data: {e.Data} Exception: {exception}"));
throw;
}
else
{
var e = (WebSocketClientWrapper.TextMessage)message;
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Error, -1, $"Parsing wss message failed. Data: {e.Message} Exception: {exception}"));
throw;
}
}
}

Expand Down
Loading

0 comments on commit 7881aeb

Please sign in to comment.