Skip to content

Commit

Permalink
websocket: fix deadlock when enabling/disabling via gctrpc (thrasher-…
Browse files Browse the repository at this point in the history
…corp#754)

* websocket: select case error if no receiver, add in functionality to reset to initial sync for books on a new websocket connection

* websocket: fix tests

* websocket: log error instead of losing it

* websocket: fix whoopsie

* exchanges: fix test

* websocket: force requirement of specific functionality

* exchanges: fix tests

* exchanges/websocket: move waitgroup add before scheduling across exchanges

* gateio: add feature subscribe

* bithumb/bittrex: include connection state reset, fix reconnection bug for Bithumb

* huobi: Add listen to shutdown to routine so it actually returns and stops being a naughty boy.

* huobi: add missing waitgroup add.

* exchanges: bleed comms channels

* binance: fix reconnection bug with buffer

* bithumb: fix reconnection bug with ws orderbook when websocket is diabled/enabled

* bithumb/bittrex: add bleeders for ws orderbook jobs

* linter: fix

* kraken: reduce code block from double assertion

* This bug ruined my day.

* glorious: error checking

* zb: add correct path for websocket connection

* exchange: Add verbosity when config conflicts and overwrites default values

* zb: add https to path

* exchanges: glorious nits

* stream: Add checkAndSetMonitoring to reduce potential routine bundling, increase timeout and check state in tests

* stream: remove check that is not needed.

* glorious: nits addr.

* lint: test
  • Loading branch information
shazbert authored Sep 3, 2021
1 parent a54c510 commit 66fbd43
Show file tree
Hide file tree
Showing 35 changed files with 635 additions and 164 deletions.
3 changes: 3 additions & 0 deletions exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,10 +2152,13 @@ func TestWsDepthUpdate(t *testing.T) {
t.Error(err)
}

b.obm.state[currency.BTC][currency.USDT][asset.Spot].fetchingBook = false

ob, err := b.Websocket.Orderbook.GetOrderbook(p, asset.Spot)
if err != nil {
t.Fatal(err)
}

if exp, got := seedLastUpdateID, ob.LastUpdateID; got != exp {
t.Fatalf("Unexpected Last update id of orderbook for old update. Exp: %d, got: %d", exp, got)
}
Expand Down
9 changes: 5 additions & 4 deletions exchanges/binance/binance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,10 +827,11 @@ type orderbookManager struct {
}

type update struct {
buffer chan *WebsocketDepthStream
fetchingBook bool
initialSync bool
lastUpdateID int64
buffer chan *WebsocketDepthStream
fetchingBook bool
initialSync bool
needsFetchingBook bool
lastUpdateID int64
}

// job defines a synchonisation job that tells a go routine to fetch an
Expand Down
125 changes: 107 additions & 18 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (b *Binance) WsConnect() error {
Delay: pingDelay,
})

b.Websocket.Wg.Add(1)
go b.wsReadData()

b.setupOrderbookManager()
return nil
}
Expand All @@ -97,12 +99,23 @@ func (b *Binance) setupOrderbookManager() {
state: make(map[currency.Code]map[currency.Code]map[asset.Item]*update),
jobs: make(chan job, maxWSOrderbookJobs),
}

for i := 0; i < maxWSOrderbookWorkers; i++ {
// 10 workers for synchronising book
b.SynchroniseWebsocketOrderbook()
} else {
// Change state on reconnect for initial sync.
for _, m1 := range b.obm.state {
for _, m2 := range m1 {
for _, update := range m2 {
update.initialSync = true
update.needsFetchingBook = true
update.lastUpdateID = 0
}
}
}
}

for i := 0; i < maxWSOrderbookWorkers; i++ {
// 10 workers for synchronising book
b.SynchroniseWebsocketOrderbook()
}
}

