Skip to content

Commit

Permalink
stream: set connection monitor delay. (thrasher-corp#1120)
Browse files Browse the repository at this point in the history
* stream: set connection monitor delay.

- this fixes a bug where the connection monitor delay config value does not
get set to the websocket on intialization.

* multi: add connection monitor delay to exchange config.

- this adds the connection monitor delay config option to the exchange type.
- the validate function of the exchange type has been updated to validate the
connection monitor delay value as well.

* multi: resolve review issues.
  • Loading branch information
dnldd authored Jan 30, 2023
1 parent b079049 commit 82c79a9
Show file tree
Hide file tree
Showing 26 changed files with 200 additions and 172 deletions.
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1821,5 +1821,10 @@ func (c *Exchange) Validate() error {
if c == nil {
return errExchangeConfigIsNil
}

if c.ConnectionMonitorDelay <= 0 {
c.ConnectionMonitorDelay = DefaultConnectionMonitorDelay
}

return nil
}
2 changes: 2 additions & 0 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
defaultWebsocketResponseMaxLimit = time.Second * 7
defaultWebsocketOrderbookBufferLimit = 5
defaultWebsocketTrafficTimeout = time.Second * 30
DefaultConnectionMonitorDelay = time.Second * 2
maxAuthFailures = 3
defaultNTPAllowedDifference = 50000000
defaultNTPAllowedNegativeDifference = 50000000
Expand Down Expand Up @@ -149,6 +150,7 @@ type Exchange struct {
WebsocketResponseCheckTimeout time.Duration `json:"websocketResponseCheckTimeout"`
WebsocketResponseMaxLimit time.Duration `json:"websocketResponseMaxLimit"`
WebsocketTrafficTimeout time.Duration `json:"websocketTrafficTimeout"`
ConnectionMonitorDelay time.Duration `json:"connectionMonitorDelay"`
ProxyAddress string `json:"proxyAddress,omitempty"`
BaseCurrencies currency.Currencies `json:"baseCurrencies"`
CurrencyPairs *currency.PairsManager `json:"currencyPairs"`
Expand Down
17 changes: 9 additions & 8 deletions exchanges/binance/binance_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,15 @@ func (b *Binance) Setup(exch *config.Exchange) error {
return err
}
err = b.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: binanceDefaultWebsocketURL,
RunningURL: ePoint,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: binanceDefaultWebsocketURL,
RunningURL: ePoint,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
17 changes: 9 additions & 8 deletions exchanges/binanceus/binanceus_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,15 @@ func (bi *Binanceus) Setup(exch *config.Exchange) error {
}

err = bi.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: binanceusDefaultWebsocketURL,
RunningURL: ePoint,
Connector: bi.WsConnect,
Subscriber: bi.Subscribe,
Unsubscriber: bi.Unsubscribe,
GenerateSubscriptions: bi.GenerateSubscriptions,
Features: &bi.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: binanceusDefaultWebsocketURL,
RunningURL: ePoint,
Connector: bi.WsConnect,
Subscriber: bi.Subscribe,
Unsubscriber: bi.Unsubscribe,
GenerateSubscriptions: bi.GenerateSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &bi.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
17 changes: 9 additions & 8 deletions exchanges/bitfinex/bitfinex_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,15 @@ func (b *Bitfinex) Setup(exch *config.Exchange) error {
}

err = b.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: publicBitfinexWebsocketEndpoint,
RunningURL: wsEndpoint,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: publicBitfinexWebsocketEndpoint,
RunningURL: wsEndpoint,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
UpdateEntriesByID: true,
},
Expand Down
15 changes: 8 additions & 7 deletions exchanges/bithumb/bithumb_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,14 @@ func (b *Bithumb) Setup(exch *config.Exchange) error {
return err
}
err = b.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: wsEndpoint,
RunningURL: ePoint,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
GenerateSubscriptions: b.GenerateSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: wsEndpoint,
RunningURL: ePoint,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
GenerateSubscriptions: b.GenerateSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &b.Features.Supports.WebsocketCapabilities,
})
if err != nil {
return err
Expand Down
17 changes: 9 additions & 8 deletions exchanges/bitmex/bitmex_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,15 @@ func (b *Bitmex) Setup(exch *config.Exchange) error {
}

err = b.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: bitmexWSURL,
RunningURL: wsEndpoint,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: bitmexWSURL,
RunningURL: wsEndpoint,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
UpdateEntriesByID: true,
},
Expand Down
17 changes: 9 additions & 8 deletions exchanges/bitstamp/bitstamp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,15 @@ func (b *Bitstamp) Setup(exch *config.Exchange) error {
}

