Skip to content

Commit

Permalink
Bugfix: Websocket ping/pong improvements (thrasher-corp#406)
Browse files Browse the repository at this point in the history
* Renames func. Creates new func to setup pinghander either via gorilla style or our own

* Cleans up all ping pong handlers.......

* Clears up issues, makes naming a bit better

* Adds tests

* Adds ping support to binance

* Cleans up ping pongs and adds a comment

* Cleans up waitgroup stuff.

* DISCREETLY cleans up woeful function

* Fixes Kraken ping message type. Removes unnecessary test property. Adds `if err == websocket.ErrCloseSent {` to ping func

* +1 for +v
  • Loading branch information
gloriousCode authored and thrasher- committed Jan 3, 2020
1 parent 4e05ad4 commit 44ac358
Show file tree
Hide file tree
Showing 19 changed files with 197 additions and 124 deletions.
7 changes: 6 additions & 1 deletion exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

const (
binanceDefaultWebsocketURL = "wss://stream.binance.com:9443"
pingDelay = time.Minute * 9
)

// WsConnect intiates a websocket connection
Expand Down Expand Up @@ -71,7 +72,11 @@ func (b *Binance) WsConnect() error {
b.Name,
err)
}

b.WebsocketConn.SetupPingHandler(wshandler.WebsocketPingHandler{
UseGorillaHandler: true,
MessageType: websocket.PongMessage,
Delay: pingDelay,
})
go b.WsHandleData()

return nil
Expand Down
12 changes: 6 additions & 6 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func (b *Bitfinex) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr
}
}

return b.WebsocketConn.SendMessage(req)
return b.WebsocketConn.SendJSONMessage(req)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
Expand All @@ -798,7 +798,7 @@ func (b *Bitfinex) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubs
req[k] = v
}
}
return b.WebsocketConn.SendMessage(req)
return b.WebsocketConn.SendJSONMessage(req)
}

// WsSendAuth sends a autheticated event payload
Expand All @@ -820,7 +820,7 @@ func (b *Bitfinex) WsSendAuth() error {
AuthNonce: nonce,
DeadManSwitch: 0,
}
err := b.AuthenticatedWebsocketConn.SendMessage(request)
err := b.AuthenticatedWebsocketConn.SendJSONMessage(request)
if err != nil {
b.Websocket.SetCanUseAuthenticatedEndpoints(false)
return err
Expand Down Expand Up @@ -907,7 +907,7 @@ func (b *Bitfinex) WsCancelMultiOrders(orderIDs []int64) error {
OrderID: orderIDs,
}
request := makeRequestInterface(wsCancelMultipleOrders, cancel)
return b.AuthenticatedWebsocketConn.SendMessage(request)
return b.AuthenticatedWebsocketConn.SendJSONMessage(request)
}

