Skip to content

Commit

Permalink
exchanges/websocket: Allow configuration of orderbook publish period (t…
Browse files Browse the repository at this point in the history
…hrasher-corp#805)

* Allow configuration of orderbook publish period

For some applications that import GCT it's more interesting to be
immediately notified of an exchange orderbook update instead of
only getting notified every 10 seconds. This option allows that
to happen while keeping the previous default.

* exchanges: allow configuration of orderbook update period
  • Loading branch information
lrascao authored Oct 20, 2021
1 parent a4d792f commit a70224d
Show file tree
Hide file tree
Showing 26 changed files with 70 additions and 14 deletions.
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,14 @@ func (c *Config) CheckExchangeConfigValues() error {
defaultWebsocketOrderbookBufferLimit)
c.Exchanges[i].OrderbookConfig.WebsocketBufferLimit = defaultWebsocketOrderbookBufferLimit
}
if c.Exchanges[i].OrderbookConfig.PublishPeriod == nil || c.Exchanges[i].OrderbookConfig.PublishPeriod.Nanoseconds() < 0 {
log.Warnf(log.ConfigMgr,
"Exchange %s Websocket orderbook publish period value not set, defaulting to %v.",
c.Exchanges[i].Name,
DefaultOrderbookPublishPeriod)
publishPeriod := DefaultOrderbookPublishPeriod
c.Exchanges[i].OrderbookConfig.PublishPeriod = &publishPeriod
}
err := c.CheckPairConsistency(c.Exchanges[i].Name)
if err != nil {
log.Errorf(log.ConfigMgr,
Expand Down
4 changes: 4 additions & 0 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
defaultDataHistoryMonitorCheckTimer = time.Minute
defaultCurrencyStateManagerDelay = time.Minute
defaultMaxJobsPerCycle = 5
DefaultOrderbookPublishPeriod = time.Second * 10
)

// Constants here hold some messages
Expand Down Expand Up @@ -336,4 +337,7 @@ type OrderbookConfig struct {
VerificationBypass bool `json:"verificationBypass"`
WebsocketBufferLimit int `json:"websocketBufferLimit"`
WebsocketBufferEnabled bool `json:"websocketBufferEnabled"`
// PublishPeriod here is a pointer because we want to distinguish
// between zeroed out and missing.
PublishPeriod *time.Duration `json:"publishPeriod"`
}
1 change: 1 addition & 0 deletions exchanges/binance/binance_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (b *Binance) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: b.GenerateSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
1 change: 1 addition & 0 deletions exchanges/bitfinex/bitfinex_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (b *Bitfinex) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
UpdateEntriesByID: true,
})
Expand Down
1 change: 1 addition & 0 deletions exchanges/bithumb/bithumb_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (b *Bithumb) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: b.GenerateSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/bitstamp/bitstamp_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (b *Bitstamp) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: b.generateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
})
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions exchanges/bittrex/bittrex_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@ func (b *Bittrex) Setup(exch *config.ExchangeConfig) error {
// Orderbook buffer specific variables for processing orderbook updates via websocket feed.
// Other orderbook buffer vars:
// UpdateEntriesByID bool
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
SortBufferByUpdateIDs: true,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
SortBufferByUpdateIDs: true,
})
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions exchanges/btcmarkets/btcmarkets_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (b *BTCMarkets) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: b.generateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
})
Expand Down
1 change: 1 addition & 0 deletions exchanges/btse/btse_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (b *BTSE) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: b.GenerateDefaultSubscriptions,
Features: &b.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/coinbasepro/coinbasepro_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (c *CoinbasePro) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: c.GenerateDefaultSubscriptions,
Features: &c.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
})
Expand Down
1 change: 1 addition & 0 deletions exchanges/coinbene/coinbene_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (c *Coinbene) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: c.GenerateDefaultSubscriptions,
Features: &c.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
})
Expand Down
1 change: 1 addition & 0 deletions exchanges/coinut/coinut_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (c *COINUT) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: c.GenerateDefaultSubscriptions,
Features: &c.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
1 change: 1 addition & 0 deletions exchanges/ftx/ftx_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func (f *FTX) Setup(exch *config.ExchangeConfig) error {
Features: &f.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
})
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions exchanges/gateio/gateio_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (g *Gateio) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
Features: &g.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/gemini/gemini_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (g *Gemini) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: g.GenerateDefaultSubscriptions,
Features: &g.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/hitbtc/hitbtc_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (h *HitBTC) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
Features: &h.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
1 change: 1 addition & 0 deletions exchanges/huobi/huobi_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (h *HUOBI) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: h.GenerateDefaultSubscriptions,
Features: &h.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/kraken/kraken_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (k *Kraken) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: k.GenerateDefaultSubscriptions,
Features: &k.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
})
Expand Down
1 change: 1 addition & 0 deletions exchanges/okgroup/okgroup_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (o *OKGroup) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: o.GenerateDefaultSubscriptions,
Features: &o.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
})
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/poloniex/poloniex_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func (p *Poloniex) Setup(exch *config.ExchangeConfig) error {
GenerateSubscriptions: p.GenerateDefaultSubscriptions,
Features: &p.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
SortBuffer: true,
SortBufferByUpdateIDs: true,
Expand Down
20 changes: 19 additions & 1 deletion exchanges/stream/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (w *Orderbook) Setup(obBufferLimit int,
sortBufferByUpdateIDs,
updateEntriesByID,
verbose bool,
publishPeriod time.Duration,
exchangeName string,
dataHandler chan interface{}) error {
if exchangeName == "" {
Expand All @@ -51,6 +52,7 @@ func (w *Orderbook) Setup(obBufferLimit int,
w.dataHandler = dataHandler
w.ob = make(map[currency.Code]map[currency.Code]map[asset.Item]*orderbookHolder)
w.verbose = verbose
w.publishPeriod = publishPeriod
return nil
}

Expand Down Expand Up @@ -127,6 +129,17 @@ func (w *Orderbook) Update(u *Update) error {
}
}

// a nil ticker means that a zero publish period has been requested,
// this means publish now whatever was received with no throttling
if book.ticker == nil {
go func() {
w.dataHandler <- book.ob.Retrieve()
book.ob.Publish()
}()

return nil
}

select {
case <-book.ticker.C:
// Opted to wait for receiver because we are limiting here and the sync
Expand All @@ -143,6 +156,7 @@ func (w *Orderbook) Update(u *Update) error {
book.ob.Publish()
}
}

return nil
}

Expand Down Expand Up @@ -252,7 +266,11 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
}
depth.AssignOptions(book)
buffer := make([]Update, w.obBufferLimit)
ticker := time.NewTicker(timerDefault)