err = b.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: bitstampWSURL,
RunningURL: wsURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.generateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: bitstampWSURL,
RunningURL: wsURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.generateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &b.Features.Supports.WebsocketCapabilities,
})
if err != nil {
return err
Expand Down
17 changes: 9 additions & 8 deletions exchanges/bittrex/bittrex_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,15 @@ func (b *Bittrex) Setup(exch *config.Exchange) error {

// Websocket details setup below
err = b.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: bittrexAPIWSURL, // Default ws endpoint so we can roll back via CLI if needed.
RunningURL: wsRunningEndpoint,
Connector: b.WsConnect, // Connector function outlined above.
Subscriber: b.Subscribe, // Subscriber function outlined above.
Unsubscriber: b.Unsubscribe, // Unsubscriber function outlined above.
GenerateSubscriptions: b.GenerateDefaultSubscriptions, // GenerateDefaultSubscriptions function outlined above.
Features: &b.Features.Supports.WebsocketCapabilities, // Defines the capabilities of the websocket outlined in supported features struct. This allows the websocket connection to be flushed appropriately if we have a pair/asset enable/disable change. This is outlined below.
ExchangeConfig: exch,
DefaultURL: bittrexAPIWSURL, // Default ws endpoint so we can roll back via CLI if needed.
RunningURL: wsRunningEndpoint,
Connector: b.WsConnect, // Connector function outlined above.
Subscriber: b.Subscribe, // Subscriber function outlined above.
Unsubscriber: b.Unsubscribe, // Unsubscriber function outlined above.
GenerateSubscriptions: b.GenerateDefaultSubscriptions, // GenerateDefaultSubscriptions function outlined above.
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &b.Features.Supports.WebsocketCapabilities, // Defines the capabilities of the websocket outlined in supported features struct. This allows the websocket connection to be flushed appropriately if we have a pair/asset enable/disable change. This is outlined below.
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
17 changes: 9 additions & 8 deletions exchanges/btcmarkets/btcmarkets_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,15 @@ func (b *BTCMarkets) Setup(exch *config.Exchange) error {
}

err = b.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: btcMarketsWSURL,
RunningURL: wsURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.generateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: btcMarketsWSURL,
RunningURL: wsURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.generateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
UpdateIDProgression: true,
Expand Down
17 changes: 9 additions & 8 deletions exchanges/btse/btse_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,15 @@ func (b *BTSE) Setup(exch *config.Exchange) error {
}

err = b.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: btseWebsocket,
RunningURL: wsRunningURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: btseWebsocket,
RunningURL: wsRunningURL,
Connector: b.WsConnect,
Subscriber: b.Subscribe,
Unsubscriber: b.Unsubscribe,
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &b.Features.Supports.WebsocketCapabilities,
})
if err != nil {
return err
Expand Down
19 changes: 10 additions & 9 deletions exchanges/bybit/bybit_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,16 @@ func (by *Bybit) Setup(exch *config.Exchange) error {

err = by.Websocket.Setup(
&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: bybitWSBaseURL + wsSpotPublicTopicV2,
RunningURL: wsRunningEndpoint,
RunningURLAuth: bybitWSBaseURL + wsSpotPrivate,
Connector: by.WsConnect,
Subscriber: by.Subscribe,
Unsubscriber: by.Unsubscribe,
GenerateSubscriptions: by.GenerateDefaultSubscriptions,
Features: &by.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: bybitWSBaseURL + wsSpotPublicTopicV2,
RunningURL: wsRunningEndpoint,
RunningURLAuth: bybitWSBaseURL + wsSpotPrivate,
Connector: by.WsConnect,
Subscriber: by.Subscribe,
Unsubscriber: by.Unsubscribe,
GenerateSubscriptions: by.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &by.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
17 changes: 9 additions & 8 deletions exchanges/coinbasepro/coinbasepro_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,15 @@ func (c *CoinbasePro) Setup(exch *config.Exchange) error {
}

err = c.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: coinbaseproWebsocketURL,
RunningURL: wsRunningURL,
Connector: c.WsConnect,
Subscriber: c.Subscribe,
Unsubscriber: c.Unsubscribe,
GenerateSubscriptions: c.GenerateDefaultSubscriptions,
Features: &c.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: coinbaseproWebsocketURL,
RunningURL: wsRunningURL,
Connector: c.WsConnect,
Subscriber: c.Subscribe,
Unsubscriber: c.Unsubscribe,
GenerateSubscriptions: c.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &c.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
},
Expand Down
17 changes: 9 additions & 8 deletions exchanges/coinut/coinut_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,15 @@ func (c *COINUT) Setup(exch *config.Exchange) error {
}

err = c.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: coinutWebsocketURL,
RunningURL: wsRunningURL,
Connector: c.WsConnect,
Subscriber: c.Subscribe,
Unsubscriber: c.Unsubscribe,
GenerateSubscriptions: c.GenerateDefaultSubscriptions,
Features: &c.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: coinutWebsocketURL,
RunningURL: wsRunningURL,
Connector: c.WsConnect,
Subscriber: c.Subscribe,
Unsubscriber: c.Unsubscribe,
GenerateSubscriptions: c.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &c.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
1 change: 1 addition & 0 deletions exchanges/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,7 @@ func TestSetupDefaults(t *testing.T) {
API: config.APIConfig{
AuthenticatedSupport: true,
},
ConnectionMonitorDelay: time.Second * 5,
}

err = b.SetupDefaults(&cfg)
Expand Down
15 changes: 8 additions & 7 deletions exchanges/gateio/gateio_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,14 @@ func (g *Gateio) Setup(exch *config.Exchange) error {
}

err = g.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: gateioWebsocketEndpoint,
RunningURL: wsRunningURL,
Connector: g.WsConnect,
Subscriber: g.Subscribe,
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
Features: &g.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: gateioWebsocketEndpoint,
RunningURL: wsRunningURL,
Connector: g.WsConnect,
Subscriber: g.Subscribe,
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &g.Features.Supports.WebsocketCapabilities,
})
if err != nil {
return err
Expand Down
17 changes: 9 additions & 8 deletions exchanges/gemini/gemini_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,15 @@ func (g *Gemini) Setup(exch *config.Exchange) error {
}

err = g.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: geminiWebsocketEndpoint,
RunningURL: wsRunningURL,
Connector: g.WsConnect,
Subscriber: g.Subscribe,
Unsubscriber: g.Unsubscribe,
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
Features: &g.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: geminiWebsocketEndpoint,
RunningURL: wsRunningURL,
Connector: g.WsConnect,
Subscriber: g.Subscribe,
Unsubscriber: g.Unsubscribe,
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &g.Features.Supports.WebsocketCapabilities,
})
if err != nil {
return err
Expand Down
17 changes: 9 additions & 8 deletions exchanges/hitbtc/hitbtc_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,15 @@ func (h *HitBTC) Setup(exch *config.Exchange) error {
}

err = h.Websocket.Setup(&stream.WebsocketSetup{
ExchangeConfig: exch,
DefaultURL: hitbtcWebsocketAddress,
RunningURL: wsRunningURL,
Connector: h.WsConnect,
Subscriber: h.Subscribe,
Unsubscriber: h.Unsubscribe,
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
Features: &h.Features.Supports.WebsocketCapabilities,
ExchangeConfig: exch,
DefaultURL: hitbtcWebsocketAddress,
RunningURL: wsRunningURL,
Connector: h.WsConnect,
Subscriber: h.Subscribe,
Unsubscriber: h.Unsubscribe,
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
ConnectionMonitorDelay: exch.ConnectionMonitorDelay,
Features: &h.Features.Supports.WebsocketCapabilities,
OrderbookBufferConfig: buffer.Config{
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
Loading

0 comments on commit 82c79a9

Please sign in to comment.