Skip to content

Commit

Permalink
Binance: websocket orderbook reconnection fix, some bug fix (thrasher…
Browse files Browse the repository at this point in the history
…-corp#630)

* binance websocket orderbook reconnection fix

* add context.WithDeadline

* Deadline() context still not resolved

* stage1

* fmt

* cleanup

* fix applyBufferUpdate err returning

* remove comment

* remove extra return

* increase Binance maxWSUpdateBuffer

Co-authored-by: Vazha Bezhanishvili <[email protected]>
Co-authored-by: gloriousCode <[email protected]>
  • Loading branch information
3 people authored Mar 3, 2021
1 parent 49bd39e commit 2064743
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 68 deletions.
2 changes: 1 addition & 1 deletion engine/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (e *ExchangeCurrencyPairSyncer) worker() {
continue
}
if switchedToRest && usingWebsocket {
log.Infof(log.SyncMgr,
log.Warnf(log.SyncMgr,
"%s %s: Websocket re-enabled, switching from rest to websocket\n",
c.Exchange, FormatCurrency(enabledPairs[i]).String())
switchedToRest = false
Expand Down
115 changes: 55 additions & 60 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var listenKey string
var (
// maxWSUpdateBuffer defines max websocket updates to apply when an
// orderbook is initially fetched
maxWSUpdateBuffer = 100
maxWSUpdateBuffer = 150
// maxWSOrderbookJobs defines max websocket orderbook jobs in queue to fetch
// an orderbook snapshot via REST
maxWSOrderbookJobs = 2000
Expand All @@ -47,6 +47,7 @@ func (b *Binance) WsConnect() error {
}

var dialer websocket.Dialer
dialer.HandshakeTimeout = b.Config.HTTPTimeout
var err error
if b.Websocket.CanUseAuthenticatedEndpoints() {
listenKey, err = b.GetWsAuthStreamKey()
Expand Down Expand Up @@ -84,28 +85,9 @@ func (b *Binance) WsConnect() error {
Delay: pingDelay,
})

go b.orderBookProcess()
return nil
}

// orderBookProcess prepare orderbook handling
func (b *Binance) orderBookProcess() {
enabledPairs, err := b.GetEnabledPairs(asset.Spot)
if err != nil {
log.Errorf(log.ExchangeSys, "%s orderBookProcess, GetEnabledPairs error: %s", b.Name, err)
return
}

for i := range enabledPairs {
err = b.SeedLocalCache(enabledPairs[i])
if err != nil {
log.Errorf(log.ExchangeSys, "%s orderBookProcess, SeedLocalCache error: %s", b.Name, err)
return
}
}

go b.wsReadData()
b.setupOrderbookManager()
return nil
}

func (b *Binance) setupOrderbookManager() {
Expand Down Expand Up @@ -400,8 +382,11 @@ func (b *Binance) wsHandleData(respRaw []byte) error {
err)
}

err = b.UpdateLocalBuffer(&depth)
init, err := b.UpdateLocalBuffer(&depth)
if err != nil {
if init {
return nil
}
return fmt.Errorf("%v - UpdateLocalCache error: %s",
b.Name,
err)
Expand Down Expand Up @@ -472,51 +457,39 @@ func (b *Binance) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *OrderBoo
}

// UpdateLocalBuffer updates and returns the most recent iteration of the orderbook
func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) error {
func (b *Binance) UpdateLocalBuffer(wsdp *WebsocketDepthStream) (bool, error) {
enabledPairs, err := b.GetEnabledPairs(asset.Spot)
if err != nil {
return err
return false, err
}

format, err := b.GetPairFormat(asset.Spot, true)
if err != nil {
return err
return false, err
}

currencyPair, err := currency.NewPairFromFormattedPairs(wsdp.Pair,
enabledPairs,
format)
if err != nil {
return err
return false, err
}

err = b.obm.stageWsUpdate(wsdp, currencyPair, asset.Spot)
if err != nil {
return err
init, err2 := b.obm.checkIsInitialSync(currencyPair)
if err2 != nil {
return false, err2
}
return init, err
}

err = b.applyBufferUpdate(currencyPair)
if err != nil {
cleanupErr := b.Websocket.Orderbook.FlushOrderbook(currencyPair, asset.Spot)
if cleanupErr != nil {
log.Errorf(log.WebsocketMgr,
"%s flushing websocket error: %v",
b.Name,
cleanupErr)
}

cleanupErr = b.obm.cleanup(currencyPair)
if cleanupErr != nil {
log.Errorf(log.WebsocketMgr,
"%s cleanup websocket orderbook error: %v",
b.Name,
cleanupErr)
}

return err
b.flushAndCleanup(currencyPair)
}

return nil
return false, err
}

// GenerateSubscriptions generates the default subscription set
Expand Down Expand Up @@ -645,7 +618,7 @@ func (b *Binance) applyBufferUpdate(pair currency.Pair) error {
}

recent := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot)
if recent == nil {
if recent == nil || (recent.Asks == nil && recent.Bids == nil) {
return b.obm.fetchBookViaREST(pair)
}

Expand Down Expand Up @@ -691,24 +664,29 @@ func (b *Binance) processJob(p currency.Pair) error {
// new update to initiate this.
err = b.applyBufferUpdate(p)
if err != nil {
errClean := b.Websocket.Orderbook.FlushOrderbook(p, asset.Spot)
if errClean != nil {
log.Errorf(log.WebsocketMgr,
"%s flushing websocket error: %v",
b.Name,
errClean)
}
errClean = b.obm.cleanup(p)
if errClean != nil {
log.Errorf(log.WebsocketMgr, "%s cleanup websocket error: %v",
b.Name,
errClean)
}
b.flushAndCleanup(p)
return err
}
return nil
}

// flushAndCleanup flushes orderbook and clean local cache
func (b *Binance) flushAndCleanup(p currency.Pair) {
errClean := b.Websocket.Orderbook.FlushOrderbook(p, asset.Spot)
if errClean != nil {
log.Errorf(log.WebsocketMgr,
"%s flushing websocket error: %v",
b.Name,
errClean)
}
errClean = b.obm.cleanup(p)
if errClean != nil {
log.Errorf(log.WebsocketMgr, "%s cleanup websocket error: %v",
b.Name,
errClean)
}
}

// stageWsUpdate stages websocket update to roll through updates that need to
// be applied to a fetched orderbook via REST.
func (o *orderbookManager) stageWsUpdate(u *WebsocketDepthStream, pair currency.Pair, a asset.Item) error {
Expand Down Expand Up @@ -743,6 +721,8 @@ func (o *orderbookManager) stageWsUpdate(u *WebsocketDepthStream, pair currency.
case state.buffer <- u:
return nil
default:
<-state.buffer // pop one element
state.buffer <- u // to shift buffer on fail
return fmt.Errorf("channel blockage for %s, asset %s and connection",
pair, a)
}
Expand Down Expand Up @@ -801,6 +781,21 @@ func (o *orderbookManager) completeInitialSync(pair currency.Pair) error {
return nil
}

// checkIsInitialSync checks status if the book is Initial Sync being via the REST
// protocol.
func (o *orderbookManager) checkIsInitialSync(pair currency.Pair) (bool, error) {
o.Lock()
defer o.Unlock()
state, ok := o.state[pair.Base][pair.Quote][asset.Spot]
if !ok {
return false,
fmt.Errorf("checkIsInitialSync of orderbook cannot match currency pair %s asset type %s",
pair,
asset.Spot)
}
return state.initialSync, nil
}

// fetchBookViaREST pushes a job of fetching the orderbook via the REST protocol
// to get an initial full book that we can apply our buffered updates too.
func (o *orderbookManager) fetchBookViaREST(pair currency.Pair) error {
Expand Down Expand Up @@ -871,7 +866,7 @@ func (u *update) validate(updt *WebsocketDepthStream, recent *orderbook.Base) (b
if u.initialSync {
// The first processed event should have U <= lastUpdateId+1 AND
// u >= lastUpdateId+1.
if updt.FirstUpdateID > id && updt.LastUpdateID < id {
if updt.FirstUpdateID > id || updt.LastUpdateID < id {
return false, fmt.Errorf("initial websocket orderbook sync failure for pair %s and asset %s",
recent.Pair,
asset.Spot)
Expand Down
12 changes: 6 additions & 6 deletions exchanges/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,11 @@ func (e *Base) SetupDefaults(exch *config.ExchangeConfig) error {

if exch.HTTPTimeout <= time.Duration(0) {
exch.HTTPTimeout = DefaultHTTPTimeout
} else {
err := e.SetHTTPClientTimeout(exch.HTTPTimeout)
if err != nil {
return err
}
}

err := e.SetHTTPClientTimeout(exch.HTTPTimeout)
if err != nil {
return err
}

if exch.CurrencyPairs == nil {
Expand All @@ -573,7 +573,7 @@ func (e *Base) SetupDefaults(exch *config.ExchangeConfig) error {
e.SetHTTPClientUserAgent(exch.HTTPUserAgent)
e.SetCurrencyPairFormat()

err := e.SetConfigPairs()
err = e.SetConfigPairs()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (r *Requester) doRequest(req *http.Request, p *Item) error {
delay = after
}

if d, ok := req.Context().Deadline(); ok && d.After(time.Now().Add(delay)) {
if d, ok := req.Context().Deadline(); ok && d.After(time.Now()) && time.Now().Add(delay).After(d) {
if err != nil {
return fmt.Errorf("request.go error - deadline would be exceeded by retry, err: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions exchanges/stream/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func (w *Orderbook) LoadSnapshot(book *orderbook.Base) error {
m3 = &orderbookHolder{ob: book, buffer: &[]Update{}}
m2[book.AssetType] = m3
} else {
m3.ob.LastUpdateID = book.LastUpdateID
m3.ob.Bids = book.Bids
m3.ob.Asks = book.Asks
}
Expand Down

0 comments on commit 2064743

Please sign in to comment.