Skip to content

Commit

Permalink
fix: fix tracker/head/head.go, preventing freezing
Browse files Browse the repository at this point in the history
  • Loading branch information
yankeguo committed Dec 28, 2023
1 parent efdbf22 commit 41f45d2
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 38 deletions.
56 changes: 35 additions & 21 deletions tracker/head/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,47 @@ func New(
func (self *TrackerHead) Start() error {
level.Info(self.logger).Log("msg", "starting", "reorgWaitPeriod", self.reorgWaitPeriod)

src, subs := self.waitSubscribe()
defer func() {
if subs != nil {
subs.Unsubscribe()
}
}()
for {
var done bool

go self.listen(src)
func() {
src, subs := self.waitSubscribe()
// according to docs, Unsubscribe() should always be called
defer subs.Unsubscribe()

for {
select {
case <-self.ctx.Done():
// chances are that the context was canceled while waiting for the subscription
if src == nil {
done = true
return
}

ctx, cancel := context.WithCancel(self.ctx)
defer cancel()

// using child context to cancel the listener
go self.listen(ctx, src)

select {
case <-self.ctx.Done():
done = true
case err := <-subs.Err():
level.Error(self.logger).Log("msg", "subscription failed will try to resubscribe", "err", err)
}
}()

// break the loop if the context was canceled
if done {
return nil
case err := <-subs.Err():
level.Error(self.logger).Log("msg", "subscription failed will try to resubscribe", "err", err)
src, subs = self.waitSubscribe()
self.listen(src)
}
}
}

func (self *TrackerHead) listen(src chan *types.Header) {
func (self *TrackerHead) listen(ctx context.Context, src chan *types.Header) {
level.Info(self.logger).Log("msg", "starting new subs listener")

for {
select {
case <-self.ctx.Done():
case <-ctx.Done():
level.Info(self.logger).Log("msg", "subscription listener canceled")
return
case event := <-src:
Expand All @@ -98,7 +112,7 @@ func (self *TrackerHead) listen(src chan *types.Header) {
level.Debug(logger).Log("msg", "new block")
if self.reorgWaitPeriod == 0 {
go func(event *types.Header) {
ctx, cncl := context.WithTimeout(self.ctx, time.Minute)
ctx, cncl := context.WithTimeout(ctx, time.Minute)
defer cncl()
block, err := self.client.BlockByNumber(ctx, event.Number)
if err != nil {
Expand All @@ -107,7 +121,7 @@ func (self *TrackerHead) listen(src chan *types.Header) {
}
select {
case self.dstChan <- block:
case <-self.ctx.Done():
case <-ctx.Done():
return
}
}(event)
Expand All @@ -122,11 +136,11 @@ func (self *TrackerHead) listen(src chan *types.Header) {

select {
case <-waitForReorg.C:
case <-self.ctx.Done():
case <-ctx.Done():
return
}

ctx, cncl := context.WithTimeout(self.ctx, 2*time.Minute)
ctx, cncl := context.WithTimeout(ctx, 2*time.Minute)
defer cncl()

// Duplicate event numbers will still return the same block when using this query.
Expand All @@ -149,7 +163,7 @@ func (self *TrackerHead) listen(src chan *types.Header) {
select {
case self.dstChan <- block:
return
case <-self.ctx.Done():
case <-ctx.Done():
return
}

Expand Down
46 changes: 29 additions & 17 deletions tracker/head/head_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
// Copyright (c) The Cryptorium Authors.
// Licensed under the MIT License.

package head

import (
"context"
"errors"
"math/big"
"sync"
"testing"
"time"

"github.com/cryptoriums/packages/logging"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

type testChainReader struct {
sub *testSubscription
subLocker sync.Locker
sub *testSubscription
}

var _ ethereum.ChainReader = &testChainReader{}
Expand Down Expand Up @@ -50,6 +55,8 @@ func (tcr *testChainReader) TransactionInBlock(ctx context.Context, blockHash co
}

func (tcr *testChainReader) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (sub ethereum.Subscription, err error) {
tcr.subLocker.Lock()
defer tcr.subLocker.Unlock()
tcr.sub = &testSubscription{
ctx: ctx,
chHeaders: ch,
Expand All @@ -59,6 +66,18 @@ func (tcr *testChainReader) SubscribeNewHead(ctx context.Context, ch chan<- *typ
return
}

func (tcr *testChainReader) sendHeader() {
tcr.subLocker.Lock()
defer tcr.subLocker.Unlock()
tcr.sub.chHeaders <- &types.Header{}
}

func (tcr *testChainReader) sendError() {
tcr.subLocker.Lock()
defer tcr.subLocker.Unlock()
tcr.sub.chErrors <- errors.New("random error")
}

type testSubscription struct {
ctx context.Context
chHeaders chan<- *types.Header
Expand All @@ -79,7 +98,9 @@ func TestTrackerHead(t *testing.T) {
defer cancel()

logger := logging.NewLogger()
client := &testChainReader{}
client := &testChainReader{
subLocker: &sync.Mutex{},
}

tracker, blocks, err := New(
ctx,
Expand All @@ -89,36 +110,27 @@ func TestTrackerHead(t *testing.T) {
)
require.NoError(t, err)

_ = blocks

go func() {
err := tracker.Start()
require.NoError(t, err)
}()

go func() {
time.Sleep(time.Second)
sub := client.sub
client.sendHeader()

time.Sleep(time.Second)
sub.chHeaders <- &types.Header{}

sub.chErrors <- errors.New("random error")
client.sendError()

time.Sleep(time.Second)
sub = client.sub
client.sendHeader()

time.Sleep(time.Second)
sub.chHeaders <- &types.Header{}
time.Sleep(time.Second)

sub.chErrors <- errors.New("random error")
client.sendError()

time.Sleep(time.Second)
sub = client.sub
client.sendHeader()

time.Sleep(time.Second)
sub.chHeaders <- &types.Header{}
time.Sleep(time.Second)

cancel()
Expand Down

0 comments on commit 41f45d2

Please sign in to comment.