diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index 8bccea52531..74d4e2a8366 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -20,6 +20,7 @@ import ( const ( binanceDefaultWebsocketURL = "wss://stream.binance.com:9443" + pingDelay = time.Minute * 9 ) // WsConnect intiates a websocket connection @@ -71,7 +72,11 @@ func (b *Binance) WsConnect() error { b.Name, err) } - + b.WebsocketConn.SetupPingHandler(wshandler.WebsocketPingHandler{ + UseGorillaHandler: true, + MessageType: websocket.PongMessage, + Delay: pingDelay, + }) go b.WsHandleData() return nil diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index ac8e1a61413..f94d99c2d04 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -784,7 +784,7 @@ func (b *Bitfinex) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr } } - return b.WebsocketConn.SendMessage(req) + return b.WebsocketConn.SendJSONMessage(req) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -798,7 +798,7 @@ func (b *Bitfinex) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubs req[k] = v } } - return b.WebsocketConn.SendMessage(req) + return b.WebsocketConn.SendJSONMessage(req) } // WsSendAuth sends a autheticated event payload @@ -820,7 +820,7 @@ func (b *Bitfinex) WsSendAuth() error { AuthNonce: nonce, DeadManSwitch: 0, } - err := b.AuthenticatedWebsocketConn.SendMessage(request) + err := b.AuthenticatedWebsocketConn.SendJSONMessage(request) if err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -907,7 +907,7 @@ func (b *Bitfinex) WsCancelMultiOrders(orderIDs []int64) error { OrderID: orderIDs, } request := makeRequestInterface(wsCancelMultipleOrders, cancel) - return b.AuthenticatedWebsocketConn.SendMessage(request) + return b.AuthenticatedWebsocketConn.SendJSONMessage(request) } // WsCancelOrder authenticated cancel order request @@ -942,13 +942,13 @@ func (b *Bitfinex) WsCancelOrder(orderID int64) error { func (b *Bitfinex) WsCancelAllOrders() error { cancelAll := WsCancelAllOrdersRequest{All: 1} request := makeRequestInterface(wsCancelMultipleOrders, cancelAll) - return b.AuthenticatedWebsocketConn.SendMessage(request) + return b.AuthenticatedWebsocketConn.SendJSONMessage(request) } // WsNewOffer authenticated new offer request func (b *Bitfinex) WsNewOffer(data *WsNewOfferRequest) error { request := makeRequestInterface(wsFundingOrderNew, data) - return b.AuthenticatedWebsocketConn.SendMessage(request) + return b.AuthenticatedWebsocketConn.SendJSONMessage(request) } // WsCancelOffer authenticated cancel offer request diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index 61db8928d5e..320fd9f8532 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -63,10 +63,6 @@ const ( bitmexActionUpdateData = "update" ) -var ( - pongChan = make(chan int, 1) -) - // WsConnect initiates a new websocket connection func (b *Bitmex) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { @@ -99,7 +95,6 @@ func (b *Bitmex) WsConnect() error { go b.wsHandleIncomingData() b.GenerateDefaultSubscriptions() - err = b.websocketSendAuth() if err != nil { log.Errorf(log.ExchangeSys, "%v - authentication failed: %v\n", b.Name, err) @@ -128,19 +123,6 @@ func (b *Bitmex) wsHandleIncomingData() { return } b.Websocket.TrafficAlert <- struct{}{} - message := string(resp.Raw) - if strings.Contains(message, "pong") { - pongChan <- 1 - continue - } - - if strings.Contains(message, "ping") { - err = b.WebsocketConn.SendMessage("pong") - if err != nil { - b.Websocket.DataHandler <- err - continue - } - } quickCapture := make(map[string]interface{}) err = json.Unmarshal(resp.Raw, &quickCapture) @@ -487,7 +469,7 @@ func (b *Bitmex) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip var subscriber WebsocketRequest subscriber.Command = "subscribe" subscriber.Arguments = append(subscriber.Arguments, channelToSubscribe.Channel) - return b.WebsocketConn.SendMessage(subscriber) + return b.WebsocketConn.SendJSONMessage(subscriber) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -497,7 +479,7 @@ func (b *Bitmex) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscr subscriber.Arguments = append(subscriber.Arguments, channelToSubscribe.Params["args"], channelToSubscribe.Channel+":"+channelToSubscribe.Currency.String()) - return b.WebsocketConn.SendMessage(subscriber) + return b.WebsocketConn.SendJSONMessage(subscriber) } // WebsocketSendAuth sends an authenticated subscription @@ -517,7 +499,7 @@ func (b *Bitmex) websocketSendAuth() error { sendAuth.Command = "authKeyExpires" sendAuth.Arguments = append(sendAuth.Arguments, b.API.Credentials.Key, timestamp, signature) - err := b.WebsocketConn.SendMessage(sendAuth) + err := b.WebsocketConn.SendJSONMessage(sendAuth) if err != nil { b.Websocket.SetCanUseAuthenticatedEndpoints(false) return err diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 65be546a021..a1468201c05 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -142,7 +142,7 @@ func (b *Bitstamp) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr Channel: channelToSubscribe.Channel, }, } - return b.WebsocketConn.SendMessage(req) + return b.WebsocketConn.SendJSONMessage(req) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -153,7 +153,7 @@ func (b *Bitstamp) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubs Channel: channelToSubscribe.Channel, }, } - return b.WebsocketConn.SendMessage(req) + return b.WebsocketConn.SendJSONMessage(req) } func (b *Bitstamp) wsUpdateOrderbook(update websocketOrderBook, p currency.Pair, assetType asset.Item) error { diff --git a/exchanges/btcmarkets/btcmarkets_websocket.go b/exchanges/btcmarkets/btcmarkets_websocket.go index 205509ade45..4f86b6fc6ff 100644 --- a/exchanges/btcmarkets/btcmarkets_websocket.go +++ b/exchanges/btcmarkets/btcmarkets_websocket.go @@ -237,7 +237,7 @@ func (b *BTCMarkets) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubs Channels: []string{channelToSubscribe.Channel}, MessageType: subscribe, } - err := b.WebsocketConn.SendMessage(req) + err := b.WebsocketConn.SendJSONMessage(req) if err != nil { return err } @@ -251,7 +251,7 @@ func (b *BTCMarkets) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubs message.Key = tempAuthData.Key message.Signature = tempAuthData.Signature message.Timestamp = tempAuthData.Timestamp - err := b.WebsocketConn.SendMessage(message) + err := b.WebsocketConn.SendJSONMessage(message) if err != nil { return err } diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index 4293b2ac946..1b9e214e722 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -33,7 +33,10 @@ func (b *BTSE) WsConnect() error { if err != nil { return err } - go b.Pinger() + b.WebsocketConn.SetupPingHandler(wshandler.WebsocketPingHandler{ + MessageType: websocket.PingMessage, + Delay: btseWebsocketTimer, + }) go b.WsHandleData() b.GenerateDefaultSubscriptions() @@ -176,7 +179,7 @@ func (b *BTSE) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscripti var sub wsSub sub.Operation = "subscribe" sub.Arguments = []string{channelToSubscribe.Channel} - return b.WebsocketConn.SendMessage(sub) + return b.WebsocketConn.SendJSONMessage(sub) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -184,21 +187,5 @@ func (b *BTSE) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip var unSub wsSub unSub.Operation = "unsubscribe" unSub.Arguments = []string{channelToSubscribe.Channel} - return b.WebsocketConn.SendMessage(unSub) -} - -// Pinger pings -func (b *BTSE) Pinger() { - ticker := time.NewTicker(btseWebsocketTimer) - - for { - select { - case <-b.Websocket.ShutdownC: - ticker.Stop() - return - - case <-ticker.C: - b.WebsocketConn.Connection.WriteMessage(websocket.PingMessage, nil) - } - } + return b.WebsocketConn.SendJSONMessage(unSub) } diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 71e3f2b3e03..e57a1c478c3 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -320,7 +320,7 @@ func (c *CoinbasePro) Subscribe(channelToSubscribe wshandler.WebsocketChannelSub subscribe.Passphrase = c.API.Credentials.ClientID subscribe.Timestamp = n } - return c.WebsocketConn.SendMessage(subscribe) + return c.WebsocketConn.SendJSONMessage(subscribe) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -337,5 +337,5 @@ func (c *CoinbasePro) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelS }, }, } - return c.WebsocketConn.SendMessage(subscribe) + return c.WebsocketConn.SendJSONMessage(subscribe) } diff --git a/exchanges/coinbene/coinbene_websocket.go b/exchanges/coinbene/coinbene_websocket.go index 69c8c214b7f..1e8ddbcf4af 100644 --- a/exchanges/coinbene/coinbene_websocket.go +++ b/exchanges/coinbene/coinbene_websocket.go @@ -96,10 +96,11 @@ func (c *Coinbene) WsDataHandler() { return } c.Websocket.TrafficAlert <- struct{}{} - if string(stream.Raw) == "ping" { - c.WebsocketConn.Lock() - c.WebsocketConn.Connection.WriteMessage(websocket.TextMessage, []byte("pong")) - c.WebsocketConn.Unlock() + if string(stream.Raw) == wshandler.Ping { + err = c.WebsocketConn.SendRawMessage(websocket.TextMessage, []byte(wshandler.Pong)) + if err != nil { + c.Websocket.DataHandler <- err + } continue } var result map[string]interface{} @@ -342,7 +343,7 @@ func (c *Coinbene) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr var sub WsSub sub.Operation = "subscribe" sub.Arguments = []string{channelToSubscribe.Channel} - return c.WebsocketConn.SendMessage(sub) + return c.WebsocketConn.SendJSONMessage(sub) } // Unsubscribe sends a websocket message to receive data from the channel @@ -350,7 +351,7 @@ func (c *Coinbene) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubs var sub WsSub sub.Operation = "unsubscribe" sub.Arguments = []string{channelToSubscribe.Channel} - return c.WebsocketConn.SendMessage(sub) + return c.WebsocketConn.SendJSONMessage(sub) } // Login logs in @@ -364,5 +365,5 @@ func (c *Coinbene) Login() error { sign := crypto.HexEncodeToString(tempSign) sub.Operation = "login" sub.Arguments = []string{c.API.Credentials.Key, expTime, sign} - return c.WebsocketConn.SendMessage(sub) + return c.WebsocketConn.SendJSONMessage(sub) } diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index 9c1b286ec9e..8b9f8db0dc2 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -346,7 +346,7 @@ func (c *COINUT) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip Subscribe: true, Nonce: c.WebsocketConn.GenerateMessageID(false), } - return c.WebsocketConn.SendMessage(subscribe) + return c.WebsocketConn.SendJSONMessage(subscribe) } // Unsubscribe sends a websocket message to stop receiving data from the channel diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 7ab7d5acd9f..42b5be07888 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -23,7 +23,6 @@ import ( const ( gateioWebsocketEndpoint = "wss://ws.gateio.ws/v3/" - gatioWsMethodPing = "ping" gateioWebsocketRateLimit = 120 ) diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index 91d3d713648..f1d6c2f874b 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -359,7 +359,7 @@ func (h *HitBTC) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip } } - return h.WebsocketConn.SendMessage(subscribe) + return h.WebsocketConn.SendJSONMessage(subscribe) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -388,7 +388,7 @@ func (h *HitBTC) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscr } } - return h.WebsocketConn.SendMessage(subscribe) + return h.WebsocketConn.SendJSONMessage(subscribe) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -409,7 +409,7 @@ func (h *HitBTC) wsLogin() error { }, } - err := h.WebsocketConn.SendMessage(request) + err := h.WebsocketConn.SendJSONMessage(request) if err != nil { h.Websocket.SetCanUseAuthenticatedEndpoints(false) return err diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index 1c1d53739fb..9d21a9a4e26 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -146,10 +146,7 @@ func (h *HUOBI) wsHandleAuthenticatedData(resp WsMessage) { return } if init.Ping != 0 { - err = h.WebsocketConn.SendMessage(WsPong{Pong: init.Ping}) - if err != nil { - log.Error(log.ExchangeSys, err) - } + h.sendPingResponse(init.Ping) return } if init.ErrorMessage == "api-signature-not-valid" { @@ -219,10 +216,7 @@ func (h *HUOBI) wsHandleMarketData(resp WsMessage) { return } if init.Ping != 0 { - err = h.WebsocketConn.SendMessage(WsPong{Pong: init.Ping}) - if err != nil { - log.Error(log.ExchangeSys, err) - } + h.sendPingResponse(init.Ping) return } @@ -301,6 +295,13 @@ func (h *HUOBI) wsHandleMarketData(resp WsMessage) { } } +func (h *HUOBI) sendPingResponse(pong int64) { + err := h.WebsocketConn.SendJSONMessage(WsPong{Pong: pong}) + if err != nil { + log.Error(log.ExchangeSys, err) + } +} + // WsProcessOrderbook processes new orderbook data func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error { p := currency.NewPairFromFormattedPairs(symbol, @@ -372,7 +373,7 @@ func (h *HUOBI) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscript strings.Contains(channelToSubscribe.Channel, "accounts") { return h.wsAuthenticatedSubscribe("sub", wsAccountsOrdersEndPoint+channelToSubscribe.Channel, channelToSubscribe.Channel) } - return h.WebsocketConn.SendMessage(WsRequest{Subscribe: channelToSubscribe.Channel}) + return h.WebsocketConn.SendJSONMessage(WsRequest{Subscribe: channelToSubscribe.Channel}) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -381,7 +382,7 @@ func (h *HUOBI) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscri strings.Contains(channelToSubscribe.Channel, "accounts") { return h.wsAuthenticatedSubscribe("unsub", wsAccountsOrdersEndPoint+channelToSubscribe.Channel, channelToSubscribe.Channel) } - return h.WebsocketConn.SendMessage(WsRequest{Unsubscribe: channelToSubscribe.Channel}) + return h.WebsocketConn.SendJSONMessage(WsRequest{Unsubscribe: channelToSubscribe.Channel}) } func (h *HUOBI) wsGenerateSignature(timestamp, endpoint string) []byte { @@ -411,7 +412,7 @@ func (h *HUOBI) wsLogin() error { } hmac := h.wsGenerateSignature(timestamp, wsAccountsOrdersEndPoint) request.Signature = crypto.Base64Encode(hmac) - err := h.AuthenticatedWebsocketConn.SendMessage(request) + err := h.AuthenticatedWebsocketConn.SendJSONMessage(request) if err != nil { h.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -433,7 +434,7 @@ func (h *HUOBI) wsAuthenticatedSubscribe(operation, endpoint, topic string) erro } hmac := h.wsGenerateSignature(timestamp, endpoint) request.Signature = crypto.Base64Encode(hmac) - return h.AuthenticatedWebsocketConn.SendMessage(request) + return h.AuthenticatedWebsocketConn.SendJSONMessage(request) } func (h *HUOBI) wsGetAccountsList() (*WsAuthenticatedAccountsListResponse, error) { diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 798377bdeec..89d6c74d115 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -29,8 +29,6 @@ const ( krakenWSSupportedVersion = "0.3.0" // WS endpoints krakenWsHeartbeat = "heartbeat" - krakenWsPing = "ping" - krakenWsPong = "pong" krakenWsSystemStatus = "systemStatus" krakenWsSubscribe = "subscribe" krakenWsSubscriptionStatus = "subscriptionStatus" @@ -45,12 +43,14 @@ const ( krakenWsAddOrder = "addOrder" krakenWsCancelOrder = "cancelOrder" krakenWsRateLimit = 50 + krakenWsPingDelay = time.Second * 27 ) // orderbookMutex Ensures if two entries arrive at once, only one can be processed at a time var subscriptionChannelPair []WebsocketChannelData var comms = make(chan wshandler.WebsocketResponse) var authToken string +var pingRequest = WebsocketBaseEventRequest{Event: wshandler.Ping} // Channels require a topic and a currency // Format [[ticker,but-t4u],[orderbook,nce-btt]] @@ -84,7 +84,10 @@ func (k *Kraken) WsConnect() error { go k.WsReadData(k.WebsocketConn) go k.WsHandleData() - go k.wsPingHandler() + err = k.wsPingHandler() + if err != nil { + log.Errorf(log.ExchangeSys, "%v - failed setup ping handler. Websocket may disconnect unexpectedly. %v\n", k.Name, err) + } k.GenerateDefaultSubscriptions() return nil @@ -148,28 +151,17 @@ func (k *Kraken) WsHandleData() { } // wsPingHandler sends a message "ping" every 27 to maintain the connection to the websocket -func (k *Kraken) wsPingHandler() { - k.Websocket.Wg.Add(1) - defer k.Websocket.Wg.Done() - ticker := time.NewTicker(time.Second * 27) - defer ticker.Stop() - - for { - select { - case <-k.Websocket.ShutdownC: - return - case <-ticker.C: - pingEvent := WebsocketBaseEventRequest{Event: krakenWsPing} - if k.Verbose { - log.Debugf(log.ExchangeSys, "%v sending ping", - k.Name) - } - err := k.WebsocketConn.SendMessage(pingEvent) - if err != nil { - k.Websocket.DataHandler <- err - } - } +func (k *Kraken) wsPingHandler() error { + message, err := json.Marshal(pingRequest) + if err != nil { + return err } + k.WebsocketConn.SetupPingHandler(wshandler.WebsocketPingHandler{ + Message: message, + Delay: krakenWsPingDelay, + MessageType: websocket.TextMessage, + }) + return nil } // WsHandleDataResponse classifies the WS response and sends to appropriate handler @@ -219,16 +211,13 @@ func (k *Kraken) WsHandleDataResponse(response WebsocketDataResponse) { // WsHandleEventResponse classifies the WS response and sends to appropriate handler func (k *Kraken) WsHandleEventResponse(response *WebsocketEventResponse, rawResponse []byte) { switch response.Event { + case wshandler.Pong: + break case krakenWsHeartbeat: if k.Verbose { log.Debugf(log.ExchangeSys, "%v Websocket heartbeat data received", k.Name) } - case krakenWsPong: - if k.Verbose { - log.Debugf(log.ExchangeSys, "%v Websocket pong data received", - k.Name) - } case krakenWsSystemStatus: if k.Verbose { log.Debugf(log.ExchangeSys, "%v Websocket status data received", @@ -925,5 +914,5 @@ func (k *Kraken) wsCancelOrders(orderIDs []string) error { Token: authToken, TransactionIDs: orderIDs, } - return k.AuthenticatedWebsocketConn.SendMessage(request) + return k.AuthenticatedWebsocketConn.SendJSONMessage(request) } diff --git a/exchanges/okgroup/okgroup_websocket.go b/exchanges/okgroup/okgroup_websocket.go index ed0df0d58ac..b3763a4f9c6 100644 --- a/exchanges/okgroup/okgroup_websocket.go +++ b/exchanges/okgroup/okgroup_websocket.go @@ -284,7 +284,7 @@ func (o *OKGroup) WsLogin() error { base64, }, } - err := o.WebsocketConn.SendMessage(request) + err := o.WebsocketConn.SendJSONMessage(request) if err != nil { o.Websocket.SetCanUseAuthenticatedEndpoints(false) return err @@ -827,7 +827,7 @@ func (o *OKGroup) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscri channelToSubscribe.Currency.Base.String()} } - return o.WebsocketConn.SendMessage(request) + return o.WebsocketConn.SendJSONMessage(request) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -838,5 +838,5 @@ func (o *OKGroup) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubsc delimiterColon + channelToSubscribe.Currency.String()}, } - return o.WebsocketConn.SendMessage(request) + return o.WebsocketConn.SendJSONMessage(request) } diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index c54120d0289..99720d0fbf9 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -502,7 +502,7 @@ func (p *Poloniex) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscr default: subscriptionRequest.Channel = channelToSubscribe.Currency.String() } - return p.WebsocketConn.SendMessage(subscriptionRequest) + return p.WebsocketConn.SendJSONMessage(subscriptionRequest) } // Unsubscribe sends a websocket message to stop receiving data from the channel @@ -518,7 +518,7 @@ func (p *Poloniex) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubs default: unsubscriptionRequest.Channel = channelToSubscribe.Currency.String() } - return p.WebsocketConn.SendMessage(unsubscriptionRequest) + return p.WebsocketConn.SendJSONMessage(unsubscriptionRequest) } func (p *Poloniex) wsSendAuthorisedCommand(command string) error { @@ -531,5 +531,5 @@ func (p *Poloniex) wsSendAuthorisedCommand(command string) error { Key: p.API.Credentials.Key, Payload: nonce, } - return p.WebsocketConn.SendMessage(request) + return p.WebsocketConn.SendJSONMessage(request) } diff --git a/exchanges/websocket/wshandler/wshandler.go b/exchanges/websocket/wshandler/wshandler.go index be0ee3c22f3..278a67357cb 100644 --- a/exchanges/websocket/wshandler/wshandler.go +++ b/exchanges/websocket/wshandler/wshandler.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/flate" "compress/gzip" - "encoding/json" "errors" "fmt" "io/ioutil" @@ -656,32 +655,82 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header return nil } -// SendMessage the one true message request. Sends message to WS -func (w *WebsocketConnection) SendMessage(data interface{}) error { +// SendJSONMessage sends a JSON encoded message over the connection +func (w *WebsocketConnection) SendJSONMessage(data interface{}) error { w.Lock() defer w.Unlock() if !w.IsConnected() { return fmt.Errorf("%v cannot send message to a disconnected websocket", w.ExchangeName) } - json, err := json.Marshal(data) - if err != nil { - return err + if w.Verbose { + log.Debugf(log.WebsocketMgr, + "%v sending message to websocket %+v", w.ExchangeName, data) + } + if w.RateLimit > 0 { + time.Sleep(time.Duration(w.RateLimit) * time.Millisecond) + } + return w.Connection.WriteJSON(data) +} + +// SendRawMessage sends a message over the connection without JSON encoding it +func (w *WebsocketConnection) SendRawMessage(messageType int, message []byte) error { + w.Lock() + defer w.Unlock() + if !w.IsConnected() { + return fmt.Errorf("%v cannot send message to a disconnected websocket", w.ExchangeName) } if w.Verbose { log.Debugf(log.WebsocketMgr, - "%v sending message to websocket %v", w.ExchangeName, string(json)) + "%v sending message to websocket %s", w.ExchangeName, message) } if w.RateLimit > 0 { time.Sleep(time.Duration(w.RateLimit) * time.Millisecond) } - return w.Connection.WriteMessage(websocket.TextMessage, json) + return w.Connection.WriteMessage(messageType, message) +} + +// SetupPingHandler will automatically send ping or pong messages based on +// WebsocketPingHandler configuration +func (w *WebsocketConnection) SetupPingHandler(handler WebsocketPingHandler) { + if handler.UseGorillaHandler { + h := func(msg string) error { + err := w.Connection.WriteControl(handler.MessageType, []byte(msg), time.Now().Add(handler.Delay)) + if err == websocket.ErrCloseSent { + return nil + } else if e, ok := err.(net.Error); ok && e.Temporary() { + return nil + } + return err + } + w.Connection.SetPingHandler(h) + return + } + w.Wg.Add(1) + defer w.Wg.Done() + go func() { + ticker := time.NewTicker(handler.Delay) + for { + select { + case <-w.Shutdown: + ticker.Stop() + return + case <-ticker.C: + err := w.SendRawMessage(handler.MessageType, handler.Message) + if err != nil { + log.Errorf(log.WebsocketMgr, + "%v failed to send message to websocket %s", w.ExchangeName, handler.Message) + return + } + } + } + }() } // SendMessageReturnResponse will send a WS message to the connection // It will then run a goroutine to await a JSON response // If there is no response it will return an error func (w *WebsocketConnection) SendMessageReturnResponse(id int64, request interface{}) ([]byte, error) { - err := w.SendMessage(request) + err := w.SendJSONMessage(request) if err != nil { return nil, err } diff --git a/exchanges/websocket/wshandler/wshandler_test.go b/exchanges/websocket/wshandler/wshandler_test.go index bfb20816caa..b12c95b759a 100644 --- a/exchanges/websocket/wshandler/wshandler_test.go +++ b/exchanges/websocket/wshandler/wshandler_test.go @@ -156,6 +156,16 @@ func TestWebsocket(t *testing.T) { t.Error("WebsocketSetup") } + ws.setEnabled(false) + if ws.IsEnabled() { + t.Error("WebsocketSetup") + } + + ws.setEnabled(true) + if !ws.IsEnabled() { + t.Error("WebsocketSetup") + } + if ws.GetProxyAddress() != "testProxy" { t.Error("WebsocketSetup") } @@ -569,7 +579,11 @@ func TestSendMessage(t *testing.T) { } t.Fatal(err) } - err = testData.WC.SendMessage("ping") + err = testData.WC.SendJSONMessage(Ping) + if err != nil { + t.Error(err) + } + err = testData.WC.SendRawMessage(websocket.TextMessage, []byte(Ping)) if err != nil { t.Error(err) } @@ -602,6 +616,42 @@ func TestSendMessageWithResponse(t *testing.T) { } } +// TestSetupPingHandler logic test +func TestSetupPingHandler(t *testing.T) { + if wc.ProxyURL != "" && !useProxyTests { + t.Skip("Proxy testing not enabled, skipping") + } + wc.Shutdown = make(chan struct{}) + err := wc.Dial(&dialer, http.Header{}) + if err != nil { + t.Fatal(err) + } + + wc.SetupPingHandler(WebsocketPingHandler{ + UseGorillaHandler: true, + MessageType: websocket.PingMessage, + Delay: 1000, + }) + + err = wc.Connection.Close() + if err != nil { + t.Error(err) + } + + err = wc.Dial(&dialer, http.Header{}) + if err != nil { + t.Fatal(err) + } + wc.SetupPingHandler(WebsocketPingHandler{ + MessageType: websocket.TextMessage, + Message: []byte(Ping), + Delay: 200, + }) + time.Sleep(time.Millisecond * 500) + close(wc.Shutdown) + wc.Wg.Wait() +} + // TestParseBinaryResponse logic test func TestParseBinaryResponse(t *testing.T) { var b bytes.Buffer diff --git a/exchanges/websocket/wshandler/wshandler_types.go b/exchanges/websocket/wshandler/wshandler_types.go index f6d551ae149..31e8b4fe88c 100644 --- a/exchanges/websocket/wshandler/wshandler_types.go +++ b/exchanges/websocket/wshandler/wshandler_types.go @@ -19,6 +19,8 @@ const ( // connection monitor time delays and limits connectionMonitorDelay = 2 * time.Second WebsocketNotAuthenticatedUsingRest = "%v - Websocket not authenticated, using REST" + Ping = "ping" + Pong = "pong" ) // Websocket defines a return type for websocket connections via the interface @@ -164,3 +166,11 @@ type WebsocketConnection struct { ResponseMaxLimit time.Duration TrafficTimeout time.Duration } + +// WebsocketPingHandler container for ping handler settings +type WebsocketPingHandler struct { + UseGorillaHandler bool + MessageType int + Message []byte + Delay time.Duration +} diff --git a/exchanges/zb/zb_websocket.go b/exchanges/zb/zb_websocket.go index 80777ec4733..a1d069bf4ad 100644 --- a/exchanges/zb/zb_websocket.go +++ b/exchanges/zb/zb_websocket.go @@ -214,7 +214,7 @@ func (z *ZB) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscription Event: zWebsocketAddChannel, Channel: channelToSubscribe.Channel, } - return z.WebsocketConn.SendMessage(subscriptionRequest) + return z.WebsocketConn.SendJSONMessage(subscriptionRequest) } func (z *ZB) wsGenerateSignature(request interface{}) string {