Skip to content

Commit

Permalink
Bittrex: Enable ws orderbook sync recovery (resolves thrasher-corp#746)…
Browse files Browse the repository at this point in the history
… (thrasher-corp#747)

* [FIX] Enable ws orderbook sync recovery by:

- Testing if books have been cleared
- Assigning options when loading snapshot

* orderbooks: remove setlastupdate method and on select depth method that updates linked list, this reduced lock contention across code base and fixes buffer bug on applying buffered updates

* WS - Introduce signaling for the need to fetch the orderbook

* Address nits

* Update error messages to include exchange name

Co-authored-by: shazbert <[email protected]>
  • Loading branch information
TaltaM and shazbert authored Aug 18, 2021
1 parent 08df015 commit 4ba2c71
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 93 deletions.
7 changes: 4 additions & 3 deletions exchanges/bittrex/bittrex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ type orderbookManager struct {
}

type update struct {
buffer chan *OrderbookUpdateMessage
fetchingBook bool
initialSync bool
buffer chan *OrderbookUpdateMessage
fetchingBook bool
initialSync bool
needsFetchingBook bool
}

// job defines a synchonisation job that tells a go routine to fetch an
Expand Down
117 changes: 87 additions & 30 deletions exchanges/bittrex/bittrex_ws_orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (b *Bittrex) UpdateLocalOBBuffer(update *OrderbookUpdateMessage) (bool, err

err = b.applyBufferUpdate(currencyPair)
if err != nil {
b.flushAndCleanup(currencyPair)
log.Errorf(log.WebsocketMgr, "%s websocket UpdateLocalOBBuffer: Could not apply buffer update\n", b.Name)
}

return false, err
Expand Down Expand Up @@ -136,23 +136,40 @@ func (b *Bittrex) SeedLocalCacheWithOrderBook(p currency.Pair, sequence int64, o
// applyBufferUpdate applies the buffer to the orderbook or initiates a new
// orderbook sync by the REST protocol which is off handed to go routine.
func (b *Bittrex) applyBufferUpdate(pair currency.Pair) error {
fetching, err := b.obm.checkIsFetchingBook(pair)
fetching, needsFetching, err := b.obm.handleFetchingBook(pair)
if err != nil {
return err
}
if fetching {
return nil
}

recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot)
if err != nil || (recent.Asks == nil && recent.Bids == nil) {
if needsFetching {
if b.Verbose {
log.Debugf(log.WebsocketMgr, "Orderbook: Fetching via REST\n")
log.Debugf(log.WebsocketMgr, "%s Orderbook: Fetching via REST\n", b.Name)
}
return b.obm.fetchBookViaREST(pair)
}

return b.obm.checkAndProcessUpdate(b.ProcessUpdateOB, pair, recent)
recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot)
if err != nil {
log.Errorf(
log.WebsocketMgr,
"%s error fetching recent orderbook when applying updates: %s\n",
b.Name,
err)
}

if recent != nil {
err = b.obm.checkAndProcessUpdate(b.ProcessUpdateOB, pair, recent)
if err != nil {
log.Errorf(
log.WebsocketMgr,
"%s error processing update - initiating new orderbook sync via REST: %s\n",
b.Name,
err)
b.obm.setNeedsFetchingBook(pair)
}
}
return nil
}

// SynchroniseWebsocketOrderbook synchronises full orderbook for currency pair
Expand Down Expand Up @@ -192,12 +209,7 @@ func (b *Bittrex) processJob(p currency.Pair) error {

// Immediately apply the buffer updates so we don't wait for a
// new update to initiate this.
err = b.applyBufferUpdate(p)
if err != nil {
b.flushAndCleanup(p)
return err
}
return nil
return b.applyBufferUpdate(p)
}

// flushAndCleanup flushes orderbook and clean local cache
Expand Down Expand Up @@ -239,9 +251,10 @@ func (o *orderbookManager) stageWsUpdate(u *OrderbookUpdateMessage, pair currenc
state = &update{
// 100ms update assuming we might have up to a 10 second delay.
// There could be a potential 100 updates for the currency.
buffer: make(chan *OrderbookUpdateMessage, maxWSUpdateBuffer),
fetchingBook: false,
initialSync: true,
buffer: make(chan *OrderbookUpdateMessage, maxWSUpdateBuffer),
fetchingBook: false,
initialSync: true,
needsFetchingBook: true,
}
m2[a] = state
}
Expand All @@ -258,23 +271,27 @@ func (o *orderbookManager) stageWsUpdate(u *OrderbookUpdateMessage, pair currenc
}
}

// checkIsFetchingBook checks status if the book is currently being via the REST
// protocol.
func (o *orderbookManager) checkIsFetchingBook(pair currency.Pair) (bool, error) {
// stopFetchingBook completes the book fetching.
func (o *orderbookManager) stopFetchingBook(pair currency.Pair) error {
o.Lock()
defer o.Unlock()
state, ok := o.state[pair.Base][pair.Quote][asset.Spot]
if !ok {
return false,
fmt.Errorf("check is fetching book cannot match currency pair %s asset type %s",
pair,
asset.Spot)
return fmt.Errorf("could not match pair %s and asset type %s in hash table",
pair,
asset.Spot)
}
return state.fetchingBook, nil
if !state.fetchingBook {
return fmt.Errorf("fetching book already set to false for %s %s",
pair,
asset.Spot)
}
state.fetchingBook = false
return nil
}

// stopFetchingBook completes the book fetching.
func (o *orderbookManager) stopFetchingBook(pair currency.Pair) error {
// stopNeedsFetchingBook completes the book fetching initiation.
func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair) error {
o.Lock()
defer o.Unlock()
state, ok := o.state[pair.Base][pair.Quote][asset.Spot]
Expand All @@ -283,15 +300,54 @@ func (o *orderbookManager) stopFetchingBook(pair currency.Pair) error {
pair,
asset.Spot)
}
if !state.fetchingBook {
return fmt.Errorf("fetching book already set to false for %s %s",
if !state.needsFetchingBook {
return fmt.Errorf("needs fetching book already set to false for %s %s",
pair,
asset.Spot)
}
state.fetchingBook = false
state.needsFetchingBook = false
return nil
}

// setNeedsFetchingBook completes the book fetching initiation.
func (o *orderbookManager) setNeedsFetchingBook(pair currency.Pair) error {
o.Lock()
defer o.Unlock()
state, ok := o.state[pair.Base][pair.Quote][asset.Spot]
if !ok {
return fmt.Errorf("could not match pair %s and asset type %s in hash table",
pair,
asset.Spot)
}
state.needsFetchingBook = true
return nil
}

// handleFetchingBook checks if a full book is being fetched or needs to be
// fetched
func (o *orderbookManager) handleFetchingBook(pair currency.Pair) (fetching, needsFetching bool, err error) {
o.Lock()
defer o.Unlock()
state, ok := o.state[pair.Base][pair.Quote][asset.Spot]
if !ok {
return false, false,
fmt.Errorf("check is fetching book cannot match currency pair %s asset type %s",
pair,
asset.Spot)
}

if state.fetchingBook {
return true, false, nil
}

if state.needsFetchingBook {
state.needsFetchingBook = false
state.fetchingBook = true
return false, true, nil
}
return false, false, nil
}

// completeInitialSync sets if an asset type has completed its initial sync
func (o *orderbookManager) completeInitialSync(pair currency.Pair) error {
o.Lock()
Expand Down Expand Up @@ -437,5 +493,6 @@ bufferEmpty:
// disable rest orderbook synchronisation
_ = o.stopFetchingBook(pair)
_ = o.completeInitialSync(pair)
_ = o.stopNeedsFetchingBook(pair)
return nil
}
36 changes: 21 additions & 15 deletions exchanges/orderbook/depth.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ func (d *Depth) TotalAskAmounts() (liquidity, value float64) {
}

// LoadSnapshot flushes the bids and asks with a snapshot
func (d *Depth) LoadSnapshot(bids, asks []Item) {
func (d *Depth) LoadSnapshot(bids, asks []Item, lastUpdateID int64, lastUpdated time.Time, updateByREST bool) {
d.m.Lock()
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.restSnapshot = updateByREST
d.bids.load(bids, d.stack)
d.asks.load(asks, d.stack)
d.alert()
Expand All @@ -105,6 +108,8 @@ func (d *Depth) LoadSnapshot(bids, asks []Item) {
// Flush flushes the bid and ask depths
func (d *Depth) Flush() {
d.m.Lock()
d.lastUpdateID = 0
d.lastUpdated = time.Time{}
d.bids.load(nil, d.stack)
d.asks.load(nil, d.stack)
d.alert()
Expand All @@ -113,11 +118,13 @@ func (d *Depth) Flush() {

// UpdateBidAskByPrice updates the bid and ask spread by supplied updates, this
// will trim total length of depth level to a specified supplied number
func (d *Depth) UpdateBidAskByPrice(bidUpdts, askUpdts Items, maxDepth int) {
func (d *Depth) UpdateBidAskByPrice(bidUpdts, askUpdts Items, maxDepth int, lastUpdateID int64, lastUpdated time.Time) {
if len(bidUpdts) == 0 && len(askUpdts) == 0 {
return
}
d.m.Lock()
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
tn := getNow()
if len(bidUpdts) != 0 {
d.bids.updateInsertByPrice(bidUpdts, d.stack, maxDepth, tn)
Expand All @@ -130,7 +137,7 @@ func (d *Depth) UpdateBidAskByPrice(bidUpdts, askUpdts Items, maxDepth int) {
}

// UpdateBidAskByID amends details by ID
func (d *Depth) UpdateBidAskByID(bidUpdts, askUpdts Items) error {
func (d *Depth) UpdateBidAskByID(bidUpdts, askUpdts Items, lastUpdateID int64, lastUpdated time.Time) error {
if len(bidUpdts) == 0 && len(askUpdts) == 0 {
return nil
}
Expand All @@ -148,12 +155,14 @@ func (d *Depth) UpdateBidAskByID(bidUpdts, askUpdts Items) error {
return err
}
}
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.alert()
return nil
}

// DeleteBidAskByID deletes a price level by ID
func (d *Depth) DeleteBidAskByID(bidUpdts, askUpdts Items, bypassErr bool) error {
func (d *Depth) DeleteBidAskByID(bidUpdts, askUpdts Items, bypassErr bool, lastUpdateID int64, lastUpdated time.Time) error {
if len(bidUpdts) == 0 && len(askUpdts) == 0 {
return nil
}
Expand All @@ -171,12 +180,14 @@ func (d *Depth) DeleteBidAskByID(bidUpdts, askUpdts Items, bypassErr bool) error
return err
}
}
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.alert()
return nil
}

// InsertBidAskByID inserts new updates
func (d *Depth) InsertBidAskByID(bidUpdts, askUpdts Items) error {
func (d *Depth) InsertBidAskByID(bidUpdts, askUpdts Items, lastUpdateID int64, lastUpdated time.Time) error {
if len(bidUpdts) == 0 && len(askUpdts) == 0 {
return nil
}
Expand All @@ -194,12 +205,14 @@ func (d *Depth) InsertBidAskByID(bidUpdts, askUpdts Items) error {
return err
}
}
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
d.alert()
return nil
}

// UpdateInsertByID updates or inserts by ID at current price level.
func (d *Depth) UpdateInsertByID(bidUpdts, askUpdts Items) error {
func (d *Depth) UpdateInsertByID(bidUpdts, askUpdts Items, lastUpdateID int64, lastUpdated time.Time) error {
if len(bidUpdts) == 0 && len(askUpdts) == 0 {
return nil
}
Expand All @@ -218,6 +231,8 @@ func (d *Depth) UpdateInsertByID(bidUpdts, askUpdts Items) error {
}
}
d.alert()
d.lastUpdateID = lastUpdateID
d.lastUpdated = lastUpdated
return nil
}

Expand All @@ -239,15 +254,6 @@ func (d *Depth) AssignOptions(b *Base) {
d.m.Unlock()
}

// SetLastUpdate sets details of last update information
func (d *Depth) SetLastUpdate(lastUpdate time.Time, lastUpdateID int64, updateByREST bool) {
d.m.Lock()
d.lastUpdated = lastUpdate
d.lastUpdateID = lastUpdateID
d.restSnapshot = updateByREST
d.m.Unlock()
}

// GetName returns name of exchange
func (d *Depth) GetName() string {
d.m.Lock()
Expand Down
Loading

0 comments on commit 4ba2c71

Please sign in to comment.