// WsCancelOrder authenticated cancel order request
Expand Down Expand Up @@ -942,13 +942,13 @@ func (b *Bitfinex) WsCancelOrder(orderID int64) error {
func (b *Bitfinex) WsCancelAllOrders() error {
cancelAll := WsCancelAllOrdersRequest{All: 1}
request := makeRequestInterface(wsCancelMultipleOrders, cancelAll)
return b.AuthenticatedWebsocketConn.SendMessage(request)
return b.AuthenticatedWebsocketConn.SendJSONMessage(request)
}

// WsNewOffer authenticated new offer request
func (b *Bitfinex) WsNewOffer(data *WsNewOfferRequest) error {
request := makeRequestInterface(wsFundingOrderNew, data)
return b.AuthenticatedWebsocketConn.SendMessage(request)
return b.AuthenticatedWebsocketConn.SendJSONMessage(request)
}

// WsCancelOffer authenticated cancel offer request
Expand Down
24 changes: 3 additions & 21 deletions exchanges/bitmex/bitmex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ const (
bitmexActionUpdateData = "update"
)

var (
pongChan = make(chan int, 1)
)

// WsConnect initiates a new websocket connection
func (b *Bitmex) WsConnect() error {
if !b.Websocket.IsEnabled() || !b.IsEnabled() {
Expand Down Expand Up @@ -99,7 +95,6 @@ func (b *Bitmex) WsConnect() error {

go b.wsHandleIncomingData()
b.GenerateDefaultSubscriptions()

err = b.websocketSendAuth()
if err != nil {
log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", b.Name, err)
Expand Down Expand Up @@ -128,19 +123,6 @@ func (b *Bitmex) wsHandleIncomingData() {
return
}
b.Websocket.TrafficAlert <- struct{}{}
message := string(resp.Raw)
if strings.Contains(message, "pong") {
pongChan <- 1
continue
}

if strings.Contains(message, "ping") {
err = b.WebsocketConn.SendMessage("pong")
if err != nil {
b.Websocket.DataHandler <- err
continue
}
}

quickCapture := make(map[string]interface{})
err = json.Unmarshal(resp.Raw, &quickCapture)
Expand Down Expand Up @@ -487,7 +469,7 @@ func (b *Bitmex) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip
var subscriber WebsocketRequest
subscriber.Command = "subscribe"
subscriber.Arguments = append(subscriber.Arguments, channelToSubscribe.Channel)
return b.WebsocketConn.SendMessage(subscriber)
return b.WebsocketConn.SendJSONMessage(subscriber)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
Expand All @@ -497,7 +479,7 @@ func (b *Bitmex) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscr
subscriber.Arguments = append(subscriber.Arguments,
channelToSubscribe.Params["args"],
channelToSubscribe.Channel+":"+channelToSubscribe.Currency.String())
return b.WebsocketConn.SendMessage(subscriber)
return b.WebsocketConn.SendJSONMessage(subscriber)
}

// WebsocketSendAuth sends an authenticated subscription
Expand All @@ -517,7 +499,7 @@ func (b *Bitmex) websocketSendAuth() error {
sendAuth.Command = "authKeyExpires"
sendAuth.Arguments = append(sendAuth.Arguments, b.API.Credentials.Key, timestamp,
signature)
err := b.WebsocketConn.SendMessage(sendAuth)
err := b.WebsocketConn.SendJSONMessage(sendAuth)
if err != nil {
b.Websocket.SetCanUseAuthenticatedEndpoints(false)
return err
Expand Down
4 changes: 2 additions & 2 deletions exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (b *Bitstamp) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr
Channel: channelToSubscribe.Channel,
},
}
return b.WebsocketConn.SendMessage(req)
return b.WebsocketConn.SendJSONMessage(req)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
Expand All @@ -153,7 +153,7 @@ func (b *Bitstamp) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubs
Channel: channelToSubscribe.Channel,
},
}
return b.WebsocketConn.SendMessage(req)
return b.WebsocketConn.SendJSONMessage(req)
}

func (b *Bitstamp) wsUpdateOrderbook(update websocketOrderBook, p currency.Pair, assetType asset.Item) error {
Expand Down
4 changes: 2 additions & 2 deletions exchanges/btcmarkets/btcmarkets_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (b *BTCMarkets) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubs
Channels: []string{channelToSubscribe.Channel},
MessageType: subscribe,
}
err := b.WebsocketConn.SendMessage(req)
err := b.WebsocketConn.SendJSONMessage(req)
if err != nil {
return err
}
Expand All @@ -251,7 +251,7 @@ func (b *BTCMarkets) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubs
message.Key = tempAuthData.Key
message.Signature = tempAuthData.Signature
message.Timestamp = tempAuthData.Timestamp
err := b.WebsocketConn.SendMessage(message)
err := b.WebsocketConn.SendJSONMessage(message)
if err != nil {
return err
}
Expand Down
25 changes: 6 additions & 19 deletions exchanges/btse/btse_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func (b *BTSE) WsConnect() error {
if err != nil {
return err
}
go b.Pinger()
b.WebsocketConn.SetupPingHandler(wshandler.WebsocketPingHandler{
MessageType: websocket.PingMessage,
Delay: btseWebsocketTimer,
})
go b.WsHandleData()
b.GenerateDefaultSubscriptions()

Expand Down Expand Up @@ -176,29 +179,13 @@ func (b *BTSE) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscripti
var sub wsSub
sub.Operation = "subscribe"
sub.Arguments = []string{channelToSubscribe.Channel}
return b.WebsocketConn.SendMessage(sub)
return b.WebsocketConn.SendJSONMessage(sub)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
func (b *BTSE) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscription) error {
var unSub wsSub
unSub.Operation = "unsubscribe"
unSub.Arguments = []string{channelToSubscribe.Channel}
return b.WebsocketConn.SendMessage(unSub)
}

// Pinger pings
func (b *BTSE) Pinger() {
ticker := time.NewTicker(btseWebsocketTimer)

for {
select {
case <-b.Websocket.ShutdownC:
ticker.Stop()
return

case <-ticker.C:
b.WebsocketConn.Connection.WriteMessage(websocket.PingMessage, nil)
}
}
return b.WebsocketConn.SendJSONMessage(unSub)
}
4 changes: 2 additions & 2 deletions exchanges/coinbasepro/coinbasepro_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (c *CoinbasePro) Subscribe(channelToSubscribe wshandler.WebsocketChannelSub
subscribe.Passphrase = c.API.Credentials.ClientID
subscribe.Timestamp = n
}
return c.WebsocketConn.SendMessage(subscribe)
return c.WebsocketConn.SendJSONMessage(subscribe)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
Expand All @@ -337,5 +337,5 @@ func (c *CoinbasePro) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelS
},
},
}
return c.WebsocketConn.SendMessage(subscribe)
return c.WebsocketConn.SendJSONMessage(subscribe)
}
15 changes: 8 additions & 7 deletions exchanges/coinbene/coinbene_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ func (c *Coinbene) WsDataHandler() {
return
}
c.Websocket.TrafficAlert <- struct{}{}
if string(stream.Raw) == "ping" {
c.WebsocketConn.Lock()
c.WebsocketConn.Connection.WriteMessage(websocket.TextMessage, []byte("pong"))
c.WebsocketConn.Unlock()
if string(stream.Raw) == wshandler.Ping {
err = c.WebsocketConn.SendRawMessage(websocket.TextMessage, []byte(wshandler.Pong))
if err != nil {
c.Websocket.DataHandler <- err
}
continue
}
var result map[string]interface{}
Expand Down Expand Up @@ -342,15 +343,15 @@ func (c *Coinbene) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr
var sub WsSub
sub.Operation = "subscribe"
sub.Arguments = []string{channelToSubscribe.Channel}
return c.WebsocketConn.SendMessage(sub)
return c.WebsocketConn.SendJSONMessage(sub)
}

