From c2a33300f5d183fe9b892c5ad329d4d9cc96b685 Mon Sep 17 00:00:00 2001 From: Scott Date: Wed, 2 Oct 2019 09:06:52 +1000 Subject: [PATCH] Feature+Bugfix: Engine websocket management (#360) * Initial commit tearing down the websocket connection management. The purpose is to remove the traffic monitoring and dropping as syncer.go is a better manager * Adds a readwrite mutex and helper functions to minimise inline lock/unlocks and prevent races * Creates new WebsocketType struct to contain all parameters required. Deletes WebsocketReset. Utilises ReadMessageErrors channel for all websocket readmessages to analyse when an error returned is due to a disconnect * Fixes issue with syncer trying to connect while connecting * Simplifies initialisation function for websocket. Reconnects and resubscribes after disconnection * Adds WebsocketTimeout config value to dictate when the websocket traffic monitor should die. Default to two minutes of no traffic activity. Increases test coverage and updates existing tests to work with new technologic. RE-ADDS TESTS I ACCIDENTALLY DELETED FROM PREVIOUS PR * Removes snapshot override as its always necessary when considering reconnections. Increases test coverage. Re-adds tests that were ACCIDENTALLY DELETED. Removes unused websocket channels. Bug fix for traffic monitor to shutdown via goroutine instead of killing itself * Fixes gateio bug for authentication errors when null. Adds little entry to syncer for when websocket is switched to rest and then back, you get a log notifying of the return. Fixes okgroup bug where ws message is sent on a disconnected ws, causing panic. Renames setConnectionStatus to setConnectedStatus. Puts connection monitor log behind verbose bool * Fixes lingering races. Fixes bug where websocket was enabled whether you liked it or not. Removes demonstration test * Fixes log message, renames unc, removes comments * Fixes data race * Removes verbosity, ensures shutdown sets connection status appropriately * Removes go routine causing CPU spike. Stops timers properly and resets timers properly * Renames `WsEnabled` to `Enabled`. Increases test coverage. Fixes typos. Handles unhandled errors * The forgotten lint * With using RWlocks, removes the channel nil check and relies on !w.IsConnected() to prevent a shutdown from recurring * Removes extra closure step in the defer as it causes all the issues * Prevents timer channel hangups. Minimises use of websocket Connect(). Expands disconnection error definition. Removes routine disconnection error handling. Ensures only one traffic monitor can ever be run. Renames subscriptionLock to subscriptionMutext for consistency * Extends timeout to 30 seconds to cover for non-popular exchanges and non-popular currencies * Updates test from rebase to use new websocket setup function * Fixes test to ensure it tests what it says it does --- config/config.go | 6 + config/config_test.go | 5 + config/config_types.go | 1 + engine/exchange.go | 1 - engine/routines.go | 36 +- engine/syncer.go | 13 +- exchanges/alphapoint/alphapoint_websocket.go | 1 + exchanges/binance/binance_websocket.go | 8 +- exchanges/binance/binance_wrapper.go | 21 +- exchanges/bitfinex/bitfinex_websocket.go | 5 +- exchanges/bitfinex/bitfinex_wrapper.go | 22 +- exchanges/bitmex/bitmex_websocket.go | 7 +- exchanges/bitmex/bitmex_wrapper.go | 22 +- exchanges/bitstamp/bitstamp_websocket.go | 6 +- exchanges/bitstamp/bitstamp_wrapper.go | 22 +- exchanges/btse/btse_websocket.go | 4 +- exchanges/btse/btse_wrapper.go | 22 +- .../coinbasepro/coinbasepro_websocket.go | 4 +- exchanges/coinbasepro/coinbasepro_wrapper.go | 22 +- exchanges/coinut/coinut_websocket.go | 4 +- exchanges/coinut/coinut_wrapper.go | 22 +- exchanges/exchange.go | 2 +- exchanges/exchange_test.go | 5 +- exchanges/gateio/gateio_types.go | 2 +- exchanges/gateio/gateio_websocket.go | 5 +- exchanges/gateio/gateio_wrapper.go | 22 +- exchanges/gemini/gemini_websocket.go | 3 +- exchanges/gemini/gemini_wrapper.go | 20 +- exchanges/hitbtc/hitbtc_websocket.go | 4 +- exchanges/hitbtc/hitbtc_wrapper.go | 22 +- exchanges/huobi/huobi_websocket.go | 2 +- exchanges/huobi/huobi_wrapper.go | 22 +- exchanges/kraken/kraken_websocket.go | 10 +- exchanges/kraken/kraken_wrapper.go | 22 +- exchanges/lakebtc/lakebtc_websocket.go | 2 +- exchanges/lakebtc/lakebtc_wrapper.go | 21 +- exchanges/okgroup/okgroup_websocket.go | 7 +- exchanges/okgroup/okgroup_wrapper.go | 21 +- exchanges/poloniex/poloniex_websocket.go | 4 +- exchanges/poloniex/poloniex_wrapper.go | 22 +- exchanges/websocket/wshandler/wshandler.go | 527 +++++++-------- .../websocket/wshandler/wshandler_test.go | 606 ++++++++++++++---- .../websocket/wshandler/wshandler_types.go | 100 +-- .../websocket/wsorderbook/wsorderbook.go | 9 +- .../websocket/wsorderbook/wsorderbook_test.go | 27 +- exchanges/zb/zb_websocket.go | 5 +- exchanges/zb/zb_wrapper.go | 21 +- 47 files changed, 1079 insertions(+), 688 deletions(-) diff --git a/config/config.go b/config/config.go index 3bb42c567ae..d197ec35c93 100644 --- a/config/config.go +++ b/config/config.go @@ -41,6 +41,7 @@ const ( configDefaultWebsocketResponseCheckTimeout = time.Millisecond * 30 configDefaultWebsocketResponseMaxLimit = time.Second * 7 configDefaultWebsocketOrderbookBufferLimit = 5 + configDefaultWebsocketTrafficTimeout = time.Second * 30 configMaxAuthFailures = 3 defaultNTPAllowedDifference = 50000000 defaultNTPAllowedNegativeDifference = 50000000 @@ -1024,6 +1025,11 @@ func (c *Config) CheckExchangeConfigValues() error { c.Exchanges[i].Name, configDefaultWebsocketResponseMaxLimit) c.Exchanges[i].WebsocketResponseMaxLimit = configDefaultWebsocketResponseMaxLimit } + if c.Exchanges[i].WebsocketTrafficTimeout <= 0 { + log.Warnf(log.ExchangeSys, "Exchange %s Websocket response traffic timeout value not set, defaulting to %v.", + c.Exchanges[i].Name, configDefaultWebsocketTrafficTimeout) + c.Exchanges[i].WebsocketTrafficTimeout = configDefaultWebsocketTrafficTimeout + } if c.Exchanges[i].WebsocketOrderbookBufferLimit <= 0 { log.Warnf(log.ExchangeSys, "Exchange %s Websocket orderbook buffer limit value not set, defaulting to %v.", c.Exchanges[i].Name, configDefaultWebsocketOrderbookBufferLimit) diff --git a/config/config_test.go b/config/config_test.go index 419e19517d7..65249bfd4c2 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1451,6 +1451,7 @@ func TestCheckExchangeConfigValues(t *testing.T) { cfg.Exchanges[0].WebsocketResponseMaxLimit = 0 cfg.Exchanges[0].WebsocketResponseCheckTimeout = 0 cfg.Exchanges[0].WebsocketOrderbookBufferLimit = 0 + cfg.Exchanges[0].WebsocketTrafficTimeout = 0 cfg.Exchanges[0].HTTPTimeout = 0 err = cfg.CheckExchangeConfigValues() if err != nil { @@ -1465,6 +1466,10 @@ func TestCheckExchangeConfigValues(t *testing.T) { t.Errorf("expected exchange %s to have updated WebsocketOrderbookBufferLimit value", cfg.Exchanges[0].Name) } + if cfg.Exchanges[0].WebsocketTrafficTimeout == 0 { + t.Errorf("expected exchange %s to have updated WebsocketTrafficTimeout value", + cfg.Exchanges[0].Name) + } if cfg.Exchanges[0].HTTPTimeout == 0 { t.Errorf("expected exchange %s to have updated HTTPTimeout value", cfg.Exchanges[0].Name) diff --git a/config/config_types.go b/config/config_types.go index 33ea91f5030..5c07f39e66a 100644 --- a/config/config_types.go +++ b/config/config_types.go @@ -59,6 +59,7 @@ type ExchangeConfig struct { HTTPRateLimiter *HTTPRateLimitConfig `json:"httpRateLimiter,omitempty"` WebsocketResponseCheckTimeout time.Duration `json:"websocketResponseCheckTimeout"` WebsocketResponseMaxLimit time.Duration `json:"websocketResponseMaxLimit"` + WebsocketTrafficTimeout time.Duration `json:"websocketTrafficTimeout"` WebsocketOrderbookBufferLimit int `json:"websocketOrderbookBufferLimit"` ProxyAddress string `json:"proxyAddress,omitempty"` BaseCurrencies currency.Currencies `json:"baseCurrencies"` diff --git a/engine/exchange.go b/engine/exchange.go index 3faaa836e9a..5da34cdfab7 100644 --- a/engine/exchange.go +++ b/engine/exchange.go @@ -228,7 +228,6 @@ func LoadExchange(name string, useWG bool, wg *sync.WaitGroup) error { if exchCfg.Features.Supports.RESTCapabilities.AutoPairUpdates { exchCfg.Features.Enabled.AutoPairUpdates = false } - } } diff --git a/engine/routines.go b/engine/routines.go index 29466566031..bcbc6902ab1 100644 --- a/engine/routines.go +++ b/engine/routines.go @@ -354,39 +354,12 @@ func Websocketshutdown(ws *wshandler.Websocket) error { } } -// streamDiversion is a diversion switch from websocket to REST or other -// alternative feed -func streamDiversion(ws *wshandler.Websocket) { - wg.Add(1) - defer wg.Done() - - for { - select { - case <-shutdowner: - return - - case <-ws.Connected: - if Bot.Settings.Verbose { - log.Debugf(log.WebsocketMgr, "exchange %s websocket feed connected\n", ws.GetName()) - } - - case <-ws.Disconnected: - if Bot.Settings.Verbose { - log.Debugf(log.WebsocketMgr, "exchange %s websocket feed disconnected, switching to REST functionality\n", - ws.GetName()) - } - } - } -} - // WebsocketDataHandler handles websocket data coming from a websocket feed // associated with an exchange func WebsocketDataHandler(ws *wshandler.Websocket) { wg.Add(1) defer wg.Done() - go streamDiversion(ws) - for { select { case <-shutdowner: @@ -407,14 +380,7 @@ func WebsocketDataHandler(ws *wshandler.Websocket) { } case error: - switch { - case strings.Contains(d.Error(), "close 1006"): - go ws.WebsocketReset() - continue - default: - log.Errorf(log.WebsocketMgr, "routines.go exchange %s websocket error - %s", ws.GetName(), data) - } - + log.Errorf(log.WebsocketMgr, "routines.go exchange %s websocket error - %s", ws.GetName(), data) case wshandler.TradeData: // Trade Data // if Bot.Settings.Verbose { diff --git a/engine/syncer.go b/engine/syncer.go index 3cb15be223b..598661f53c0 100644 --- a/engine/syncer.go +++ b/engine/syncer.go @@ -286,7 +286,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { supportsRESTTickerBatching := Bot.Exchanges[x].SupportsRESTTickerBatchUpdates() var usingREST bool var usingWebsocket bool - + var switchedToRest bool if Bot.Exchanges[x].SupportsWebsocket() && Bot.Exchanges[x].IsWebsocketEnabled() { ws, err := Bot.Exchanges[x].GetWebsocket() if err != nil { @@ -346,7 +346,12 @@ func (e *ExchangeCurrencyPairSyncer) worker() { log.Errorf(log.SyncMgr, "failed to get item. Err: %s\n", err) continue } - + if switchedToRest && usingWebsocket { + log.Infof(log.SyncMgr, + "%s %s: Websocket re-enabled, switching from rest to websocket\n", + c.Exchange, FormatCurrency(p).String()) + switchedToRest = false + } if e.Cfg.SyncTicker { if !e.isProcessing(exchangeName, c.Pair, c.AssetType, SyncItemTicker) { if c.Ticker.LastUpdated.IsZero() || time.Since(c.Ticker.LastUpdated) > defaultSyncerTimeout { @@ -362,6 +367,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { log.Warnf(log.SyncMgr, "%s %s: No ticker update after 10 seconds, switching from websocket to rest\n", c.Exchange, FormatCurrency(p).String()) + switchedToRest = true e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemTicker, false) } } @@ -425,6 +431,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() { log.Warnf(log.SyncMgr, "%s %s: No orderbook update after 15 seconds, switching from websocket to rest\n", c.Exchange, FormatCurrency(c.Pair).String()) + switchedToRest = true e.setProcessing(c.Exchange, c.Pair, c.AssetType, SyncItemOrderbook, false) } } @@ -491,7 +498,7 @@ func (e *ExchangeCurrencyPairSyncer) Start() { usingREST = true } - if !ws.IsConnected() { + if !ws.IsConnected() && !ws.IsConnecting() { go WebsocketDataHandler(ws) err = ws.Connect() diff --git a/exchanges/alphapoint/alphapoint_websocket.go b/exchanges/alphapoint/alphapoint_websocket.go index 58c6803afec..05f389b2186 100644 --- a/exchanges/alphapoint/alphapoint_websocket.go +++ b/exchanges/alphapoint/alphapoint_websocket.go @@ -38,6 +38,7 @@ func (a *Alphapoint) WebsocketClient() { for a.Enabled { msgType, resp, err := a.WebsocketConn.ReadMessage() if err != nil { + a.Websocket.ReadMessageErrors <- err log.Error(log.ExchangeSys, err) break } diff --git a/exchanges/binance/binance_websocket.go b/exchanges/binance/binance_websocket.go index e8b24c3ba7c..ec48658bd2b 100644 --- a/exchanges/binance/binance_websocket.go +++ b/exchanges/binance/binance_websocket.go @@ -21,8 +21,8 @@ const ( binanceDefaultWebsocketURL = "wss://stream.binance.com:9443" ) -// WSConnect intiates a websocket connection -func (b *Binance) WSConnect() error { +// WsConnect intiates a websocket connection +func (b *Binance) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { return errors.New(wshandler.WebsocketNotEnabled) } @@ -87,7 +87,7 @@ func (b *Binance) WsHandleData() { default: read, err := b.WebsocketConn.ReadMessage() if err != nil { - b.Websocket.DataHandler <- err + b.Websocket.ReadMessageErrors <- err return } b.Websocket.TrafficAlert <- struct{}{} @@ -248,7 +248,7 @@ func (b *Binance) SeedLocalCache(p currency.Pair) error { newOrderBook.Pair = p newOrderBook.AssetType = asset.Spot - return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + return b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } // UpdateLocalCache updates and returns the most recent iteration of the orderbook diff --git a/exchanges/binance/binance_wrapper.go b/exchanges/binance/binance_wrapper.go index 707b9858bd3..442dcfb8a8b 100644 --- a/exchanges/binance/binance_wrapper.go +++ b/exchanges/binance/binance_wrapper.go @@ -113,15 +113,18 @@ func (b *Binance) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WSConnect, - nil, - nil, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - binanceDefaultWebsocketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: binanceDefaultWebsocketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + }) + if err != nil { return err } diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 557553e54f3..b9b5f569042 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -133,6 +133,7 @@ func (b *Bitfinex) WsConnect() error { resp, err := b.WebsocketConn.ReadMessage() if err != nil { + b.Websocket.ReadMessageErrors <- err return fmt.Errorf("%v unable to read from Websocket. Error: %s", b.Name, err) } b.Websocket.TrafficAlert <- struct{}{} @@ -177,7 +178,7 @@ func (b *Bitfinex) WsDataHandler() { default: stream, err := b.WebsocketConn.ReadMessage() if err != nil { - b.Websocket.DataHandler <- err + b.Websocket.ReadMessageErrors <- err return } b.Websocket.TrafficAlert <- struct{}{} @@ -481,7 +482,7 @@ func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books newOrderBook.AssetType = assetType newOrderBook.Bids = bid newOrderBook.Pair = p - err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return fmt.Errorf("bitfinex.go error - %s", err) } diff --git a/exchanges/bitfinex/bitfinex_wrapper.go b/exchanges/bitfinex/bitfinex_wrapper.go index 4fbe2455dc9..1a8229edb24 100644 --- a/exchanges/bitfinex/bitfinex_wrapper.go +++ b/exchanges/bitfinex/bitfinex_wrapper.go @@ -114,15 +114,19 @@ func (b *Bitfinex) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WsConnect, - b.Subscribe, - b.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - bitfinexWebsocket, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: bitfinexWebsocket, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + Subscriber: b.Subscribe, + UnSubscriber: b.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/bitmex/bitmex_websocket.go b/exchanges/bitmex/bitmex_websocket.go index 4af5f0f933b..3524550cdc9 100644 --- a/exchanges/bitmex/bitmex_websocket.go +++ b/exchanges/bitmex/bitmex_websocket.go @@ -66,8 +66,8 @@ var ( pongChan = make(chan int, 1) ) -// WsConnector initiates a new websocket connection -func (b *Bitmex) WsConnector() error { +// WsConnect initiates a new websocket connection +func (b *Bitmex) WsConnect() error { if !b.Websocket.IsEnabled() || !b.IsEnabled() { return errors.New(wshandler.WebsocketNotEnabled) } @@ -79,6 +79,7 @@ func (b *Bitmex) WsConnector() error { p, err := b.WebsocketConn.ReadMessage() if err != nil { + b.Websocket.ReadMessageErrors <- err return err } b.Websocket.TrafficAlert <- struct{}{} @@ -360,7 +361,7 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, currencyPai newOrderBook.Bids = bids newOrderBook.AssetType = assetType newOrderBook.Pair = currencyPair - err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + err := b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return fmt.Errorf("bitmex_websocket.go process orderbook error - %s", err) diff --git a/exchanges/bitmex/bitmex_wrapper.go b/exchanges/bitmex/bitmex_wrapper.go index 167be7e4000..68e0a0169da 100644 --- a/exchanges/bitmex/bitmex_wrapper.go +++ b/exchanges/bitmex/bitmex_wrapper.go @@ -137,15 +137,19 @@ func (b *Bitmex) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WsConnector, - b.Subscribe, - b.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - bitmexWSURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: bitmexWSURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + Subscriber: b.Subscribe, + UnSubscriber: b.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/bitstamp/bitstamp_websocket.go b/exchanges/bitstamp/bitstamp_websocket.go index 09d06ac49d0..5260dfd26f3 100644 --- a/exchanges/bitstamp/bitstamp_websocket.go +++ b/exchanges/bitstamp/bitstamp_websocket.go @@ -62,7 +62,7 @@ func (b *Bitstamp) WsHandleData() { default: resp, err := b.WebsocketConn.ReadMessage() if err != nil { - b.Websocket.DataHandler <- err + b.Websocket.ReadMessageErrors <- err return } b.Websocket.TrafficAlert <- struct{}{} @@ -78,7 +78,7 @@ func (b *Bitstamp) WsHandleData() { if b.Verbose { log.Debugf(log.ExchangeSys, "%v - Websocket reconnection request received", b.GetName()) } - go b.Websocket.WebsocketReset() + go b.Websocket.Shutdown() // Connection monitor will reconnect case "data": wsOrderBookTemp := websocketOrderBookResponse{} @@ -248,7 +248,7 @@ func (b *Bitstamp) seedOrderBook() error { newOrderBook.Pair = p[x] newOrderBook.AssetType = asset.Spot - err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + err = b.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return err } diff --git a/exchanges/bitstamp/bitstamp_wrapper.go b/exchanges/bitstamp/bitstamp_wrapper.go index 790ca785a1a..5f8cdc9f5a0 100644 --- a/exchanges/bitstamp/bitstamp_wrapper.go +++ b/exchanges/bitstamp/bitstamp_wrapper.go @@ -110,15 +110,19 @@ func (b *Bitstamp) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WsConnect, - b.Subscribe, - b.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - bitstampWSURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: bitstampWSURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + Subscriber: b.Subscribe, + UnSubscriber: b.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/btse/btse_websocket.go b/exchanges/btse/btse_websocket.go index 1ce85e65ddd..b5476a451a2 100644 --- a/exchanges/btse/btse_websocket.go +++ b/exchanges/btse/btse_websocket.go @@ -54,7 +54,7 @@ func (b *BTSE) WsHandleData() { default: resp, err := b.WebsocketConn.ReadMessage() if err != nil { - b.Websocket.DataHandler <- err + b.Websocket.ReadMessageErrors <- err return } b.Websocket.TrafficAlert <- struct{}{} @@ -162,7 +162,7 @@ func (b *BTSE) wsProcessSnapshot(snapshot *websocketOrderbookSnapshot) error { base.LastUpdated = time.Now() base.ExchangeName = b.Name - err := b.Websocket.Orderbook.LoadSnapshot(&base, true) + err := b.Websocket.Orderbook.LoadSnapshot(&base) if err != nil { return err } diff --git a/exchanges/btse/btse_wrapper.go b/exchanges/btse/btse_wrapper.go index 32a102beb38..3cec42186c0 100644 --- a/exchanges/btse/btse_wrapper.go +++ b/exchanges/btse/btse_wrapper.go @@ -109,15 +109,19 @@ func (b *BTSE) Setup(exch *config.ExchangeConfig) error { return err } - err = b.Websocket.Setup(b.WsConnect, - b.Subscribe, - b.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - btseWebsocket, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = b.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: btseWebsocket, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: b.WsConnect, + Subscriber: b.Subscribe, + UnSubscriber: b.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 74c3b9d0b1c..ff34058e2b4 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -54,7 +54,7 @@ func (c *CoinbasePro) WsHandleData() { default: resp, err := c.WebsocketConn.ReadMessage() if err != nil { - c.Websocket.DataHandler <- err + c.Websocket.ReadMessageErrors <- err return } c.Websocket.TrafficAlert <- struct{}{} @@ -217,7 +217,7 @@ func (c *CoinbasePro) ProcessSnapshot(snapshot *WebsocketOrderbookSnapshot) erro base.AssetType = asset.Spot base.Pair = pair - err := c.Websocket.Orderbook.LoadSnapshot(&base, false) + err := c.Websocket.Orderbook.LoadSnapshot(&base) if err != nil { return err } diff --git a/exchanges/coinbasepro/coinbasepro_wrapper.go b/exchanges/coinbasepro/coinbasepro_wrapper.go index 3c20797263f..3079db1d1ab 100644 --- a/exchanges/coinbasepro/coinbasepro_wrapper.go +++ b/exchanges/coinbasepro/coinbasepro_wrapper.go @@ -115,15 +115,19 @@ func (c *CoinbasePro) Setup(exch *config.ExchangeConfig) error { return err } - err = c.Websocket.Setup(c.WsConnect, - c.Subscribe, - c.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - coinbaseproWebsocketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = c.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: coinbaseproWebsocketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: c.WsConnect, + Subscriber: c.Subscribe, + UnSubscriber: c.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/coinut/coinut_websocket.go b/exchanges/coinut/coinut_websocket.go index bae03986043..d12e03aa9d5 100644 --- a/exchanges/coinut/coinut_websocket.go +++ b/exchanges/coinut/coinut_websocket.go @@ -77,7 +77,7 @@ func (c *COINUT) WsHandleData() { default: resp, err := c.WebsocketConn.ReadMessage() if err != nil { - c.Websocket.DataHandler <- err + c.Websocket.ReadMessageErrors <- err return } c.Websocket.TrafficAlert <- struct{}{} @@ -289,7 +289,7 @@ func (c *COINUT) WsProcessOrderbookSnapshot(ob *WsOrderbookSnapshot) error { ) newOrderBook.AssetType = asset.Spot - return c.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + return c.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } // WsProcessOrderbookUpdate process an orderbook update diff --git a/exchanges/coinut/coinut_wrapper.go b/exchanges/coinut/coinut_wrapper.go index 78e601426a6..08cc097cccc 100644 --- a/exchanges/coinut/coinut_wrapper.go +++ b/exchanges/coinut/coinut_wrapper.go @@ -116,15 +116,19 @@ func (c *COINUT) Setup(exch *config.ExchangeConfig) error { return err } - err = c.Websocket.Setup(c.WsConnect, - c.Subscribe, - c.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - coinutWebsocketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = c.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: coinutWebsocketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: c.WsConnect, + Subscriber: c.Subscribe, + UnSubscriber: c.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 59f3a9f878a..f218aa4accd 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -449,7 +449,7 @@ func (e *Base) SetupDefaults(exch *config.ExchangeConfig) error { } if e.Features.Supports.Websocket { - e.Websocket.SetWsStatusAndConnection(exch.Features.Enabled.Websocket) + e.Websocket.Initialise() } return nil } diff --git a/exchanges/exchange_test.go b/exchanges/exchange_test.go index 96074091c3a..ca3060dafb9 100644 --- a/exchanges/exchange_test.go +++ b/exchanges/exchange_test.go @@ -1233,7 +1233,10 @@ func TestIsWebsocketEnabled(t *testing.T) { } b.Websocket = wshandler.New() - b.Websocket.Setup(nil, nil, nil, "", true, false, "", "", false) + err := b.Websocket.Setup(&wshandler.WebsocketSetup{Enabled: true}) + if err != nil { + t.Error(err) + } if !b.IsWebsocketEnabled() { t.Error("websocket should be enabled") } diff --git a/exchanges/gateio/gateio_types.go b/exchanges/gateio/gateio_types.go index 585bf48f9cb..8336d8e58de 100644 --- a/exchanges/gateio/gateio_types.go +++ b/exchanges/gateio/gateio_types.go @@ -462,7 +462,7 @@ type WebSocketOrderQueryRecords struct { // WebsocketAuthenticationResponse contains the result of a login request type WebsocketAuthenticationResponse struct { - Error string `json:"error"` + Error string `json:"error,omitempty"` Result struct { Status string `json:"status"` } `json:"result"` diff --git a/exchanges/gateio/gateio_websocket.go b/exchanges/gateio/gateio_websocket.go index 60853d4a8de..8bcb8b76fad 100644 --- a/exchanges/gateio/gateio_websocket.go +++ b/exchanges/gateio/gateio_websocket.go @@ -92,7 +92,7 @@ func (g *Gateio) WsHandleData() { default: resp, err := g.WebsocketConn.ReadMessage() if err != nil { - g.Websocket.DataHandler <- err + g.Websocket.ReadMessageErrors <- err return } g.Websocket.TrafficAlert <- struct{}{} @@ -238,8 +238,7 @@ func (g *Gateio) WsHandleData() { newOrderBook.AssetType = asset.Spot newOrderBook.Pair = currency.NewPairFromString(c) - err = g.Websocket.Orderbook.LoadSnapshot(&newOrderBook, - true) + err = g.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { g.Websocket.DataHandler <- err } diff --git a/exchanges/gateio/gateio_wrapper.go b/exchanges/gateio/gateio_wrapper.go index 7bd1db10992..d3d602a0513 100644 --- a/exchanges/gateio/gateio_wrapper.go +++ b/exchanges/gateio/gateio_wrapper.go @@ -117,15 +117,19 @@ func (g *Gateio) Setup(exch *config.ExchangeConfig) error { return err } - err = g.Websocket.Setup(g.WsConnect, - g.Subscribe, - g.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - gateioWebsocketEndpoint, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = g.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: gateioWebsocketEndpoint, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: g.WsConnect, + Subscriber: g.Subscribe, + UnSubscriber: g.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/gemini/gemini_websocket.go b/exchanges/gemini/gemini_websocket.go index fc64e05003e..9d47d9c2351 100644 --- a/exchanges/gemini/gemini_websocket.go +++ b/exchanges/gemini/gemini_websocket.go @@ -282,8 +282,7 @@ func (g *Gemini) wsProcessUpdate(result WsMarketUpdateResponse, pair currency.Pa newOrderBook.Bids = bids newOrderBook.AssetType = asset.Spot newOrderBook.Pair = pair - err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook, - false) + err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { g.Websocket.DataHandler <- err return diff --git a/exchanges/gemini/gemini_wrapper.go b/exchanges/gemini/gemini_wrapper.go index 0e4d76f9251..dd2338a9e78 100644 --- a/exchanges/gemini/gemini_wrapper.go +++ b/exchanges/gemini/gemini_wrapper.go @@ -116,15 +116,17 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) error { g.API.Endpoints.URL = geminiSandboxAPIURL } - err = g.Websocket.Setup(g.WsConnect, - nil, - nil, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - geminiWebsocketEndpoint, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = g.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: geminiWebsocketEndpoint, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: g.WsConnect, + }) if err != nil { return err } diff --git a/exchanges/hitbtc/hitbtc_websocket.go b/exchanges/hitbtc/hitbtc_websocket.go index 4e058a0512b..5cfda89ab94 100644 --- a/exchanges/hitbtc/hitbtc_websocket.go +++ b/exchanges/hitbtc/hitbtc_websocket.go @@ -65,7 +65,7 @@ func (h *HitBTC) WsHandleData() { default: resp, err := h.WebsocketConn.ReadMessage() if err != nil { - h.Websocket.DataHandler <- err + h.Websocket.ReadMessageErrors <- err return } h.Websocket.TrafficAlert <- struct{}{} @@ -251,7 +251,7 @@ func (h *HitBTC) WsProcessOrderbookSnapshot(ob WsOrderbook) error { newOrderBook.AssetType = asset.Spot newOrderBook.Pair = p - err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return err } diff --git a/exchanges/hitbtc/hitbtc_wrapper.go b/exchanges/hitbtc/hitbtc_wrapper.go index a7cc2534f83..f1fcaa908aa 100644 --- a/exchanges/hitbtc/hitbtc_wrapper.go +++ b/exchanges/hitbtc/hitbtc_wrapper.go @@ -115,15 +115,19 @@ func (h *HitBTC) Setup(exch *config.ExchangeConfig) error { return err } - err = h.Websocket.Setup(h.WsConnect, - h.Subscribe, - h.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - hitbtcWebsocketAddress, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = h.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: hitbtcWebsocketAddress, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: h.WsConnect, + Subscriber: h.Subscribe, + UnSubscriber: h.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/huobi/huobi_websocket.go b/exchanges/huobi/huobi_websocket.go index ce063bd570c..fb5897ec18e 100644 --- a/exchanges/huobi/huobi_websocket.go +++ b/exchanges/huobi/huobi_websocket.go @@ -313,7 +313,7 @@ func (h *HUOBI) WsProcessOrderbook(update *WsDepth, symbol string) error { newOrderBook.Asks = asks newOrderBook.Bids = bids newOrderBook.Pair = p - err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook, true) + err := h.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return err } diff --git a/exchanges/huobi/huobi_wrapper.go b/exchanges/huobi/huobi_wrapper.go index f2678e9e621..8aa6ccce379 100644 --- a/exchanges/huobi/huobi_wrapper.go +++ b/exchanges/huobi/huobi_wrapper.go @@ -119,15 +119,19 @@ func (h *HUOBI) Setup(exch *config.ExchangeConfig) error { h.API.PEMKeySupport = exch.API.PEMKeySupport h.API.Credentials.PEMKey = exch.API.Credentials.PEMKey - err = h.Websocket.Setup(h.WsConnect, - h.Subscribe, - h.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - wsMarketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = h.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: wsMarketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: h.WsConnect, + Subscriber: h.Subscribe, + UnSubscriber: h.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index 7d8559662e1..025d5aea6c2 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -110,9 +110,7 @@ func (k *Kraken) WsHandleData() { default: resp, err := k.WebsocketConn.ReadMessage() if err != nil { - k.Websocket.DataHandler <- fmt.Errorf("%v WsHandleData: %v", - k.Name, - err) + k.Websocket.ReadMessageErrors <- err return } k.Websocket.TrafficAlert <- struct{}{} @@ -384,7 +382,7 @@ func (k *Kraken) wsProcessOrderBookPartial(channelData *WebsocketChannelData, ob } } base.LastUpdated = highestLastUpdate - err := k.Websocket.Orderbook.LoadSnapshot(&base, true) + err := k.Websocket.Orderbook.LoadSnapshot(&base) if err != nil { k.Websocket.DataHandler <- err return @@ -509,7 +507,7 @@ func (k *Kraken) Subscribe(channelToSubscribe wshandler.WebsocketChannelSubscrip Subscription: WebsocketSubscriptionData{ Name: channelToSubscribe.Channel, }, - RequestID: k.WebsocketConn.GenerateMessageID(true), + RequestID: k.WebsocketConn.GenerateMessageID(false), } _, err := k.WebsocketConn.SendMessageReturnResponse(resp.RequestID, resp) return err @@ -523,7 +521,7 @@ func (k *Kraken) Unsubscribe(channelToSubscribe wshandler.WebsocketChannelSubscr Subscription: WebsocketSubscriptionData{ Name: channelToSubscribe.Channel, }, - RequestID: k.WebsocketConn.GenerateMessageID(true), + RequestID: k.WebsocketConn.GenerateMessageID(false), } _, err := k.WebsocketConn.SendMessageReturnResponse(resp.RequestID, resp) return err diff --git a/exchanges/kraken/kraken_wrapper.go b/exchanges/kraken/kraken_wrapper.go index 5d4912b6a6d..3763a672241 100644 --- a/exchanges/kraken/kraken_wrapper.go +++ b/exchanges/kraken/kraken_wrapper.go @@ -119,15 +119,19 @@ func (k *Kraken) Setup(exch *config.ExchangeConfig) error { return err } - err = k.Websocket.Setup(k.WsConnect, - k.Subscribe, - k.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - krakenWSURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = k.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: krakenWSURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: k.WsConnect, + Subscriber: k.Subscribe, + UnSubscriber: k.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/lakebtc/lakebtc_websocket.go b/exchanges/lakebtc/lakebtc_websocket.go index 690e2f410c8..3b1abae5b56 100644 --- a/exchanges/lakebtc/lakebtc_websocket.go +++ b/exchanges/lakebtc/lakebtc_websocket.go @@ -205,7 +205,7 @@ func (l *LakeBTC) processOrderbook(obUpdate, channel string) error { Price: price, }) } - return l.Websocket.Orderbook.LoadSnapshot(&book, true) + return l.Websocket.Orderbook.LoadSnapshot(&book) } func (l *LakeBTC) getCurrencyFromChannel(channel string) currency.Pair { diff --git a/exchanges/lakebtc/lakebtc_wrapper.go b/exchanges/lakebtc/lakebtc_wrapper.go index f865d0e6101..c7c9be6833e 100644 --- a/exchanges/lakebtc/lakebtc_wrapper.go +++ b/exchanges/lakebtc/lakebtc_wrapper.go @@ -110,15 +110,18 @@ func (l *LakeBTC) Setup(exch *config.ExchangeConfig) error { return err } - err = l.Websocket.Setup(l.WsConnect, - l.Subscribe, - nil, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - lakeBTCWSURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = l.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: lakeBTCWSURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: l.WsConnect, + Subscriber: l.Subscribe, + }) if err != nil { return err } diff --git a/exchanges/okgroup/okgroup_websocket.go b/exchanges/okgroup/okgroup_websocket.go index 558252d6263..fe10fee83a6 100644 --- a/exchanges/okgroup/okgroup_websocket.go +++ b/exchanges/okgroup/okgroup_websocket.go @@ -193,6 +193,9 @@ func (o *OKGroup) wsPingHandler(wg *sync.WaitGroup) { return case <-ticker.C: + if !o.Websocket.IsConnected() { + continue + } err := o.WebsocketConn.Connection.WriteMessage(websocket.TextMessage, []byte("ping")) if o.Verbose { log.Debugf(log.ExchangeSys, "%v sending ping", o.GetName()) @@ -221,7 +224,7 @@ func (o *OKGroup) WsHandleData(wg *sync.WaitGroup) { default: resp, err := o.WebsocketConn.ReadMessage() if err != nil { - o.Websocket.DataHandler <- err + o.Websocket.ReadMessageErrors <- err return } o.Websocket.TrafficAlert <- struct{}{} @@ -475,7 +478,7 @@ func (o *OKGroup) WsProcessPartialOrderBook(wsEventData *WebsocketDataWrapper, i ExchangeName: o.GetName(), } - err := o.Websocket.Orderbook.LoadSnapshot(&newOrderBook, true) + err := o.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { return err } diff --git a/exchanges/okgroup/okgroup_wrapper.go b/exchanges/okgroup/okgroup_wrapper.go index 863af600484..a0ae6828db1 100644 --- a/exchanges/okgroup/okgroup_wrapper.go +++ b/exchanges/okgroup/okgroup_wrapper.go @@ -31,15 +31,18 @@ func (o *OKGroup) Setup(exch *config.ExchangeConfig) error { return err } - err = o.Websocket.Setup(o.WsConnect, - o.Subscribe, - o.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - o.API.Endpoints.WebsocketURL, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = o.Websocket.Setup(&wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: o.API.Endpoints.WebsocketURL, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: o.WsConnect, + Subscriber: o.Subscribe, + UnSubscriber: o.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index a6ef2ed19a4..5096241742c 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -88,7 +88,7 @@ func (p *Poloniex) WsHandleData() { default: resp, err := p.WebsocketConn.ReadMessage() if err != nil { - p.Websocket.DataHandler <- err + p.Websocket.ReadMessageErrors <- err return } p.Websocket.TrafficAlert <- struct{}{} @@ -330,7 +330,7 @@ func (p *Poloniex) WsProcessOrderbookSnapshot(ob []interface{}, symbol string) e newOrderBook.AssetType = asset.Spot newOrderBook.Pair = currency.NewPairFromString(symbol) - return p.Websocket.Orderbook.LoadSnapshot(&newOrderBook, false) + return p.Websocket.Orderbook.LoadSnapshot(&newOrderBook) } // WsProcessOrderbookUpdate processes new orderbook updates diff --git a/exchanges/poloniex/poloniex_wrapper.go b/exchanges/poloniex/poloniex_wrapper.go index 13611997e01..2febf6873bb 100644 --- a/exchanges/poloniex/poloniex_wrapper.go +++ b/exchanges/poloniex/poloniex_wrapper.go @@ -113,15 +113,19 @@ func (p *Poloniex) Setup(exch *config.ExchangeConfig) error { return err } - err = p.Websocket.Setup(p.WsConnect, - p.Subscribe, - p.Unsubscribe, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - poloniexWebsocketAddress, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = p.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: poloniexWebsocketAddress, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: p.WsConnect, + Subscriber: p.Subscribe, + UnSubscriber: p.Unsubscribe, + }) if err != nil { return err } diff --git a/exchanges/websocket/wshandler/wshandler.go b/exchanges/websocket/wshandler/wshandler.go index 931ee5c8055..1bc9502de3b 100644 --- a/exchanges/websocket/wshandler/wshandler.go +++ b/exchanges/websocket/wshandler/wshandler.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io/ioutil" + "net" "net/http" "net/url" "strings" @@ -31,42 +32,28 @@ func New() *Websocket { } // Setup sets main variables for websocket connection -func (w *Websocket) Setup(connector func() error, - subscriber func(channelToSubscribe WebsocketChannelSubscription) error, - unsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error, - exchangeName string, - wsEnabled, - verbose bool, - defaultURL, - runningURL string, - authenticatedWebsocketAPISupport bool) error { - +func (w *Websocket) Setup(setupData *WebsocketSetup) error { w.DataHandler = make(chan interface{}, 1) - w.Connected = make(chan struct{}, 1) - w.Disconnected = make(chan struct{}, 1) w.TrafficAlert = make(chan struct{}, 1) - w.verbose = verbose - - w.SetChannelSubscriber(subscriber) - w.SetChannelUnsubscriber(unsubscriber) - err := w.SetWsStatusAndConnection(wsEnabled) + w.verbose = setupData.Verbose + w.SetChannelSubscriber(setupData.Subscriber) + w.SetChannelUnsubscriber(setupData.UnSubscriber) + w.enabled = setupData.Enabled + w.SetDefaultURL(setupData.DefaultURL) + w.SetConnector(setupData.Connector) + w.SetWebsocketURL(setupData.RunningURL) + w.SetExchangeName(setupData.ExchangeName) + w.SetCanUseAuthenticatedEndpoints(setupData.AuthenticatedWebsocketAPISupport) + w.trafficTimeout = setupData.WebsocketTimeout + err := w.Initialise() if err != nil { return err } - w.SetDefaultURL(defaultURL) - w.SetConnector(connector) - w.SetWebsocketURL(runningURL) - w.SetExchangeName(exchangeName) - w.SetCanUseAuthenticatedEndpoints(authenticatedWebsocketAPISupport) - - w.init = false - w.noConnectionCheckLimit = 5 - w.reconnectionLimit = 10 return nil } -// Connect intiates a websocket connection by using a package defined connection +// Connect initiates a websocket connection by using a package defined connection // function func (w *Websocket) Connect() error { w.m.Lock() @@ -75,32 +62,33 @@ func (w *Websocket) Connect() error { if !w.IsEnabled() { return errors.New(WebsocketNotEnabled) } - - if w.connected { - w.connecting = false - return errors.New("exchange_websocket.go error - already connected, cannot connect again") + if w.IsConnecting() { + return fmt.Errorf("%v Websocket already attempting to connect", + w.exchangeName) } - - w.connecting = true + if w.IsConnected() { + return fmt.Errorf("%v Websocket already connected", + w.exchangeName) + } + w.setConnectingStatus(true) w.ShutdownC = make(chan struct{}, 1) + w.ReadMessageErrors = make(chan error, 1) err := w.connector() if err != nil { - w.connecting = false - return fmt.Errorf("exchange_websocket.go connection error %s", - err) + w.setConnectingStatus(false) + return fmt.Errorf("%v Error connecting %s", + w.exchangeName, err) } - if !w.connected { - w.Connected <- struct{}{} - w.connected = true - w.connecting = false - } + w.setConnectedStatus(true) + w.setConnectingStatus(false) + w.setInit(true) var anotherWG sync.WaitGroup anotherWG.Add(1) go w.trafficMonitor(&anotherWG) anotherWG.Wait() - if !w.connectionMonitorRunning { + if !w.IsConnectionMonitorRunning() { go w.connectionMonitor() } if w.SupportsFunctionality(WebsocketSubscribeSupported) || w.SupportsFunctionality(WebsocketUnsubscribeSupported) { @@ -112,88 +100,82 @@ func (w *Websocket) Connect() error { // connectionMonitor ensures that the WS keeps connecting func (w *Websocket) connectionMonitor() { - w.m.Lock() - w.connectionMonitorRunning = true - w.m.Unlock() + if w.IsConnectionMonitorRunning() { + return + } + w.setConnectionMonitorRunning(true) + timer := time.NewTimer(connectionMonitorDelay) + defer func() { - w.connectionMonitorRunning = false + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + w.setConnectionMonitorRunning(false) + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v websocket connection monitor exiting", + w.exchangeName) + } }() for { - time.Sleep(connectionMonitorDelay) - w.m.Lock() - if !w.enabled { - w.m.Unlock() - w.DataHandler <- fmt.Errorf("%v connectionMonitor: websocket disabled, shutting down", w.exchangeName) - err := w.Shutdown() - if err != nil { - log.Error(log.WebsocketMgr, err) + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v running connection monitor cycle", + w.exchangeName) + } + if !w.IsEnabled() { + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v connectionMonitor: websocket disabled, shutting down", w.exchangeName) + } + if w.IsConnected() { + err := w.Shutdown() + if err != nil { + log.Error(log.WebsocketMgr, err) + } } if w.verbose { - log.Debugf(log.WebsocketMgr, "%v connectionMonitor exiting", + log.Debugf(log.WebsocketMgr, "%v websocket connection monitor exiting", w.exchangeName) } return } - w.m.Unlock() - err := w.checkConnection() - if err != nil { - log.Error(log.WebsocketMgr, err) - } - } -} - -// checkConnection ensures the connection is maintained -// Will reconnect on disconnect -func (w *Websocket) checkConnection() error { - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v checking connection", w.exchangeName) - } - switch { - case !w.IsConnected() && !w.IsConnecting(): - w.m.Lock() - defer w.m.Unlock() - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v no connection. Attempt %v/%v", w.exchangeName, w.noConnectionChecks, w.noConnectionCheckLimit) - } - if w.noConnectionChecks >= w.noConnectionCheckLimit { - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v resetting connection", w.exchangeName) + select { + case err := <-w.ReadMessageErrors: + // check if this error is a disconnection error + if isDisconnectionError(err) { + w.setConnectedStatus(false) + w.setConnectingStatus(false) + w.setInit(false) + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v", + w.exchangeName, err) + } + err = w.Connect() + if err != nil { + log.Error(log.WebsocketMgr, err) + } + } else { + // pass off non disconnect errors to datahandler to manage + w.DataHandler <- err } - w.connecting = true - go w.WebsocketReset() - w.noConnectionChecks = 0 - } - w.noConnectionChecks++ - case w.IsConnecting(): - if w.reconnectionChecks >= w.reconnectionLimit { - return fmt.Errorf("%v websocket failed to reconnect after %v seconds", - w.exchangeName, - w.reconnectionLimit*int(connectionMonitorDelay.Seconds())) - } - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v Busy reconnecting", w.exchangeName) + case <-timer.C: + if !w.IsConnecting() && !w.IsConnected() { + err := w.Connect() + if err != nil { + log.Error(log.WebsocketMgr, err) + } + } + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(connectionMonitorDelay) } - w.reconnectionChecks++ - default: - w.noConnectionChecks = 0 - w.reconnectionChecks = 0 } - return nil -} - -// IsConnected exposes websocket connection status -func (w *Websocket) IsConnected() bool { - w.m.Lock() - defer w.m.Unlock() - return w.connected -} - -// IsConnecting checks whether websocket is busy connecting -func (w *Websocket) IsConnecting() bool { - w.m.Lock() - defer w.m.Unlock() - return w.connecting } // Shutdown attempts to shut down a websocket connection and associated routines @@ -204,124 +186,145 @@ func (w *Websocket) Shutdown() error { w.Orderbook.FlushCache() w.m.Unlock() }() - if !w.connected && w.ShutdownC == nil { + if !w.IsConnected() { return fmt.Errorf("%v cannot shutdown a disconnected websocket", w.exchangeName) } if w.verbose { log.Debugf(log.WebsocketMgr, "%v shutting down websocket channels", w.exchangeName) } - timer := time.NewTimer(15 * time.Second) - c := make(chan struct{}, 1) - - go func(c chan struct{}) { - close(w.ShutdownC) - w.Wg.Wait() - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v completed websocket channel shutdown", w.exchangeName) - } - c <- struct{}{} - }(c) - - select { - case <-c: - w.connected = false - return nil - case <-timer.C: - return fmt.Errorf("%s websocket routines failed to shutdown after 15 seconds", - w.GetName()) - } -} - -// WebsocketReset sends the shutdown command, waits for channel/func closure and then reconnects -func (w *Websocket) WebsocketReset() { - err := w.Shutdown() - if err != nil { - // does not return here to allow connection to be made if already shut down - w.DataHandler <- fmt.Errorf("%v shutdown error: %v", w.exchangeName, err) - } - log.Infof(log.WebsocketMgr, "%v reconnecting to websocket", w.exchangeName) - w.m.Lock() - w.init = true - w.m.Unlock() - err = w.Connect() - if err != nil { - w.DataHandler <- fmt.Errorf("%v connection error: %v", w.exchangeName, err) + close(w.ShutdownC) + w.Wg.Wait() + w.setConnectedStatus(false) + w.setConnectingStatus(false) + if w.verbose { + log.Debugf(log.WebsocketMgr, "%v completed websocket channel shutdown", w.exchangeName) } + return nil } -// trafficMonitor monitors traffic and switches connection modes for websocket +// trafficMonitor uses a timer of WebsocketTrafficLimitTime and once it expires +// Will reconnect if the TrafficAlert channel has not received any data +// The trafficTimer will reset on each traffic alert func (w *Websocket) trafficMonitor(wg *sync.WaitGroup) { w.Wg.Add(1) - wg.Done() // Makes sure we are unlocking after we add to waitgroup + wg.Done() + trafficTimer := time.NewTimer(w.trafficTimeout) defer func() { - if w.connected { - w.Disconnected <- struct{}{} + if !trafficTimer.Stop() { + select { + case <-trafficTimer.C: + default: + } } + w.setTrafficMonitorRunning(false) w.Wg.Done() }() - - // Define an initial traffic timer which will be a delay then fall over to - // WebsocketTrafficLimitTime after first response - trafficTimer := time.NewTimer(5 * time.Second) + if w.IsTrafficMonitorRunning() { + return + } + w.setTrafficMonitorRunning(true) for { select { - case <-w.ShutdownC: // Returns on shutdown channel close + case <-w.ShutdownC: if w.verbose { log.Debugf(log.WebsocketMgr, "%v trafficMonitor shutdown message received", w.exchangeName) } return - case <-w.TrafficAlert: // Resets timer on traffic - w.m.Lock() - if !w.connected { - w.Connected <- struct{}{} - w.connected = true + case <-w.TrafficAlert: + if !trafficTimer.Stop() { + select { + case <-trafficTimer.C: + default: + } } - w.m.Unlock() - trafficTimer.Reset(WebsocketTrafficLimitTime) + trafficTimer.Reset(w.trafficTimeout) case <-trafficTimer.C: // Falls through when timer runs out - newtimer := time.NewTimer(10 * time.Second) // New secondary timer set if w.verbose { - log.Debugf(log.WebsocketMgr, "%v has not received a traffic alert in 5 seconds.", w.exchangeName) - } - w.m.Lock() - if w.connected { - // If connected divert traffic to rest - w.Disconnected <- struct{}{} - w.connected = false - } - w.m.Unlock() - - select { - case <-w.ShutdownC: // Returns on shutdown channel close - w.m.Lock() - w.connected = false - w.m.Unlock() - return - - case <-newtimer.C: // If secondary timer runs state timeout is sent to the data handler - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v has not received a traffic alert in 15 seconds, exiting", w.exchangeName) - } - w.DataHandler <- fmt.Errorf("trafficMonitor %v", WebsocketStateTimeout) - return - - case <-w.TrafficAlert: // If in this time response traffic comes through - trafficTimer.Reset(WebsocketTrafficLimitTime) - w.m.Lock() - if !w.connected { - // If not connected dive rt traffic from REST to websocket - w.Connected <- struct{}{} - if w.verbose { - log.Debugf(log.WebsocketMgr, "%v has received a traffic alert. Setting status to connected", w.exchangeName) - } - w.connected = true - } - w.m.Unlock() + log.Warnf(log.WebsocketMgr, "%v has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout) } + go w.Shutdown() } } } +func (w *Websocket) setConnectedStatus(b bool) { + w.connectionMutex.Lock() + w.connected = b + w.connectionMutex.Unlock() +} + +// IsConnected returns status of connection +func (w *Websocket) IsConnected() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.connected +} + +func (w *Websocket) setConnectingStatus(b bool) { + w.connectionMutex.Lock() + w.connecting = b + w.connectionMutex.Unlock() +} + +// IsConnecting returns status of connecting +func (w *Websocket) IsConnecting() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.connecting +} + +func (w *Websocket) setEnabled(b bool) { + w.connectionMutex.Lock() + w.enabled = b + w.connectionMutex.Unlock() +} + +// IsEnabled returns status of enabled +func (w *Websocket) IsEnabled() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.enabled +} + +func (w *Websocket) setInit(b bool) { + w.connectionMutex.Lock() + w.init = b + w.connectionMutex.Unlock() +} + +// IsInit returns status of init +func (w *Websocket) IsInit() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.init +} + +func (w *Websocket) setTrafficMonitorRunning(b bool) { + w.connectionMutex.Lock() + w.trafficMonitorRunning = b + w.connectionMutex.Unlock() +} + +// IsTrafficMonitorRunning returns status of the traffic monitor +func (w *Websocket) IsTrafficMonitorRunning() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.trafficMonitorRunning +} + +func (w *Websocket) setConnectionMonitorRunning(b bool) { + w.connectionMutex.Lock() + w.connectionMonitorRunning = b + w.connectionMutex.Unlock() +} + +// IsConnectionMonitorRunning returns status of connection monitor +func (w *Websocket) IsConnectionMonitorRunning() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.connectionMonitorRunning +} + // SetWebsocketURL sets websocket URL func (w *Websocket) SetWebsocketURL(websocketURL string) { if websocketURL == "" || websocketURL == config.WebsocketURLNonDefaultMessage { @@ -336,55 +339,28 @@ func (w *Websocket) GetWebsocketURL() string { return w.runningURL } -// SetWsStatusAndConnection sets if websocket is enabled -// it will also connect/disconnect the websocket connection -func (w *Websocket) SetWsStatusAndConnection(enabled bool) error { - w.m.Lock() - if w.enabled == enabled { - if w.init { - w.m.Unlock() +// Initialise verifies status and connects +func (w *Websocket) Initialise() error { + if w.IsEnabled() { + if w.IsInit() { return nil } - w.m.Unlock() - return fmt.Errorf("exchange_websocket.go error - already set as %t", - enabled) - } - w.enabled = enabled - if !w.init { - if enabled { - if w.connected { - w.m.Unlock() - return nil - } - w.m.Unlock() - return w.Connect() - } - - if !w.connected { - w.m.Unlock() - return nil - } - w.m.Unlock() - return w.Shutdown() + return fmt.Errorf("%v Websocket already initialised", + w.exchangeName) } - w.m.Unlock() + w.setEnabled(w.enabled) return nil } -// IsEnabled returns bool -func (w *Websocket) IsEnabled() bool { - return w.enabled -} - // SetProxyAddress sets websocket proxy address func (w *Websocket) SetProxyAddress(proxyAddr string) error { if w.proxyAddr == proxyAddr { - return errors.New("exchange_websocket.go error - Setting proxy address - same address") + return fmt.Errorf("%v Cannot set proxy address to the same address '%v'", w.exchangeName, w.proxyAddr) } w.proxyAddr = proxyAddr - if !w.init && w.enabled { - if w.connected { + if !w.IsInit() && w.IsEnabled() { + if w.IsConnected() { err := w.Shutdown() if err != nil { return err @@ -532,19 +508,28 @@ func (w *Websocket) manageSubscriptions() { for { select { case <-w.ShutdownC: + w.subscriptionMutex.Lock() w.subscribedChannels = []WebsocketChannelSubscription{} + w.subscriptionMutex.Unlock() if w.verbose { log.Debugf(log.WebsocketMgr, "%v shutdown manageSubscriptions", w.exchangeName) } return default: time.Sleep(manageSubscriptionsDelay) + if !w.IsConnected() { + w.subscriptionMutex.Lock() + w.subscribedChannels = []WebsocketChannelSubscription{} + w.subscriptionMutex.Unlock() + + continue + } if w.verbose { log.Debugf(log.WebsocketMgr, "%v checking subscriptions", w.exchangeName) } // Subscribe to channels Pending a subscription if w.SupportsFunctionality(WebsocketSubscribeSupported) { - err := w.subscribeToChannels() + err := w.appendSubscribedChannels() if err != nil { w.DataHandler <- err } @@ -559,11 +544,11 @@ func (w *Websocket) manageSubscriptions() { } } -// subscribeToChannels compares channelsToSubscribe to subscribedChannels +// appendSubscribedChannels compares channelsToSubscribe to subscribedChannels // and subscribes to any channels not present in subscribedChannels -func (w *Websocket) subscribeToChannels() error { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() +func (w *Websocket) appendSubscribedChannels() error { + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() for i := 0; i < len(w.channelsToSubscribe); i++ { channelIsSubscribed := false for j := 0; j < len(w.subscribedChannels); j++ { @@ -589,8 +574,8 @@ func (w *Websocket) subscribeToChannels() error { // unsubscribeToChannels compares subscribedChannels to channelsToSubscribe // and unsubscribes to any channels not present in channelsToSubscribe func (w *Websocket) unsubscribeToChannels() error { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() for i := 0; i < len(w.subscribedChannels); i++ { subscriptionFound := false for j := 0; j < len(w.channelsToSubscribe); j++ { @@ -622,8 +607,8 @@ func (w *Websocket) RemoveSubscribedChannels(channels []WebsocketChannelSubscrip // removeChannelToSubscribe removes an entry from w.channelsToSubscribe // so an unsubscribe event can be triggered func (w *Websocket) removeChannelToSubscribe(subscribedChannel WebsocketChannelSubscription) { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() channelLength := len(w.channelsToSubscribe) i := 0 for j := 0; j < len(w.channelsToSubscribe); j++ { @@ -644,8 +629,8 @@ func (w *Websocket) removeChannelToSubscribe(subscribedChannel WebsocketChannelS // ResubscribeToChannel calls unsubscribe func and // removes it from subscribedChannels to trigger a subscribe event func (w *Websocket) ResubscribeToChannel(subscribedChannel WebsocketChannelSubscription) { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() err := w.channelUnsubscriber(subscribedChannel) if err != nil { w.DataHandler <- err @@ -675,7 +660,6 @@ func (w *Websocket) SubscribeToChannels(channels []WebsocketChannelSubscription) w.channelsToSubscribe = append(w.channelsToSubscribe, channels[i]) } } - w.noConnectionChecks = 0 } // Equal two WebsocketChannelSubscription to determine equality @@ -693,16 +677,16 @@ func (w *Websocket) GetSubscriptions() []WebsocketChannelSubscription { // SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in // a thread safe manner func (w *Websocket) SetCanUseAuthenticatedEndpoints(val bool) { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() w.canUseAuthenticatedEndpoints = val } // CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in // a thread safe manner func (w *Websocket) CanUseAuthenticatedEndpoints() bool { - w.subscriptionLock.Lock() - defer w.subscriptionLock.Unlock() + w.subscriptionMutex.Lock() + defer w.subscriptionMutex.Unlock() return w.canUseAuthenticatedEndpoints } @@ -735,6 +719,10 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header } return fmt.Errorf("%v Error: %v", w.URL, err) } + if w.Verbose { + log.Infof(log.WebsocketMgr, "%v Websocket connected", w.ExchangeName) + } + w.setConnectedStatus(true) return nil } @@ -742,6 +730,9 @@ func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header func (w *WebsocketConnection) SendMessage(data interface{}) error { w.Lock() defer w.Unlock() + if !w.IsConnected() { + return fmt.Errorf("%v cannot send message to a disconnected websocket", w.ExchangeName) + } json, err := common.JSONEncode(data) if err != nil { return err @@ -801,10 +792,26 @@ func (w *WebsocketConnection) WaitForResult(id int64, wg *sync.WaitGroup) { } } +func (w *WebsocketConnection) setConnectedStatus(b bool) { + w.connectionMutex.Lock() + w.connected = b + w.connectionMutex.Unlock() +} + +// IsConnected exposes websocket connection status +func (w *WebsocketConnection) IsConnected() bool { + w.connectionMutex.RLock() + defer w.connectionMutex.RUnlock() + return w.connected +} + // ReadMessage reads messages, can handle text, gzip and binary func (w *WebsocketConnection) ReadMessage() (WebsocketResponse, error) { mType, resp, err := w.Connection.ReadMessage() if err != nil { + if isDisconnectionError(err) { + w.setConnectedStatus(false) + } return WebsocketResponse{}, err } var standardMessage []byte @@ -866,3 +873,15 @@ func (w *WebsocketConnection) GenerateMessageID(useNano bool) int64 { } return time.Now().Unix() } + +// isDisconnectionError Determines if the error sent over chan ReadMessageErrors is a disconnection error +func isDisconnectionError(err error) bool { + if websocket.IsUnexpectedCloseError(err) { + return true + } + switch err.(type) { + case *websocket.CloseError, *net.OpError: + return true + } + return false +} diff --git a/exchanges/websocket/wshandler/wshandler_test.go b/exchanges/websocket/wshandler/wshandler_test.go index f3699a8b8c9..931ec6617fb 100644 --- a/exchanges/websocket/wshandler/wshandler_test.go +++ b/exchanges/websocket/wshandler/wshandler_test.go @@ -1,38 +1,151 @@ package wshandler import ( - "fmt" + "bytes" + "compress/flate" + "compress/gzip" + "errors" + "net" + "net/http" + "os" "strings" + "sync" "testing" "time" + + "github.com/gorilla/websocket" + "github.com/thrasher-corp/gocryptotrader/common" + "github.com/thrasher-corp/gocryptotrader/currency" ) -var ws *Websocket +func TestTrafficMonitorTimeout(t *testing.T) { + ws := New() + err := ws.Setup( + &WebsocketSetup{ + Enabled: true, + AuthenticatedWebsocketAPISupport: true, + WebsocketTimeout: 10000, + DefaultURL: "testDefaultURL", + ExchangeName: "exchangeName", + RunningURL: "testRunningURL", + Connector: func() error { return nil }, + Subscriber: func(test WebsocketChannelSubscription) error { return nil }, + UnSubscriber: func(test WebsocketChannelSubscription) error { return nil }, + }) + if err != nil { + t.Error(err) + } + ws.setConnectedStatus(true) + ws.TrafficAlert = make(chan struct{}, 2) + ws.ShutdownC = make(chan struct{}) + var anotherWG sync.WaitGroup + anotherWG.Add(1) + go ws.trafficMonitor(&anotherWG) + anotherWG.Wait() + ws.TrafficAlert <- struct{}{} + trafficTimer := time.NewTimer(5 * time.Second) + select { + case <-trafficTimer.C: + t.Error("should be exiting") + default: + ws.Wg.Wait() + } +} + +func TestIsDisconnectionError(t *testing.T) { + isADisconnectionError := isDisconnectionError(errors.New("errorText")) + if isADisconnectionError { + t.Error("Its not") + } + isADisconnectionError = isDisconnectionError(&websocket.CloseError{ + Code: 1006, + Text: "errorText", + }) + if !isADisconnectionError { + t.Error("It is") + } + + isADisconnectionError = isDisconnectionError(&net.OpError{ + Op: "", + Net: "", + Source: nil, + Addr: nil, + Err: errors.New("errorText"), + }) + if !isADisconnectionError { + t.Error("It is") + } +} -func TestWebsocketInit(t *testing.T) { - ws = New() - if ws == nil { - t.Error("test failed - Websocket New() error") +func TestConnectionMessageErrors(t *testing.T) { + ws := New() + ws.connected = true + ws.enabled = true + ws.ReadMessageErrors = make(chan error) + ws.DataHandler = make(chan interface{}) + ws.ShutdownC = make(chan struct{}) + ws.connector = func() error { return nil } + go ws.connectionMonitor() + timer := time.NewTimer(900 * time.Millisecond) + ws.ReadMessageErrors <- errors.New("errorText") + select { + case err := <-ws.DataHandler: + if err.(error).Error() != "errorText" { + t.Errorf("Expected 'errorText', received %v", err) + } + case <-timer.C: + t.Error("Timeout waiting for datahandler to receive error") + } + timer = time.NewTimer(900 * time.Millisecond) + ws.ReadMessageErrors <- &websocket.CloseError{ + Code: 1006, + Text: "errorText", + } +outer: + for { + select { + case <-ws.DataHandler: + t.Fatal("Error is a disconnection error") + case <-timer.C: + break outer + } } } func TestWebsocket(t *testing.T) { - if err := ws.SetProxyAddress("testProxy"); err != nil { + ws := Websocket{} + ws.setInit(true) + err := ws.Setup(&WebsocketSetup{ + ExchangeName: "test", + Enabled: true, + }) + if err != nil && err.Error() != "test Websocket already initialised" { + t.Errorf("Expected 'test Websocket already initialised', received %v", err) + } + + ws = *New() + err = ws.SetProxyAddress("testProxy") + if err != nil { t.Error("test failed - SetProxyAddress", err) } - ws.Setup(func() error { return nil }, - func(test WebsocketChannelSubscription) error { return nil }, - func(test WebsocketChannelSubscription) error { return nil }, - "testName", - true, - false, - "testDefaultURL", - "testRunningURL", - false) + err = ws.Setup( + &WebsocketSetup{ + Enabled: true, + AuthenticatedWebsocketAPISupport: true, + WebsocketTimeout: 2, + DefaultURL: "testDefaultURL", + ExchangeName: "exchangeName", + RunningURL: "testRunningURL", + Connector: func() error { return nil }, + Subscriber: func(test WebsocketChannelSubscription) error { return nil }, + UnSubscriber: func(test WebsocketChannelSubscription) error { return nil }, + }) + if err != nil { + t.Error(err) + } - // Test variable setting and retreival - if ws.GetName() != "testName" { + if ws.GetName() != "exchangeName" { t.Error("test failed - WebsocketSetup") } @@ -52,25 +165,11 @@ func TestWebsocket(t *testing.T) { t.Error("test failed - WebsocketSetup") } - // Test websocket connect and shutdown functions - comms := make(chan struct{}, 1) - go func() { - var count int - for { - if count == 4 { - close(comms) - return - } - select { - case <-ws.Connected: - count++ - case <-ws.Disconnected: - count++ - } - } - }() + if ws.trafficTimeout != time.Duration(2) { + t.Error("test failed - WebsocketSetup") + } // -- Not connected shutdown - err := ws.Shutdown() + err = ws.Shutdown() if err == nil { t.Fatal("test failed - should not be connected to able to shut down") } @@ -80,70 +179,61 @@ func TestWebsocket(t *testing.T) { if err != nil { t.Fatal("test failed - WebsocketSetup", err) } - + ws.SetWebsocketURL("ws://demos.kaazing.com/echo") // -- Already connected connect err = ws.Connect() if err == nil { t.Fatal("test failed - should not connect, already connected") } - - ws.SetWebsocketURL("") - - // -- Set true when already true - err = ws.SetWsStatusAndConnection(true) - if err == nil { - t.Fatal("test failed - setting enabled should not work") - } - - // -- Set false normal - err = ws.SetWsStatusAndConnection(false) - if err != nil { - t.Fatal("test failed - setting enabled should not work") - } - - // -- Set true normal - err = ws.SetWsStatusAndConnection(true) - if err != nil { - t.Fatal("test failed - setting enabled should not work") - } - // -- Normal shutdown err = ws.Shutdown() if err != nil { t.Fatal("test failed - WebsocketSetup", err) } - - timer := time.NewTimer(5 * time.Second) - select { - case <-comms: - case <-timer.C: - t.Fatal("test failed - WebsocketSetup - timeout") - } + ws.Wg.Wait() } func TestFunctionality(t *testing.T) { - var w Websocket - - if w.FormatFunctionality() != NoWebsocketSupportText { + ws := New() + if ws.FormatFunctionality() != NoWebsocketSupportText { t.Fatalf("Test Failed - FormatFunctionality error expected %s but received %s", - NoWebsocketSupportText, w.FormatFunctionality()) + NoWebsocketSupportText, ws.FormatFunctionality()) } - w.Functionality = 1 << 31 + ws.Functionality = 1 << 31 - if w.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" { + if ws.FormatFunctionality() != UnknownWebsocketFunctionality+"[1<<31]" { t.Fatal("Test Failed - GetFunctionality error incorrect error returned") } - w.Functionality = WebsocketOrderbookSupported + ws.Functionality = WebsocketOrderbookSupported - if w.GetFunctionality() != WebsocketOrderbookSupported { + if ws.GetFunctionality() != WebsocketOrderbookSupported { t.Fatal("Test Failed - GetFunctionality error incorrect bitmask returned") } - if !w.SupportsFunctionality(WebsocketOrderbookSupported) { + if !ws.SupportsFunctionality(WebsocketOrderbookSupported) { t.Fatal("Test Failed - SupportsFunctionality error should be true") } + + ws.Functionality = WebsocketTickerSupported | WebsocketOrderbookSupported | WebsocketKlineSupported | + WebsocketTradeDataSupported | WebsocketAccountSupported | WebsocketAllowsRequests | + WebsocketSubscribeSupported | WebsocketUnsubscribeSupported | WebsocketAuthenticatedEndpointsSupported | + WebsocketAccountDataSupported | WebsocketSubmitOrderSupported | WebsocketCancelOrderSupported | + WebsocketWithdrawSupported | WebsocketMessageCorrelationSupported | WebsocketSequenceNumberSupported | + WebsocketDeadMansSwitchSupported + formatted := ws.FormatFunctionality() + + if !strings.Contains(formatted, WebsocketTickerSupportedText) || !strings.Contains(formatted, WebsocketOrderbookSupportedText) || + !strings.Contains(formatted, WebsocketKlineSupportedText) || !strings.Contains(formatted, WebsocketTradeDataSupportedText) || + !strings.Contains(formatted, WebsocketAccountSupportedText) || !strings.Contains(formatted, WebsocketAllowsRequestsText) || + !strings.Contains(formatted, WebsocketSubscribeSupportedText) || !strings.Contains(formatted, WebsocketUnsubscribeSupportedText) || + !strings.Contains(formatted, WebsocketAuthenticatedEndpointsSupportedText) || !strings.Contains(formatted, WebsocketAccountDataSupportedText) || + !strings.Contains(formatted, WebsocketSubmitOrderSupportedText) || !strings.Contains(formatted, WebsocketCancelOrderSupportedText) || + !strings.Contains(formatted, WebsocketWithdrawSupportedText) || !strings.Contains(formatted, WebsocketMessageCorrelationSupportedText) || + !strings.Contains(formatted, WebsocketSequenceNumberSupportedText) || !strings.Contains(formatted, WebsocketDeadMansSwitchSupportedText) { + t.Error("Failed to format and include supported websocket features") + } } // placeholderSubscriber basic function to test subscriptions @@ -162,12 +252,32 @@ func TestSubscribe(t *testing.T) { subscribedChannels: []WebsocketChannelSubscription{}, } w.SetChannelSubscriber(placeholderSubscriber) - w.subscribeToChannels() + err := w.appendSubscribedChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 1 { t.Errorf("Subscription did not occur") } } +// TestSubscribe logic test +func TestSubscribeToChannels(t *testing.T) { + w := Websocket{ + channelsToSubscribe: []WebsocketChannelSubscription{ + { + Channel: "hello", + }, + }, + subscribedChannels: []WebsocketChannelSubscription{}, + } + w.SetChannelSubscriber(placeholderSubscriber) + w.SubscribeToChannels([]WebsocketChannelSubscription{{Channel: "hello"}, {Channel: "hello2"}}) + if len(w.channelsToSubscribe) != 2 { + t.Errorf("Subscription did not occur") + } +} + // TestUnsubscribe logic test func TestUnsubscribe(t *testing.T) { w := Websocket{ @@ -179,7 +289,10 @@ func TestUnsubscribe(t *testing.T) { }, } w.SetChannelUnsubscriber(placeholderSubscriber) - w.unsubscribeToChannels() + err := w.unsubscribeToChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 0 { t.Errorf("Unsubscription did not occur") } @@ -200,7 +313,10 @@ func TestSubscriptionWithExistingEntry(t *testing.T) { }, } w.SetChannelSubscriber(placeholderSubscriber) - w.subscribeToChannels() + err := w.appendSubscribedChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 1 { t.Errorf("Subscription should not have occurred") } @@ -221,7 +337,10 @@ func TestUnsubscriptionWithExistingEntry(t *testing.T) { }, } w.SetChannelUnsubscriber(placeholderSubscriber) - w.unsubscribeToChannels() + err := w.unsubscribeToChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 1 { t.Errorf("Unsubscription should not have occurred") } @@ -230,67 +349,49 @@ func TestUnsubscriptionWithExistingEntry(t *testing.T) { // TestManageSubscriptionsStartStop logic test func TestManageSubscriptionsStartStop(t *testing.T) { w := Websocket{ - ShutdownC: make(chan struct{}, 1), + ShutdownC: make(chan struct{}), Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported, } go w.manageSubscriptions() - time.Sleep(time.Second) close(w.ShutdownC) + w.Wg.Wait() } -// TestConnectionMonitorNoConnection logic test -func TestConnectionMonitorNoConnection(t *testing.T) { - w := Websocket{} - w.DataHandler = make(chan interface{}, 1) - w.ShutdownC = make(chan struct{}, 1) - w.exchangeName = "hello" - go w.connectionMonitor() - err := <-w.DataHandler - if !strings.EqualFold(err.(error).Error(), - fmt.Sprintf("%v connectionMonitor: websocket disabled, shutting down", w.exchangeName)) { - t.Errorf("expecting error 'connectionMonitor: websocket disabled, shutting down', received '%v'", err) - } -} - -// TestWsNoConnectionTolerance logic test -func TestWsNoConnectionTolerance(t *testing.T) { - w := Websocket{} - w.DataHandler = make(chan interface{}, 1) - w.ShutdownC = make(chan struct{}, 1) - w.enabled = true - w.noConnectionCheckLimit = 500 - w.checkConnection() - if w.noConnectionChecks == 0 { - t.Errorf("Expected noConnectionTolerance to increment, received '%v'", w.noConnectionChecks) - } -} - -// TestConnecting logic test -func TestConnecting(t *testing.T) { - w := Websocket{} - w.DataHandler = make(chan interface{}, 1) - w.ShutdownC = make(chan struct{}, 1) - w.enabled = true - w.connecting = true - w.reconnectionLimit = 500 - w.checkConnection() - if w.reconnectionChecks != 1 { - t.Errorf("Expected reconnectionLimit to increment, received '%v'", w.reconnectionChecks) +// TestManageSubscriptionsStartStop logic test +func TestManageSubscriptions(t *testing.T) { + w := Websocket{ + ShutdownC: make(chan struct{}), + Functionality: WebsocketSubscribeSupported | WebsocketUnsubscribeSupported, + subscribedChannels: []WebsocketChannelSubscription{ + { + Channel: "hello", + }, + }, } + w.SetChannelUnsubscriber(placeholderSubscriber) + w.SetChannelSubscriber(placeholderSubscriber) + w.setConnectedStatus(true) + go w.manageSubscriptions() + time.Sleep(8 * time.Second) + w.setConnectedStatus(false) + time.Sleep(manageSubscriptionsDelay) + w.subscriptionMutex.Lock() + if len(w.subscribedChannels) > 0 { + t.Error("Expected empty subscribed channels") + } + w.subscriptionMutex.Unlock() } -// TestReconnectionLimit logic test -func TestReconnectionLimit(t *testing.T) { - w := Websocket{} - w.DataHandler = make(chan interface{}, 1) - w.ShutdownC = make(chan struct{}, 1) - w.enabled = true - w.connecting = true - w.reconnectionChecks = 99 - w.reconnectionLimit = 1 - err := w.checkConnection() - if err == nil { - t.Error("Expected error") +// TestConnectionMonitorNoConnection logic test +func TestConnectionMonitorNoConnection(t *testing.T) { + ws := New() + ws.DataHandler = make(chan interface{}, 1) + ws.ShutdownC = make(chan struct{}, 1) + ws.exchangeName = "hello" + ws.trafficTimeout = 1 + go ws.connectionMonitor() + if ws.IsConnectionMonitorRunning() { + t.Fatal("Should have exited") } } @@ -360,26 +461,259 @@ func TestSliceCopyDoesntImpactBoth(t *testing.T) { }, } w.SetChannelUnsubscriber(placeholderSubscriber) - w.unsubscribeToChannels() + err := w.unsubscribeToChannels() + if err != nil { + t.Error(err) + } if len(w.subscribedChannels) != 2 { t.Errorf("Unsubscription did not occur") } w.subscribedChannels[0].Channel = "test" if strings.EqualFold(w.subscribedChannels[0].Channel, w.channelsToSubscribe[0].Channel) { - t.Errorf("Slice has not been copies appropriately") + t.Errorf("Slice has not been copied appropriately") + } +} + +// TestSliceCopyDoesntImpactBoth logic test +func TestGetSubscriptions(t *testing.T) { + w := Websocket{ + subscribedChannels: []WebsocketChannelSubscription{ + { + Channel: "hello3", + }, + }, + } + + subs := w.GetSubscriptions() + subs[0].Channel = "noHELLO" + if strings.EqualFold(w.subscribedChannels[0].Channel, subs[0].Channel) { + t.Error("Subscriptions was not copied properly") } } // TestSetCanUseAuthenticatedEndpoints logic test func TestSetCanUseAuthenticatedEndpoints(t *testing.T) { - w := Websocket{} - result := w.CanUseAuthenticatedEndpoints() + ws := New() + result := ws.CanUseAuthenticatedEndpoints() if result { t.Error("expected `canUseAuthenticatedEndpoints` to be false") } - w.SetCanUseAuthenticatedEndpoints(true) - result = w.CanUseAuthenticatedEndpoints() + ws.SetCanUseAuthenticatedEndpoints(true) + result = ws.CanUseAuthenticatedEndpoints() if !result { t.Error("expected `canUseAuthenticatedEndpoints` to be true") } } + +func TestRemoveSubscribedChannels(t *testing.T) { + w := Websocket{ + channelsToSubscribe: []WebsocketChannelSubscription{ + { + Channel: "hello3", + }, + }, + } + + w.RemoveSubscribedChannels([]WebsocketChannelSubscription{{Channel: "hello3"}}) + if len(w.channelsToSubscribe) == 1 { + t.Error("Did not remove subscription") + } +} + +const ( + websocketTestURL = "wss://www.bitmex.com/realtime" + returnResponseURL = "wss://ws.kraken.com" + useProxyTests = false // Disabled by default. Freely available proxy servers that work all the time are difficult to find + proxyURL = "http://212.186.171.4:80" // Replace with a usable proxy server +) + +var wc *WebsocketConnection +var dialer websocket.Dialer + +type testStruct struct { + Error error + WC WebsocketConnection +} + +type testRequest struct { + Event string `json:"event"` + RequestID int64 `json:"reqid,omitempty"` + Pairs []string `json:"pair"` + Subscription testRequestData `json:"subscription,omitempty"` +} + +// testRequestData contains details on WS channel +type testRequestData struct { + Name string `json:"name,omitempty"` + Interval int64 `json:"interval,omitempty"` + Depth int64 `json:"depth,omitempty"` +} + +type testResponse struct { + RequestID int64 `json:"reqid,omitempty"` +} + +// TestMain setup test +func TestMain(m *testing.M) { + wc = &WebsocketConnection{ + ExchangeName: "test", + URL: returnResponseURL, + ResponseMaxLimit: 7000000000, + ResponseCheckTimeout: 30000000, + } + os.Exit(m.Run()) +} + +// TestDial logic test +func TestDial(t *testing.T) { + var testCases = []testStruct{ + {Error: nil, WC: WebsocketConnection{ExchangeName: "test1", Verbose: true, URL: websocketTestURL, RateLimit: 10, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + {Error: errors.New(" Error: malformed ws or wss URL"), WC: WebsocketConnection{ExchangeName: "test2", Verbose: true, URL: "", ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + {Error: nil, WC: WebsocketConnection{ExchangeName: "test3", Verbose: true, URL: websocketTestURL, ProxyURL: proxyURL, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + } + for i := 0; i < len(testCases); i++ { + testData := &testCases[i] + t.Run(testData.WC.ExchangeName, func(t *testing.T) { + if testData.WC.ProxyURL != "" && !useProxyTests { + t.Skip("Proxy testing not enabled, skipping") + } + err := testData.WC.Dial(&dialer, http.Header{}) + if err != nil { + if testData.Error != nil && err.Error() == testData.Error.Error() { + return + } + t.Fatal(err) + } + }) + } +} + +// TestSendMessage logic test +func TestSendMessage(t *testing.T) { + var testCases = []testStruct{ + {Error: nil, WC: WebsocketConnection{ExchangeName: "test1", Verbose: true, URL: websocketTestURL, RateLimit: 10, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + {Error: errors.New(" Error: malformed ws or wss URL"), WC: WebsocketConnection{ExchangeName: "test2", Verbose: true, URL: "", ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + {Error: nil, WC: WebsocketConnection{ExchangeName: "test3", Verbose: true, URL: websocketTestURL, ProxyURL: proxyURL, ResponseCheckTimeout: 30000000, ResponseMaxLimit: 7000000000}}, + } + for i := 0; i < len(testCases); i++ { + testData := &testCases[i] + t.Run(testData.WC.ExchangeName, func(t *testing.T) { + if testData.WC.ProxyURL != "" && !useProxyTests { + t.Skip("Proxy testing not enabled, skipping") + } + err := testData.WC.Dial(&dialer, http.Header{}) + if err != nil { + if testData.Error != nil && err.Error() == testData.Error.Error() { + return + } + t.Fatal(err) + } + err = testData.WC.SendMessage("ping") + if err != nil { + t.Error(err) + } + }) + } +} + +// TestSendMessageWithResponse logic test +func TestSendMessageWithResponse(t *testing.T) { + if wc.ProxyURL != "" && !useProxyTests { + t.Skip("Proxy testing not enabled, skipping") + } + err := wc.Dial(&dialer, http.Header{}) + if err != nil { + t.Fatal(err) + } + go readMessages(wc, t) + + request := testRequest{ + Event: "subscribe", + Pairs: []string{currency.NewPairWithDelimiter("XBT", "USD", "/").String()}, + Subscription: testRequestData{ + Name: "ticker", + }, + RequestID: wc.GenerateMessageID(false), + } + _, err = wc.SendMessageReturnResponse(request.RequestID, request) + if err != nil { + t.Error(err) + } +} + +// TestParseBinaryResponse logic test +func TestParseBinaryResponse(t *testing.T) { + var b bytes.Buffer + w := gzip.NewWriter(&b) + _, err := w.Write([]byte("hello")) + if err != nil { + t.Error(err) + } + err = w.Close() + if err != nil { + t.Error(err) + } + var resp []byte + resp, err = wc.parseBinaryResponse(b.Bytes()) + if err != nil { + t.Error(err) + } + if !strings.EqualFold(string(resp), "hello") { + t.Errorf("GZip conversion failed. Received: '%v', Expected: 'hello'", string(resp)) + } + + var b2 bytes.Buffer + w2, err2 := flate.NewWriter(&b2, 1) + if err2 != nil { + t.Error(err2) + } + _, err2 = w2.Write([]byte("hello")) + if err2 != nil { + t.Error(err) + } + err2 = w2.Close() + if err2 != nil { + t.Error(err) + } + resp2, err3 := wc.parseBinaryResponse(b2.Bytes()) + if err3 != nil { + t.Error(err3) + } + if !strings.EqualFold(string(resp2), "hello") { + t.Errorf("GZip conversion failed. Received: '%v', Expected: 'hello'", string(resp2)) + } +} + +// TestAddResponseWithID logic test +func TestAddResponseWithID(t *testing.T) { + wc.IDResponses = nil + wc.AddResponseWithID(0, []byte("hi")) + wc.AddResponseWithID(1, []byte("hi")) +} + +// readMessages helper func +func readMessages(wc *WebsocketConnection, t *testing.T) { + timer := time.NewTimer(20 * time.Second) + for { + select { + case <-timer.C: + return + default: + resp, err := wc.ReadMessage() + if err != nil { + t.Error(err) + return + } + var incoming testResponse + err = common.JSONDecode(resp.Raw, &incoming) + if err != nil { + t.Error(err) + return + } + if incoming.RequestID > 0 { + wc.AddResponseWithID(incoming.RequestID, resp.Raw) + return + } + } + } +} diff --git a/exchanges/websocket/wshandler/wshandler_types.go b/exchanges/websocket/wshandler/wshandler_types.go index 529c124d809..6469efe4127 100644 --- a/exchanges/websocket/wshandler/wshandler_types.go +++ b/exchanges/websocket/wshandler/wshandler_types.go @@ -48,51 +48,40 @@ const ( WebsocketMessageCorrelationSupportedText = "WEBSOCKET MESSAGE CORRELATION SUPPORTED" WebsocketSequenceNumberSupportedText = "WEBSOCKET SEQUENCE NUMBER SUPPORTED" WebsocketDeadMansSwitchSupportedText = "WEBSOCKET DEAD MANS SWITCH SUPPORTED" - // WebsocketNotEnabled alerts of a disabled websocket - WebsocketNotEnabled = "exchange_websocket_not_enabled" - // WebsocketTrafficLimitTime defines a standard time for no traffic from the - // websocket connection - WebsocketTrafficLimitTime = 5 * time.Second - websocketRestablishConnection = time.Second - manageSubscriptionsDelay = 5 * time.Second + WebsocketNotEnabled = "exchange_websocket_not_enabled" + manageSubscriptionsDelay = 5 * time.Second // connection monitor time delays and limits connectionMonitorDelay = 2 * time.Second - // WebsocketStateTimeout defines a const for when a websocket connection - // times out, will be handled by the routine management system - WebsocketStateTimeout = "TIMEOUT" ) // Websocket defines a return type for websocket connections via the interface // wrapper for routine processing in routines.go type Websocket struct { - proxyAddr string - defaultURL string - runningURL string - exchangeName string - enabled bool - init bool - connected bool - connecting bool - verbose bool - connector func() error - m sync.Mutex - subscriptionLock sync.Mutex - connectionMonitorRunning bool - reconnectionLimit int - noConnectionChecks int - reconnectionChecks int - noConnectionCheckLimit int - subscribedChannels []WebsocketChannelSubscription - channelsToSubscribe []WebsocketChannelSubscription - channelSubscriber func(channelToSubscribe WebsocketChannelSubscription) error - channelUnsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error - // Connected denotes a channel switch for diversion of request flow - Connected chan struct{} - // Disconnected denotes a channel switch for diversion of request flow - Disconnected chan struct{} - // DataHandler pipes websocket data to an exchange websocket data handler - DataHandler chan interface{} + // Functionality defines websocket stream capabilities + Functionality uint32 + canUseAuthenticatedEndpoints bool + enabled bool + init bool + connected bool + connecting bool + trafficMonitorRunning bool + verbose bool + connectionMonitorRunning bool + trafficTimeout time.Duration + proxyAddr string + defaultURL string + runningURL string + exchangeName string + m sync.Mutex + subscriptionMutex sync.Mutex + connectionMutex sync.RWMutex + connector func() error + subscribedChannels []WebsocketChannelSubscription + channelsToSubscribe []WebsocketChannelSubscription + channelSubscriber func(channelToSubscribe WebsocketChannelSubscription) error + channelUnsubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error + DataHandler chan interface{} // ShutdownC is the main shutdown channel which controls all websocket go funcs ShutdownC chan struct{} // Orderbook is a local cache of orderbooks @@ -102,9 +91,21 @@ type Websocket struct { Wg sync.WaitGroup // TrafficAlert monitors if there is a halt in traffic throughput TrafficAlert chan struct{} - // Functionality defines websocket stream capabilities - Functionality uint32 - canUseAuthenticatedEndpoints bool + // ReadMessageErrors will received all errors from ws.ReadMessage() and verify if its a disconnection + ReadMessageErrors chan error +} + +type WebsocketSetup struct { + Enabled bool + Verbose bool + AuthenticatedWebsocketAPISupport bool + WebsocketTimeout time.Duration + DefaultURL string + ExchangeName string + RunningURL string + Connector func() error + Subscriber func(channelToSubscribe WebsocketChannelSubscription) error + UnSubscriber func(channelToUnsubscribe WebsocketChannelSubscription) error } // WebsocketChannelSubscription container for websocket subscriptions @@ -187,16 +188,19 @@ type WebsocketPositionUpdated struct { // WebsocketConnection contains all the data needed to send a message to a WS type WebsocketConnection struct { sync.Mutex - Verbose bool - RateLimit float64 - ExchangeName string - URL string - ProxyURL string - Wg sync.WaitGroup - Connection *websocket.Conn - Shutdown chan struct{} + Verbose bool + connected bool + connectionMutex sync.RWMutex + RateLimit float64 + ExchangeName string + URL string + ProxyURL string + Wg sync.WaitGroup + Connection *websocket.Conn + Shutdown chan struct{} // These are the request IDs and the corresponding response JSON IDResponses map[int64][]byte ResponseCheckTimeout time.Duration ResponseMaxLimit time.Duration + TrafficTimeout time.Duration } diff --git a/exchanges/websocket/wsorderbook/wsorderbook.go b/exchanges/websocket/wsorderbook/wsorderbook.go index 66b59d13238..3446ddbd112 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook.go +++ b/exchanges/websocket/wsorderbook/wsorderbook.go @@ -201,7 +201,7 @@ func (w *WebsocketOrderbookLocal) updateByIDAndAction(orderbookUpdate *Websocket // LoadSnapshot loads initial snapshot of ob data, overwrite allows full // ob to be completely rewritten because the exchange is a doing a full // update not an incremental one -func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base, overwrite bool) error { +func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base) error { if len(newOrderbook.Asks) == 0 || len(newOrderbook.Bids) == 0 { return fmt.Errorf("%v snapshot ask and bids are nil", w.exchangeName) } @@ -216,11 +216,8 @@ func (w *WebsocketOrderbookLocal) LoadSnapshot(newOrderbook *orderbook.Base, ove if w.ob[newOrderbook.Pair][newOrderbook.AssetType] != nil && (len(w.ob[newOrderbook.Pair][newOrderbook.AssetType].Asks) > 0 || len(w.ob[newOrderbook.Pair][newOrderbook.AssetType].Bids) > 0) { - if overwrite { - w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook - return newOrderbook.Process() - } - return fmt.Errorf("%v snapshot instance already found", w.exchangeName) + w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook + return newOrderbook.Process() } w.ob[newOrderbook.Pair][newOrderbook.AssetType] = newOrderbook return newOrderbook.Process() diff --git a/exchanges/websocket/wsorderbook/wsorderbook_test.go b/exchanges/websocket/wsorderbook/wsorderbook_test.go index b3edf789764..acc00095908 100644 --- a/exchanges/websocket/wsorderbook/wsorderbook_test.go +++ b/exchanges/websocket/wsorderbook/wsorderbook_test.go @@ -38,7 +38,7 @@ func createSnapshot() (obl *WebsocketOrderbookLocal, curr currency.Pair, asks, b snapShot1.AssetType = asset.Spot snapShot1.Pair = curr obl = &WebsocketOrderbookLocal{} - err = obl.LoadSnapshot(&snapShot1, false) + err = obl.LoadSnapshot(&snapShot1) return } @@ -382,8 +382,7 @@ func TestRunSnapshotWithNoData(t *testing.T) { snapShot1.Pair = curr snapShot1.ExchangeName = "test" obl.exchangeName = "test" - err := obl.LoadSnapshot(&snapShot1, - false) + err := obl.LoadSnapshot(&snapShot1) if err == nil { t.Fatal("expected an error loading a snapshot") } @@ -392,8 +391,8 @@ func TestRunSnapshotWithNoData(t *testing.T) { } } -// TestLoadSnapshotWithOverride logic test -func TestLoadSnapshotWithOverride(t *testing.T) { +// TestLoadSnapshot logic test +func TestLoadSnapshot(t *testing.T) { var obl WebsocketOrderbookLocal var snapShot1 orderbook.Base curr := currency.NewPairFromString("BTCUSD") @@ -407,21 +406,13 @@ func TestLoadSnapshotWithOverride(t *testing.T) { snapShot1.Bids = bids snapShot1.AssetType = asset.Spot snapShot1.Pair = curr - err := obl.LoadSnapshot(&snapShot1, false) - if err != nil { - t.Error(err) - } - err = obl.LoadSnapshot(&snapShot1, false) - if err == nil { - t.Error("expected error: 'snapshot instance already found'") - } - err = obl.LoadSnapshot(&snapShot1, true) + err := obl.LoadSnapshot(&snapShot1) if err != nil { t.Error(err) } } -// TestInsertWithIDs logic test +// TestFlushCache logic test func TestFlushCache(t *testing.T) { obl, curr, _, _, err := createSnapshot() if err != nil { @@ -473,7 +464,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot1.Bids = bids snapShot1.AssetType = asset.Spot snapShot1.Pair = currency.NewPairFromString("BTCUSD") - err := obl.LoadSnapshot(&snapShot1, false) + err := obl.LoadSnapshot(&snapShot1) if err != nil { t.Fatal(err) } @@ -510,7 +501,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot2.Bids = bids snapShot2.AssetType = asset.Spot snapShot2.Pair = currency.NewPairFromString("LTCUSD") - err = obl.LoadSnapshot(&snapShot2, false) + err = obl.LoadSnapshot(&snapShot2) if err != nil { t.Fatal(err) } @@ -547,7 +538,7 @@ func TestInsertingSnapShots(t *testing.T) { snapShot3.Bids = bids snapShot3.AssetType = "FUTURES" snapShot3.Pair = currency.NewPairFromString("LTCUSD") - err = obl.LoadSnapshot(&snapShot3, false) + err = obl.LoadSnapshot(&snapShot3) if err != nil { t.Fatal(err) } diff --git a/exchanges/zb/zb_websocket.go b/exchanges/zb/zb_websocket.go index 7cd2386681d..f52c0aed8ba 100644 --- a/exchanges/zb/zb_websocket.go +++ b/exchanges/zb/zb_websocket.go @@ -58,7 +58,7 @@ func (z *ZB) WsHandleData() { default: resp, err := z.WebsocketConn.ReadMessage() if err != nil { - z.Websocket.DataHandler <- err + z.Websocket.ReadMessageErrors <- err return } z.Websocket.TrafficAlert <- struct{}{} @@ -143,8 +143,7 @@ func (z *ZB) WsHandleData() { newOrderBook.AssetType = asset.Spot newOrderBook.Pair = cPair - err = z.Websocket.Orderbook.LoadSnapshot(&newOrderBook, - true) + err = z.Websocket.Orderbook.LoadSnapshot(&newOrderBook) if err != nil { z.Websocket.DataHandler <- err continue diff --git a/exchanges/zb/zb_wrapper.go b/exchanges/zb/zb_wrapper.go index e415043d63a..37a2a6374c2 100644 --- a/exchanges/zb/zb_wrapper.go +++ b/exchanges/zb/zb_wrapper.go @@ -117,15 +117,18 @@ func (z *ZB) Setup(exch *config.ExchangeConfig) error { return err } - err = z.Websocket.Setup(z.WsConnect, - z.Subscribe, - nil, - exch.Name, - exch.Features.Enabled.Websocket, - exch.Verbose, - zbWebsocketAPI, - exch.API.Endpoints.WebsocketURL, - exch.API.AuthenticatedWebsocketSupport) + err = z.Websocket.Setup( + &wshandler.WebsocketSetup{ + Enabled: exch.Features.Enabled.Websocket, + Verbose: exch.Verbose, + AuthenticatedWebsocketAPISupport: exch.API.AuthenticatedWebsocketSupport, + WebsocketTimeout: exch.WebsocketTrafficTimeout, + DefaultURL: zbWebsocketAPI, + ExchangeName: exch.Name, + RunningURL: exch.API.Endpoints.WebsocketURL, + Connector: z.WsConnect, + Subscriber: z.Subscribe, + }) if err != nil { return err }