Skip to content

Commit

Permalink
Cleaned block fetcher and added writes to DB
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed Mar 13, 2023
1 parent fc2fdf0 commit 7054949
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 130 deletions.
129 changes: 13 additions & 116 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ var (
blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/in", nil)
blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/block/broadcasts/out", nil)
blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/drop", nil)
blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/block/broadcasts/dos", nil)

headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/headers", nil)
bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/block/bodies", nil)
Expand All @@ -67,12 +66,12 @@ var (

var errTerminated = errors.New("terminated")

// HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
type HeaderRetrievalFn func(common.Hash) *types.Header

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

// blockWriteFn is a callback type for retrieving a block from the local chain.
type blockWriteFn func(*types.Block)

// headerRequesterFn is a callback type for sending a header retrieval request.
type headerRequesterFn func(common.Hash) error

Expand All @@ -88,12 +87,6 @@ type blockBroadcasterFn func(block *types.Block, propagate bool)
// chainHeightFn is a callback type to retrieve the current chain height.
type chainHeightFn func() uint64

// headersInsertFn is a callback type to insert a batch of headers into the local chain.
type headersInsertFn func(headers []*types.Header) (int, error)

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

Expand Down Expand Up @@ -156,8 +149,6 @@ func (inject *blockOrHeaderInject) hash() common.Hash {
// BlockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type BlockFetcher struct {
light bool // The indicator whether it's a light fetcher or normal one.

// Various event channels
notify chan *blockAnnounce
inject chan *blockOrHeaderInject
Expand All @@ -177,17 +168,14 @@ type BlockFetcher struct {

// Block cache
queue *prque.Prque // Queue containing the import operations (block number sorted)
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
writeBlock blockWriteFn // Writes the block to the DB
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
chainHeight chainHeightFn // Retrieves the current chain's height
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving

// Testing hooks
Expand All @@ -199,9 +187,8 @@ type BlockFetcher struct {
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(getBlock blockRetrievalFn, writeBlock blockWriteFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, dropPeer peerDropFn) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
Expand All @@ -214,15 +201,12 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
writeBlock: writeBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
}
Expand Down Expand Up @@ -349,26 +333,19 @@ func (f *BlockFetcher) loop() {
f.forgetHash(hash)
}
}
// Import any queued blocks that could potentially fit
height := f.chainHeight()
// Import any queued blocks
for !f.queue.Empty() {
op := f.queue.PopItem().(*blockOrHeaderInject)
hash := op.hash()
if f.queueChangeHook != nil {
f.queueChangeHook(hash, false)
}
// If too high up the chain or phase, continue later
number := op.number()
// Otherwise if fresh and still unknown, try and import
if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
if f.getBlock(hash) != nil {
f.forgetBlock(hash)
continue
}
if f.light {
f.importHeaders(op.origin, op.header)
} else {
f.importBlocks(op.origin, op.block)
}
f.importBlocks(op.origin, op.block)
}
// Wait for an outside event to occur
select {
Expand Down Expand Up @@ -414,11 +391,6 @@ func (f *BlockFetcher) loop() {
// A direct block insertion was requested, try and fill any pending gaps
blockBroadcastInMeter.Mark(1)

// Now only direct block injection is allowed, drop the header injection
// here silently if we receive.
if f.light {
continue
}
f.enqueue(op.origin, nil, op.block)

case hash := <-f.done:
Expand All @@ -434,16 +406,13 @@ func (f *BlockFetcher) loop() {
// In current LES protocol(les2/les3), only header announce is
// available, no need to wait too much time for header broadcast.
timeout := arriveTimeout - gatherSlack
if f.light {
timeout = 0
}
if time.Since(announces[0].time) > timeout {
// Pick a random peer to retrieve from, reset all others
announce := announces[rand.Intn(len(announces))]
f.forgetHash(hash)

// If the block still didn't arrive, queue for fetching
if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) {
if f.getBlock(hash) == nil {
request[announce.origin] = append(request[announce.origin], hash)
f.fetching[hash] = announce
}
Expand Down Expand Up @@ -524,16 +493,6 @@ func (f *BlockFetcher) loop() {
f.forgetHash(hash)
continue
}
// Collect all headers only if we are running in light
// mode and the headers are not imported by other means.
if f.light {
if f.getHeader(hash) == nil {
announce.header = header
lightHeaders = append(lightHeaders, announce)
}
f.forgetHash(hash)
continue
}
// Only keep if not imported by other means
if f.getBlock(hash) == nil {
announce.header = header
Expand Down Expand Up @@ -681,12 +640,6 @@ func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
if len(f.announced) == 0 {
return
}
// Schedule announcement retrieval quickly for light mode
// since server won't send any headers to client.
if f.light {
fetch.Reset(lightTimeout)
return
}
// Otherwise find the earliest expiring announcement
earliest := time.Now()
for _, announces := range f.announced {
Expand Down Expand Up @@ -725,21 +678,7 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
} else {
hash, number = block.Hash(), block.NumberU64()
}
// Ensure the peer isn't DOSing us
count := f.queues[peer] + 1
if count > blockLimit {
log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit)
blockBroadcastDOSMeter.Mark(1)
f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist)
blockBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}

// Schedule the block for future importing
if _, ok := f.queued[hash]; !ok {
op := &blockOrHeaderInject{origin: peer}
Expand All @@ -748,7 +687,6 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
} else {
op.block = block
}
f.queues[peer] = count
f.queued[hash] = op
f.queue.Push(op, -int64(number))
if f.queueChangeHook != nil {
Expand All @@ -758,39 +696,6 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
}
}

// importHeaders spawns a new goroutine to run a header insertion into the chain.
// If the header's number is at the same height as the current import phase, it
// updates the phase states accordingly.
func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
hash := header.Hash()
log.Debug("Importing propagated header", "peer", peer, "number", header.Number(), "hash", hash)

go func() {
defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion
parent := f.getHeader(header.ParentHash())
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number(), "hash", hash, "parent", header.ParentHash())
return
}
// Validate the header and if something went wrong, drop the peer
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number(), "hash", hash, "err", err)
f.dropPeer(peer)
return
}
// Run the actual import and log any issues
if _, err := f.insertHeaders([]*types.Header{header}); err != nil {
log.Debug("Propagated header import failed", "peer", peer, "number", header.Number(), "hash", hash, "err", err)
return
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(header, nil)
}
}()
}

// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
Expand Down Expand Up @@ -821,12 +726,8 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
f.dropPeer(peer)
return
}

// Run the actual import and log any issues
if _, err := f.insertChain(types.Blocks{block}); err != nil {
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
return
}
// TODO: verify the Headers work to be in a certain threshold window
f.writeBlock(block)
// If import succeeded, broadcast the block
blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, false)
Expand Down Expand Up @@ -886,10 +787,6 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) {
// state.
func (f *BlockFetcher) forgetBlock(hash common.Hash) {
if insert := f.queued[hash]; insert != nil {
f.queues[insert.origin]--
if f.queues[insert.origin] == 0 {
delete(f.queues, insert.origin)
}
delete(f.queued, hash)
}
}
18 changes: 4 additions & 14 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"math"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/dominant-strategies/go-quai/common"
Expand Down Expand Up @@ -164,20 +163,11 @@ func newHandler(config *handlerConfig) (*handler, error) {
heighter := func() uint64 {
return h.core.CurrentBlock().NumberU64()
}
inserter := func(blocks types.Blocks) (int, error) {
n, err := h.core.InsertChain(blocks)
if err == nil {
atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import
}
// If the error is related to synchronization of sub, we can still accept the transactions
// This is a good approximation on reaching the fray
if err != nil && err.Error() == core.ErrPendingBlock.Error() {
atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import
return n, nil
}
return n, err
// writeBlock writes the block to the DB
writeBlock := func(block *types.Block) {
h.core.WriteBlock(block)
}
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.core.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
h.blockFetcher = fetcher.NewBlockFetcher(h.core.GetBlockByHash, writeBlock, validator, h.BroadcastBlock, heighter, h.removePeer)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
Expand Down

0 comments on commit 7054949

Please sign in to comment.