Skip to content

Commit

Permalink
engine/websocket: subscribe to default channels only when actually ne…
Browse files Browse the repository at this point in the history
…eded (thrasher-corp#610)

* if this is required by ws routines or sync manager
* restore previous subscriptions on reconnect
  • Loading branch information
Rots authored Dec 29, 2020
1 parent b95cfac commit d1b206c
Show file tree
Hide file tree
Showing 24 changed files with 75 additions and 146 deletions.
2 changes: 1 addition & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func (bot *Engine) Start() error {
}

if bot.Settings.EnableWebsocketRoutine {
go WebsocketRoutine()
go bot.WebsocketRoutine()
}

if bot.Settings.EnableGCTScriptManager {
Expand Down
9 changes: 5 additions & 4 deletions engine/orders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@ import (

var ordersSetupRan bool

func OrdersSetup(t *testing.T) {
SetupTestHelpers(t)
func OrdersSetup(t *testing.T) *Engine {
bot := SetupTestHelpers(t)
if !ordersSetupRan {
err := Bot.OrderManager.Start()
err := bot.OrderManager.Start()
if err != nil {
t.Fatal(err)
}
if !Bot.OrderManager.Started() {
if !bot.OrderManager.Started() {
t.Fatal("Order manager not started")
}
ordersSetupRan = true
}
return bot
}

func TestOrdersGet(t *testing.T) {
Expand Down
46 changes: 25 additions & 21 deletions engine/routines.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,16 @@ func relayWebsocketEvent(result interface{}, event, assetType, exchangeName stri
}

// WebsocketRoutine Initial routine management system for websocket
func WebsocketRoutine() {
if Bot.Settings.Verbose {
func (bot *Engine) WebsocketRoutine() {
if bot.Settings.Verbose {
log.Debugln(log.WebsocketMgr, "Connecting exchange websocket services...")
}

exchanges := Bot.GetExchanges()
exchanges := bot.GetExchanges()
for i := range exchanges {
go func(i int) {
if exchanges[i].SupportsWebsocket() {
if Bot.Settings.Verbose {
if bot.Settings.Verbose {
log.Debugf(log.WebsocketMgr,
"Exchange %s websocket support: Yes Enabled: %v\n",
exchanges[i].GetName(),
Expand All @@ -235,15 +235,19 @@ func WebsocketRoutine() {
}

// Data handler routine
go WebsocketDataReceiver(ws)
go bot.WebsocketDataReceiver(ws)

if ws.IsEnabled() {
err = ws.Connect()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v\n", err)
}
err = ws.FlushChannels()
if err != nil {
log.Errorf(log.WebsocketMgr, "Failed to subscribe: %v\n", err)
}
}
} else if Bot.Settings.Verbose {
} else if bot.Settings.Verbose {
log.Debugf(log.WebsocketMgr,
"Exchange %s websocket support: No\n",
exchanges[i].GetName(),
Expand All @@ -258,7 +262,7 @@ var wg sync.WaitGroup

// WebsocketDataReceiver handles websocket data coming from a websocket feed
// associated with an exchange
func WebsocketDataReceiver(ws *stream.Websocket) {
func (bot *Engine) WebsocketDataReceiver(ws *stream.Websocket) {
wg.Add(1)
defer wg.Done()

Expand All @@ -267,7 +271,7 @@ func WebsocketDataReceiver(ws *stream.Websocket) {
case <-shutdowner:
return
case data := <-ws.ToRoutine:
err := WebsocketDataHandler(ws.GetName(), data)
err := bot.WebsocketDataHandler(ws.GetName(), data)
if err != nil {
log.Error(log.WebsocketMgr, err)
}
Expand All @@ -277,7 +281,7 @@ func WebsocketDataReceiver(ws *stream.Websocket) {

// WebsocketDataHandler is a central point for exchange websocket implementations to send
// processed data. WebsocketDataHandler will then pass that to an appropriate handler
func WebsocketDataHandler(exchName string, data interface{}) error {
func (bot *Engine) WebsocketDataHandler(exchName string, data interface{}) error {
if data == nil {
return fmt.Errorf("routines.go - exchange %s nil data sent to websocket",
exchName)
Expand All @@ -289,16 +293,16 @@ func WebsocketDataHandler(exchName string, data interface{}) error {
case error:
return fmt.Errorf("routines.go exchange %s websocket error - %s", exchName, data)
case stream.FundingData:
if Bot.Settings.Verbose {
if bot.Settings.Verbose {
log.Infof(log.WebsocketMgr, "%s websocket %s %s funding updated %+v",
exchName,
FormatCurrency(d.CurrencyPair),
d.AssetType,
d)
}
case *ticker.Price:
if Bot.Settings.EnableExchangeSyncManager && Bot.ExchangeCurrencyPairManager != nil {
Bot.ExchangeCurrencyPairManager.update(exchName,
if bot.Settings.EnableExchangeSyncManager && bot.ExchangeCurrencyPairManager != nil {
bot.ExchangeCurrencyPairManager.update(exchName,
d.Pair,
d.AssetType,
SyncItemTicker,
Expand All @@ -307,39 +311,39 @@ func WebsocketDataHandler(exchName string, data interface{}) error {
err := ticker.ProcessTicker(d)
printTickerSummary(d, "websocket", err)
case stream.KlineData:
if Bot.Settings.Verbose {
if bot.Settings.Verbose {
log.Infof(log.WebsocketMgr, "%s websocket %s %s kline updated %+v",
exchName,
FormatCurrency(d.Pair),
d.AssetType,
d)
}
case *orderbook.Base:
if Bot.Settings.EnableExchangeSyncManager && Bot.ExchangeCurrencyPairManager != nil {
Bot.ExchangeCurrencyPairManager.update(exchName,
if bot.Settings.EnableExchangeSyncManager && bot.ExchangeCurrencyPairManager != nil {
bot.ExchangeCurrencyPairManager.update(exchName,
d.Pair,
d.AssetType,
SyncItemOrderbook,
nil)
}
printOrderbookSummary(d, "websocket", nil)
case *order.Detail:
if !Bot.OrderManager.orderStore.exists(d) {
err := Bot.OrderManager.orderStore.Add(d)
if !bot.OrderManager.orderStore.exists(d) {
err := bot.OrderManager.orderStore.Add(d)
if err != nil {
return err
}
} else {
od, err := Bot.OrderManager.orderStore.GetByExchangeAndID(d.Exchange, d.ID)
od, err := bot.OrderManager.orderStore.GetByExchangeAndID(d.Exchange, d.ID)
if err != nil {
return err
}
od.UpdateOrderFromDetail(d)
}
case *order.Cancel:
return Bot.OrderManager.Cancel(d)
return bot.OrderManager.Cancel(d)
case *order.Modify:
od, err := Bot.OrderManager.orderStore.GetByExchangeAndID(d.Exchange, d.ID)
od, err := bot.OrderManager.orderStore.GetByExchangeAndID(d.Exchange, d.ID)
if err != nil {
return err
}
Expand All @@ -349,7 +353,7 @@ func WebsocketDataHandler(exchName string, data interface{}) error {
case stream.UnhandledMessageWarning:
log.Warn(log.WebsocketMgr, d.Message)
default:
if Bot.Settings.Verbose {
if bot.Settings.Verbose {
log.Warnf(log.WebsocketMgr,
"%s websocket Unknown type: %+v",
exchName,
Expand Down
32 changes: 16 additions & 16 deletions engine/routines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,33 @@ import (

func TestWebsocketDataHandlerProcess(t *testing.T) {
ws := sharedtestvalues.NewTestWebsocket()
go WebsocketDataReceiver(ws)
go Bot.WebsocketDataReceiver(ws)
ws.DataHandler <- "string"
time.Sleep(time.Second)
close(shutdowner)
}

func TestHandleData(t *testing.T) {
OrdersSetup(t)
b := OrdersSetup(t)
var exchName = "exch"
var orderID = "testOrder.Detail"
err := WebsocketDataHandler(exchName, errors.New("error"))
err := b.WebsocketDataHandler(exchName, errors.New("error"))
if err == nil {
t.Error("Error not handled correctly")
}
err = WebsocketDataHandler(exchName, nil)
err = b.WebsocketDataHandler(exchName, nil)
if err == nil {
t.Error("Expected nil data error")
}
err = WebsocketDataHandler(exchName, stream.FundingData{})
err = b.WebsocketDataHandler(exchName, stream.FundingData{})
if err != nil {
t.Error(err)
}
err = WebsocketDataHandler(exchName, &ticker.Price{})
err = b.WebsocketDataHandler(exchName, &ticker.Price{})
if err != nil {
t.Error(err)
}
err = WebsocketDataHandler(exchName, stream.KlineData{})
err = b.WebsocketDataHandler(exchName, stream.KlineData{})
if err != nil {
t.Error(err)
}
Expand All @@ -51,12 +51,12 @@ func TestHandleData(t *testing.T) {
Amount: 1337,
Price: 1337,
}
err = WebsocketDataHandler(exchName, origOrder)
err = b.WebsocketDataHandler(exchName, origOrder)
if err != nil {
t.Error(err)
}
// Send it again since it exists now
err = WebsocketDataHandler(exchName, &order.Detail{
err = b.WebsocketDataHandler(exchName, &order.Detail{
Exchange: fakePassExchange,
ID: orderID,
Amount: 1338,
Expand All @@ -68,7 +68,7 @@ func TestHandleData(t *testing.T) {
t.Error("Bad pipeline")
}

err = WebsocketDataHandler(exchName, &order.Modify{
err = b.WebsocketDataHandler(exchName, &order.Modify{
Exchange: fakePassExchange,
ID: orderID,
Status: order.Active,
Expand All @@ -80,7 +80,7 @@ func TestHandleData(t *testing.T) {
t.Error("Expected order to be modified to Active")
}

err = WebsocketDataHandler(exchName, &order.Cancel{
err = b.WebsocketDataHandler(exchName, &order.Cancel{
Exchange: fakePassExchange,
ID: orderID,
})
Expand All @@ -91,12 +91,12 @@ func TestHandleData(t *testing.T) {
t.Error("Expected order status to be cancelled")
}
// Send some gibberish
err = WebsocketDataHandler(exchName, order.Stop)
err = b.WebsocketDataHandler(exchName, order.Stop)
if err != nil {
t.Error(err)
}

err = WebsocketDataHandler(exchName, stream.UnhandledMessageWarning{
err = b.WebsocketDataHandler(exchName, stream.UnhandledMessageWarning{
Message: "there's an issue here's a tissue"},
)
if err != nil {
Expand All @@ -108,22 +108,22 @@ func TestHandleData(t *testing.T) {
OrderID: "one",
Err: errors.New("lol"),
}
err = WebsocketDataHandler(exchName, classificationError)
err = b.WebsocketDataHandler(exchName, classificationError)
if err == nil {
t.Error("Expected error")
}
if err.Error() != classificationError.Error() {
t.Errorf("Problem formatting error. Expected %v Received %v", classificationError.Error(), err.Error())
}

err = WebsocketDataHandler(exchName, &orderbook.Base{
err = b.WebsocketDataHandler(exchName, &orderbook.Base{
ExchangeName: fakePassExchange,
Pair: currency.NewPair(currency.BTC, currency.USD),
})
if err != nil {
t.Error(err)
}
err = WebsocketDataHandler(exchName, "this is a test string")
err = b.WebsocketDataHandler(exchName, "this is a test string")
if err != nil {
t.Error(err)
}
Expand Down
5 changes: 4 additions & 1 deletion engine/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,12 @@ func (e *ExchangeCurrencyPairSyncer) Start() {
}

if !ws.IsConnected() && !ws.IsConnecting() {
go WebsocketDataReceiver(ws)
go Bot.WebsocketDataReceiver(ws)

err = ws.Connect()
if err == nil {
err = ws.FlushChannels()
}
if err != nil {
log.Errorf(log.SyncMgr,
"%s websocket failed to connect. Err: %s\n",
Expand Down
6 changes: 1 addition & 5 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ func (b *Binance) WsConnect() error {

go b.wsReadData()

subs, err := b.GenerateSubscriptions()
if err != nil {
return err
}
return b.Websocket.SubscribeToChannels(subs)
return nil
}

// KeepAuthKeyAlive will continuously send messages to
Expand Down
6 changes: 1 addition & 5 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,8 @@ func (b *Bitfinex) WsConnect() error {
}
}

subs, err := b.GenerateDefaultSubscriptions()
if err != nil {
return err
}
go b.WsDataHandler()
return b.Websocket.SubscribeToChannels(subs)
return nil
}

// wsReadData receives and passes on websocket messages for processing
Expand Down
9 changes: 0 additions & 9 deletions exchanges/bitmex/bitmex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,6 @@ func (b *Bitmex) WsConnect() error {
}

go b.wsReadData()
subs, err := b.GenerateDefaultSubscriptions()
if err != nil {
return err
}

err = b.Websocket.SubscribeToChannels(subs)
if err != nil {
return err
}

err = b.websocketSendAuth()
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ func (b *Bitstamp) WsConnect() error {
if err != nil {
b.Websocket.DataHandler <- err
}
subs, err := b.generateDefaultSubscriptions()
if err != nil {
return err
}
go b.wsReadData()
return b.Websocket.SubscribeToChannels(subs)
return nil
}

// wsReadData receives and passes on websocket messages for processing
Expand Down
6 changes: 1 addition & 5 deletions exchanges/btcmarkets/btcmarkets_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ func (b *BTCMarkets) WsConnect() error {
log.Debugf(log.ExchangeSys, "%s Connected to Websocket.\n", b.Name)
}
go b.wsReadData()
subs, err := b.generateDefaultSubscriptions()
if err != nil {
return err
}
return b.Websocket.SubscribeToChannels(subs)
return nil
}

// wsReadData receives and passes on websocket messages for processing
Expand Down
6 changes: 1 addition & 5 deletions exchanges/btse/btse_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ func (b *BTSE) WsConnect() error {
}
}

subs, err := b.GenerateDefaultSubscriptions()
if err != nil {
return err
}
return b.Websocket.SubscribeToChannels(subs)
return nil
}

// WsAuthenticate Send an authentication message to receive auth data
Expand Down
Loading

0 comments on commit d1b206c

Please sign in to comment.