Skip to content

Commit

Permalink
HeadTracker has a goroutine dedicated to managing eth connection
Browse files Browse the repository at this point in the history
This means connections are managed asynchronously.
  • Loading branch information
dimroc committed Oct 3, 2018
1 parent 22ff3b5 commit a41c36b
Showing 1 changed file with 89 additions and 56 deletions.
145 changes: 89 additions & 56 deletions services/head_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@ type HeadTrackable interface {
// in a thread safe manner. Reconstitutes the last block number from the data
// store on reboot.
type HeadTracker struct {
trackers map[string]HeadTrackable
headers chan models.BlockHeader
headSubscription models.EthSubscription
store *store.Store
head *models.IndexableBlockNumber
headMutex sync.RWMutex
trackersMutex sync.RWMutex
connected bool
sleeper utils.Sleeper
done chan struct{}
disconnected chan struct{}
disconnectedWg sync.WaitGroup
started bool
connectedWg sync.WaitGroup
bootMutex sync.Mutex
trackers map[string]HeadTrackable
headers chan models.BlockHeader
headSubscription models.EthSubscription
store *store.Store
head *models.IndexableBlockNumber
headMutex sync.RWMutex
trackersMutex sync.RWMutex
connected bool
sleeper utils.Sleeper
done chan struct{}
disconnected chan struct{}
disconnectedWg sync.WaitGroup
started bool
connectedWg sync.WaitGroup
connectionRequestListenerWg sync.WaitGroup
connectionRequest chan struct{}
connectionSucceeded chan struct{}
bootMutex sync.Mutex
}

// NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers.
Expand Down Expand Up @@ -80,25 +83,14 @@ func (ht *HeadTracker) Start() error {
}

ht.done = make(chan struct{})
ht.started = true
return ht.connectToHead()
}
ht.connectionRequest = make(chan struct{})
ht.connectionSucceeded = make(chan struct{})

func (ht *HeadTracker) connectToHead() error {
ht.disconnected = make(chan struct{})
ht.headers = make(chan models.BlockHeader)
sub, err := ht.store.TxManager.SubscribeToNewHeads(ht.headers)
if err != nil {
return err
}
ht.headSubscription = sub
ht.connectedWg.Add(2)
go ht.listenToSubscriptionErrors()
go ht.listenToNewHeads()
ht.connectedWg.Wait()
ht.disconnectedWg.Add(2)
ht.connected = true
ht.connect(ht.Head())
ht.connectionRequestListenerWg.Add(1)
go ht.connectionRequestListener()
ht.connectionRequest <- struct{}{}
<-ht.connectionSucceeded
ht.started = true
return nil
}

Expand All @@ -112,23 +104,10 @@ func (ht *HeadTracker) Stop() error {
}

close(ht.done)
err := ht.disconnectFromHead()
ht.connectionRequestListenerWg.Wait()
close(ht.connectionRequest)
close(ht.connectionSucceeded)
ht.started = false
return err
}

func (ht *HeadTracker) disconnectFromHead() error {
if !ht.connected {
return nil
}

ht.headSubscription.Unsubscribe()
close(ht.disconnected)
ht.disconnectedWg.Wait()

close(ht.headers)
ht.connected = false
ht.disconnect()
return nil
}

Expand Down Expand Up @@ -206,14 +185,47 @@ func (ht *HeadTracker) onNewHead(head *models.BlockHeader) {
}
}

func (ht *HeadTracker) connectToHead() error {
ht.disconnected = make(chan struct{})
ht.headers = make(chan models.BlockHeader)
sub, err := ht.store.TxManager.SubscribeToNewHeads(ht.headers)
if err != nil {
return err
}
ht.headSubscription = sub
ht.connectedWg.Add(2)
go ht.listenToSubscriptionErrors()
go ht.listenToNewHeads()
ht.connectedWg.Wait()
ht.disconnectedWg.Add(2)
ht.connected = true
ht.connect(ht.Head())
return nil
}

func (ht *HeadTracker) disconnectFromHead() error {
if !ht.connected {
return nil
}

ht.headSubscription.Unsubscribe()
close(ht.disconnected)
ht.disconnectedWg.Wait()

close(ht.headers)
ht.connected = false
ht.disconnect()
return nil
}

func (ht *HeadTracker) listenToSubscriptionErrors() {
ht.connectedWg.Done()
select {
case err := <-ht.headSubscription.Err():
ht.disconnectedWg.Done()
if err != nil {
logger.Errorw("Error in new head subscription, disconnected", "err", err)
ht.reconnectLoop()
ht.connectionRequest <- struct{}{}
}
case <-ht.disconnected:
ht.disconnectedWg.Done()
Expand Down Expand Up @@ -260,21 +272,42 @@ func (ht *HeadTracker) listenToNewHeads() {
}
}

func (ht *HeadTracker) reconnectLoop() {
func (ht *HeadTracker) connectionRequestListener() {
defer ht.connectionRequestListenerWg.Done()
for {
select {
case <-ht.done:
ht.disconnectFromHead()
return
case <-ht.connectionRequest:
if !ht.reconnectionPoll() {
return
}
}
}
}

// reconnectionPoll periodically attempts to connect to an ethereum node. It returns true
// on success, and false if cut short by a done request and did not connect.
func (ht *HeadTracker) reconnectionPoll() bool {
ht.sleeper.Reset()
for {
ht.disconnectFromHead()
logger.Info("Reconnecting to node ", ht.store.Config.EthereumURL, " in ", ht.sleeper.Duration())
logger.Info("Connecting to node ", ht.store.Config.EthereumURL, " in ", ht.sleeper.Duration())
select {
case <-ht.done:
return
return false
case <-time.After(ht.sleeper.After()):
err := ht.connectToHead()
if err != nil {
logger.Errorw(fmt.Sprintf("Error reconnecting to %v", ht.store.Config.EthereumURL), "err", err)
logger.Errorw(fmt.Sprintf("Error connecting to %v", ht.store.Config.EthereumURL), "err", err)
} else {
logger.Info("Reconnected to node ", ht.store.Config.EthereumURL)
return
logger.Info("Connected to node ", ht.store.Config.EthereumURL)
select {
case ht.connectionSucceeded <- struct{}{}:
default:
}
return true
}
}
}
Expand Down

0 comments on commit a41c36b

Please sign in to comment.