var ticker *time.Ticker
if w.publishPeriod != 0 {
ticker = time.NewTicker(w.publishPeriod)
}
holder = &orderbookHolder{
ob: depth,
buffer: &buffer,
Expand Down
10 changes: 5 additions & 5 deletions exchanges/stream/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,22 +712,22 @@ func TestGetOrderbook(t *testing.T) {
func TestSetup(t *testing.T) {
t.Parallel()
w := Orderbook{}
err := w.Setup(0, false, false, false, false, true, "", nil)
err := w.Setup(0, false, false, false, false, true, 0, "", nil)
if !errors.Is(err, errUnsetExchangeName) {
t.Fatalf("expected error %v but received %v", errUnsetExchangeName, err)
}

err = w.Setup(0, false, false, false, false, false, "test", nil)
err = w.Setup(0, false, false, false, false, false, 0, "test", nil)
if !errors.Is(err, errUnsetDataHandler) {
t.Fatalf("expected error %v but received %v", errUnsetDataHandler, err)
}

err = w.Setup(0, true, false, false, false, true, "test", make(chan interface{}))
err = w.Setup(0, true, false, false, false, true, 0, "test", make(chan interface{}))
if !errors.Is(err, errIssueBufferEnabledButNoLimit) {
t.Fatalf("expected error %v but received %v", errIssueBufferEnabledButNoLimit, err)
}

err = w.Setup(1337, true, true, true, true, false, "test", make(chan interface{}))
err = w.Setup(1337, true, true, true, true, false, 0, "test", make(chan interface{}))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1002,7 +1002,7 @@ func TestUpdateByIDAndAction(t *testing.T) {
func TestFlushOrderbook(t *testing.T) {
t.Parallel()
w := &Orderbook{}
err := w.Setup(5, false, false, false, false, false, "test", make(chan interface{}, 2))
err := w.Setup(5, false, false, false, false, false, 0, "test", make(chan interface{}, 2))
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 1 addition & 4 deletions exchanges/stream/buffer/buffer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
)

// timerDefault defines the amount of time between alerting the sync manager of
// an update.
var timerDefault = time.Second * 10

// Orderbook defines a local cache of orderbooks for amending, appending
// and deleting changes and updates the main store for a stream
type Orderbook struct {
Expand All @@ -25,6 +21,7 @@ type Orderbook struct {
exchangeName string
dataHandler chan interface{}
verbose bool
publishPeriod time.Duration
m sync.Mutex
}

Expand Down
7 changes: 7 additions & 0 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,19 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
w.Wg = new(sync.WaitGroup)
w.SetCanUseAuthenticatedEndpoints(s.AuthenticatedWebsocketAPISupport)

// default publish period if missing
orderbookPublishPeriod := config.DefaultOrderbookPublishPeriod
if s.OrderbookPublishPeriod != nil {
orderbookPublishPeriod = *s.OrderbookPublishPeriod
}

return w.Orderbook.Setup(s.OrderbookBufferLimit,
s.BufferEnabled,
s.SortBuffer,
s.SortBufferByUpdateIDs,
s.UpdateEntriesByID,
s.Verbose,
orderbookPublishPeriod,
w.exchangeName,
w.DataHandler)
}
Expand Down
3 changes: 3 additions & 0 deletions exchanges/stream/websocket_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type WebsocketSetup struct {
SortBuffer bool
SortBufferByUpdateIDs bool
UpdateEntriesByID bool
// OrderbookPublishPeriod is a pointer for the same reason as it is in `OrderbookConfig`:
// to allow distinguishing between a zeroed out value and a missing one
OrderbookPublishPeriod *time.Duration
}

// WebsocketConnection contains all the data needed to send a message to a WS
Expand Down
1 change: 1 addition & 0 deletions exchanges/zb/zb_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (z *ZB) Setup(exch *config.ExchangeConfig) error {
Subscriber: z.Subscribe,
Features: &z.Features.Supports.WebsocketCapabilities,
OrderbookBufferLimit: exch.OrderbookConfig.WebsocketBufferLimit,
OrderbookPublishPeriod: exch.OrderbookConfig.PublishPeriod,
BufferEnabled: exch.OrderbookConfig.WebsocketBufferEnabled,
})
if err != nil {
Expand Down

0 comments on commit a70224d

Please sign in to comment.