// Unsubscribe sends a websocket message to receive data from the channel
func (c *Coinbene) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscription) error {
var sub WsSub
sub.Operation = "unsubscribe"
sub.Arguments = []string{channelToSubscribe.Channel}
return c.WebsocketConn.SendMessage(sub)
return c.WebsocketConn.SendJSONMessage(sub)
}

// Login logs in
Expand All @@ -364,5 +365,5 @@ func (c *Coinbene) Login() error {
sign := crypto.HexEncodeToString(tempSign)
sub.Operation = "login"
sub.Arguments = []string{c.API.Credentials.Key, expTime, sign}
return c.WebsocketConn.SendMessage(sub)
return c.WebsocketConn.SendJSONMessage(sub)
}
2 changes: 1 addition & 1 deletion exchanges/coinut/coinut_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (c *COINUT) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip
Subscribe: true,
Nonce: c.WebsocketConn.GenerateMessageID(false),
}
return c.WebsocketConn.SendMessage(subscribe)
return c.WebsocketConn.SendJSONMessage(subscribe)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
Expand Down
1 change: 0 additions & 1 deletion exchanges/gateio/gateio_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

const (
gateioWebsocketEndpoint = "wss://ws.gateio.ws/v3/"
gatioWsMethodPing = "ping"
gateioWebsocketRateLimit = 120
)

Expand Down
6 changes: 3 additions & 3 deletions exchanges/hitbtc/hitbtc_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (h *HitBTC) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip
}
}

return h.WebsocketConn.SendMessage(subscribe)
return h.WebsocketConn.SendJSONMessage(subscribe)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
Expand Down Expand Up @@ -388,7 +388,7 @@ func (h *HitBTC) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscr
}
}