// KeepAuthKeyAlive will continuously send messages to
Expand All @@ -129,7 +142,6 @@ func (b *Binance) KeepAuthKeyAlive() {

// wsReadData receives and passes on websocket messages for processing
func (b *Binance) wsReadData() {
b.Websocket.Wg.Add(1)
defer b.Websocket.Wg.Done()

for {
Expand Down Expand Up @@ -662,20 +674,59 @@ func (b *Binance) ProcessUpdate(cp currency.Pair, a asset.Item, ws *WebsocketDep
// applyBufferUpdate applies the buffer to the orderbook or initiates a new
// orderbook sync by the REST protocol which is off handed to go routine.
func (b *Binance) applyBufferUpdate(pair currency.Pair) error {
fetching, err := b.obm.checkIsFetchingBook(pair)
fetching, needsFetching, err := b.obm.handleFetchingBook(pair)
if err != nil {
return err
}
if fetching {
return nil
}
if needsFetching {
if b.Verbose {
log.Debugf(log.WebsocketMgr, "%s Orderbook: Fetching via REST\n", b.Name)
}
return b.obm.fetchBookViaREST(pair)
}

recent, err := b.Websocket.Orderbook.GetOrderbook(pair, asset.Spot)
if err != nil || (recent.Asks == nil && recent.Bids == nil) {
return b.obm.fetchBookViaREST(pair)
if err != nil {
log.Errorf(
log.WebsocketMgr,
"%s error fetching recent orderbook when applying updates: %s\n",
b.Name,
err)
}

return b.obm.checkAndProcessUpdate(b.ProcessUpdate, pair, recent)
if recent != nil {
err = b.obm.checkAndProcessUpdate(b.ProcessUpdate, pair, recent)
if err != nil {
log.Errorf(
log.WebsocketMgr,
"%s error processing update - initiating new orderbook sync via REST: %s\n",
b.Name,
err)
err = b.obm.setNeedsFetchingBook(pair)
if err != nil {
return err
}
}
}

return nil
}

// setNeedsFetchingBook completes the book fetching initiation.
func (o *orderbookManager) setNeedsFetchingBook(pair currency.Pair) error {
o.Lock()
defer o.Unlock()
state, ok := o.state[pair.Base][pair.Quote][asset.Spot]
if !ok {
return fmt.Errorf("could not match pair %s and asset type %s in hash table",
pair,
asset.Spot)
}
state.needsFetchingBook = true
return nil
}

// SynchroniseWebsocketOrderbook synchronises full orderbook for currency pair
Expand All @@ -686,15 +737,21 @@ func (b *Binance) SynchroniseWebsocketOrderbook() {
defer b.Websocket.Wg.Done()
for {
select {
case <-b.Websocket.ShutdownC:
for {
select {
case <-b.obm.jobs:
default:
return
}
}
case j := <-b.obm.jobs:
err := b.processJob(j.Pair)
if err != nil {
log.Errorf(log.WebsocketMgr,
"%s processing websocket orderbook error %v",
b.Name, err)
}
case <-b.Websocket.ShutdownC:
return
}
}
}()
Expand Down Expand Up @@ -762,9 +819,10 @@ func (o *orderbookManager) stageWsUpdate(u *WebsocketDepthStream, pair currency.
state = &update{
// 100ms update assuming we might have up to a 10 second delay.
// There could be a potential 100 updates for the currency.
buffer: make(chan *WebsocketDepthStream, maxWSUpdateBuffer),
fetchingBook: false,
initialSync: true,
buffer: make(chan *WebsocketDepthStream, maxWSUpdateBuffer),
fetchingBook: false,
initialSync: true,
needsFetchingBook: true,
}
m2[a] = state
}
Expand All @@ -788,19 +846,30 @@ func (o *orderbookManager) stageWsUpdate(u *WebsocketDepthStream, pair currency.
}
}

// checkIsFetchingBook checks status if the book is currently being via the REST
// protocol.
func (o *orderbookManager) checkIsFetchingBook(pair currency.Pair) (bool, error) {
// handleFetchingBook checks if a full book is being fetched or needs to be
// fetched
func (o *orderbookManager) handleFetchingBook(pair currency.Pair) (fetching, needsFetching bool, err error) {
o.Lock()
defer o.Unlock()
state, ok := o.state[pair.Base][pair.Quote][asset.Spot]
if !ok {
return false,
false,
fmt.Errorf("check is fetching book cannot match currency pair %s asset type %s",
pair,
asset.Spot)
}
return state.fetchingBook, nil

if state.fetchingBook {
return true, false, nil
}

if state.needsFetchingBook {
state.needsFetchingBook = false
state.fetchingBook = true
return false, true, nil
}
return false, false, nil
}

// stopFetchingBook completes the book fetching.
Expand Down Expand Up @@ -960,5 +1029,25 @@ bufferEmpty:
// disable rest orderbook synchronisation
_ = o.stopFetchingBook(pair)
_ = o.completeInitialSync(pair)
_ = o.stopNeedsFetchingBook(pair)
return nil
}

// stopNeedsFetchingBook completes the book fetching initiation.
func (o *orderbookManager) stopNeedsFetchingBook(pair currency.Pair) error {
o.Lock()
defer o.Unlock()
state, ok := o.state[pair.Base][pair.Quote][asset.Spot]
if !ok {
return fmt.Errorf("could not match pair %s and asset type %s in hash table",
pair,
asset.Spot)
}
if !state.needsFetchingBook {
return fmt.Errorf("needs fetching book already set to false for %s %s",
pair,
asset.Spot)
}
state.needsFetchingBook = false
return nil
}
29 changes: 23 additions & 6 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (b *Bitfinex) WsConnect() error {
err)
}

b.Websocket.Wg.Add(1)
go b.wsReadData(b.Websocket.Conn)

if b.Websocket.CanUseAuthenticatedEndpoints() {
Expand All @@ -63,6 +64,7 @@ func (b *Bitfinex) WsConnect() error {
err)
b.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
b.Websocket.Wg.Add(1)
go b.wsReadData(b.Websocket.AuthConn)
err = b.WsSendAuth()
if err != nil {
Expand All @@ -74,13 +76,13 @@ func (b *Bitfinex) WsConnect() error {
}
}

b.Websocket.Wg.Add(1)
go b.WsDataHandler()
return nil
}

// wsReadData receives and passes on websocket messages for processing
func (b *Bitfinex) wsReadData(ws stream.Connection) {
b.Websocket.Wg.Add(1)
defer b.Websocket.Wg.Done()
for {
resp := ws.ReadMessage()
Expand All @@ -93,19 +95,34 @@ func (b *Bitfinex) wsReadData(ws stream.Connection) {

// WsDataHandler handles data from wsReadData
func (b *Bitfinex) WsDataHandler() {
b.Websocket.Wg.Add(1)
defer b.Websocket.Wg.Done()
for {
select {
case resp := <-comms:
if resp.Type == websocket.TextMessage {
case <-b.Websocket.ShutdownC:
select {
case resp := <-comms:
err := b.wsHandleData(resp.Raw)
if err != nil {
b.Websocket.DataHandler <- err
select {
case b.Websocket.DataHandler <- err:
default:
log.Errorf(log.WebsocketMgr,
"%s websocket handle data error: %v",
b.Name,
err)
}
}
default:
}
case <-b.Websocket.ShutdownC:
return
case resp := <-comms:
if resp.Type != websocket.TextMessage {
continue
}
err := b.wsHandleData(resp.Raw)
if err != nil {
b.Websocket.DataHandler <- err
}
}
}
}
Expand Down
21 changes: 14 additions & 7 deletions exchanges/bithumb/bithumb_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,31 @@ func (b *Bithumb) WsConnect() error {
b.Name,
err)
}

b.Websocket.Wg.Add(1)
go b.wsReadData()

b.setupOrderbookManager()
return nil
}

// wsReadData receives and passes on websocket messages for processing
func (b *Bithumb) wsReadData() {
b.Websocket.Wg.Add(1)
defer b.Websocket.Wg.Done()

for {
resp := b.Websocket.Conn.ReadMessage()
if resp.Raw == nil {
select {
case <-b.Websocket.ShutdownC:
return
}
err := b.wsHandleData(resp.Raw)
if err != nil {
b.Websocket.DataHandler <- err
default:
resp := b.Websocket.Conn.ReadMessage()
if resp.Raw == nil {
return
}
err := b.wsHandleData(resp.Raw)
if err != nil {
b.Websocket.DataHandler <- err
}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions exchanges/bithumb/bithumb_websocket_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ type orderbookManager struct {
}

type update struct {
buffer chan *WsOrderbooks
fetchingBook bool
initialSync bool
lastUpdated time.Time
buffer chan *WsOrderbooks
fetchingBook bool
initialSync bool
needsFetchingBook bool
lastUpdated time.Time
}

// job defines a synchonisation job that tells a go routine to fetch an
Expand Down
Loading

0 comments on commit 66fbd43

Please sign in to comment.