return h.WebsocketConn.SendMessage(subscribe)
return h.WebsocketConn.SendJSONMessage(subscribe)
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
Expand All @@ -409,7 +409,7 @@ func (h *HitBTC) wsLogin() error {
},
}

err := h.WebsocketConn.SendMessage(request)
err := h.WebsocketConn.SendJSONMessage(request)
if err != nil {
h.Websocket.SetCanUseAuthenticatedEndpoints(false)
return err
Expand Down
25 changes: 13 additions & 12 deletions exchanges/huobi/huobi_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,7 @@ func (h *HUOBI) wsHandleAuthenticatedData(resp WsMessage) {
return
}
if init.Ping != 0 {
err = h.WebsocketConn.SendMessage(WsPong{Pong: init.Ping})
if err != nil {
log.Error(log.ExchangeSys, err)
}
h.sendPingResponse(init.Ping)
return
}
if init.ErrorMessage == "api-signature-not-valid" {
Expand Down Expand Up @@ -219,10 +216,7 @@ func (h *HUOBI) wsHandleMarketData(resp WsMessage) {
return
}
if init.Ping != 0 {
err = h.WebsocketConn.SendMessage(WsPong{Pong: init.Ping})
if err != nil {
log.Error(log.ExchangeSys, err)
}
h.sendPingResponse(init.Ping)
return
}

Expand Down Expand Up @@ -301,6 +295,13 @@ func (h *HUOBI) wsHandleMarketData(resp WsMessage) {
}
}

func (h *HUOBI) sendPingResponse(pong int64) {
err := h.WebsocketConn.SendJSONMessage(WsPong{Pong: pong})
if err != nil {
log.Error(log.ExchangeSys, err)
}
}

// WsProcessOrderbook processes new orderbook data
func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error {
p := currency.NewPairFromFormattedPairs(symbol,
Expand Down Expand Up @@ -372,7 +373,7 @@ func (h *HUOBI) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscript
strings.Contains(channelToSubscribe.Channel, "accounts") {
return h.wsAuthenticatedSubscribe("sub", wsAccountsOrdersEndPoint+channelToSubscribe.Channel, channelToSubscribe.Channel)
}
return h.WebsocketConn.SendMessage(WsRequest{Subscribe: channelToSubscribe.Channel})
return h.WebsocketConn.SendJSONMessage(WsRequest{Subscribe: channelToSubscribe.Channel})
}

// Unsubscribe sends a websocket message to stop receiving data from the channel
Expand All @@ -381,7 +382,7 @@ func (h *HUOBI) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscri
strings.Contains(channelToSubscribe.Channel, "accounts") {
return h.wsAuthenticatedSubscribe("unsub", wsAccountsOrdersEndPoint+channelToSubscribe.Channel, channelToSubscribe.Channel)
}
return h.WebsocketConn.SendMessage(WsRequest{Unsubscribe: channelToSubscribe.Channel})
return h.WebsocketConn.SendJSONMessage(WsRequest{Unsubscribe: channelToSubscribe.Channel})
}

func (h *HUOBI) wsGenerateSignature(timestamp, endpoint string) []byte {
Expand Down Expand Up @@ -411,7 +412,7 @@ func (h *HUOBI) wsLogin() error {
}
hmac := h.wsGenerateSignature(timestamp, wsAccountsOrdersEndPoint)
request.Signature = crypto.Base64Encode(hmac)
err := h.AuthenticatedWebsocketConn.SendMessage(request)
err := h.AuthenticatedWebsocketConn.SendJSONMessage(request)
if err != nil {
h.Websocket.SetCanUseAuthenticatedEndpoints(false)
return err
Expand All @@ -433,7 +434,7 @@ func (h *HUOBI) wsAuthenticatedSubscribe(operation, endpoint, topic string) erro
}
hmac := h.wsGenerateSignature(timestamp, endpoint)
request.Signature = crypto.Base64Encode(hmac)
return h.AuthenticatedWebsocketConn.SendMessage(request)
return h.AuthenticatedWebsocketConn.SendJSONMessage(request)
}

func (h *HUOBI) wsGetAccountsList() (*WsAuthenticatedAccountsListResponse, error) {
Expand Down
Loading

0 comments on commit 44ac358

Please sign in to comment.