Removed the Receipts and NodeData fetch and validation in the downloader
gameofpointers committed Aug 2, 2023
1 parent d299ee2 commit 4a5c570
Showing 15 changed files with 83 additions and 702 deletions.
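With receipt and node-data retrieval removed, full sync fetches only headers and bodies, and the downloader no longer needs a direct handle on the state database, so the New constructor drops its first parameter (see the eth/downloader/downloader.go hunk below). A minimal, illustrative call-site sketch; the surrounding variable names (chainDb, eventMux, core, removePeer) are assumptions, not code from this commit:

// Before: the state DB handle only fed the now-removed state/receipt sync paths.
//   dl := downloader.New(chainDb, eventMux, core, removePeer)
// After this commit the handle is gone:
dl := downloader.New(eventMux, core, removePeer)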
94 changes: 26 additions & 68 deletions eth/downloader/downloader.go
@@ -41,7 +41,6 @@ var (
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
MaxSkeletonWindow = 1024 // Amount of blocks to be fetched for a skeleton assembly.
MaxSkeletonSize = 1024 // Number of header fetches needed for a skeleton assembly
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request

PrimeSkeletonDist = 8
@@ -68,7 +67,6 @@ var (
errInvalidAncestor = errors.New("retrieved ancestor is invalid")
errInvalidChain = errors.New("retrieved hash chain is invalid")
errInvalidBody = errors.New("retrieved block body is invalid")
errInvalidReceipt = errors.New("retrieved receipt is invalid")
errCancelContentProcessing = errors.New("content processing canceled (requested)")
errBadBlockFound = errors.New("peer sent a bad block")
errCanceled = errors.New("syncing canceled (requested)")
@@ -105,12 +103,10 @@ type Downloader struct {
committed int32

// Channels
headerCh chan dataPack // Channel receiving inbound block headers
bodyCh chan dataPack // Channel receiving inbound block bodies
receiptCh chan dataPack // Channel receiving inbound receipts
bodyWakeCh chan bool // Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // Channel to signal the receipt fetcher of new tasks
headerProcCh chan []*types.Header // Channel to feed the header processor new tasks
headerCh chan dataPack // Channel receiving inbound block headers
bodyCh chan dataPack // Channel receiving inbound block bodies
bodyWakeCh chan bool // Channel to signal the block body fetcher of new tasks
headerProcCh chan []*types.Header // Channel to feed the header processor new tasks

// Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
@@ -122,10 +118,9 @@ type Downloader struct {
quitLock sync.Mutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// Core encapsulates functions required to sync a full core.
@@ -168,23 +163,20 @@ type Core interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mux *event.TypeMux, core Core, dropPeer peerDropFn) *Downloader {
func New(mux *event.TypeMux, core Core, dropPeer peerDropFn) *Downloader {
dl := &Downloader{
stateDB: stateDb,
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
core: core,
headNumber: core.CurrentBlock().NumberU64(),
headEntropy: core.CurrentLogEntropy(),
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 10),
quitCh: make(chan struct{}),
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
core: core,
headNumber: core.CurrentBlock().NumberU64(),
headEntropy: core.CurrentLogEntropy(),
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 10),
quitCh: make(chan struct{}),
}

return dl
@@ -308,13 +300,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, entropy *big.Int,
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
d.peers.Reset()

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
for _, ch := range []chan bool{d.bodyWakeCh} {
select {
case <-ch:
default:
}
}
for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
for _, ch := range []chan dataPack{d.headerCh, d.bodyCh} {
for empty := false; !empty; {
select {
case <-ch:
@@ -414,7 +406,6 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, entropy *
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin) },
func() error { return d.processFullSyncContent(peerHeight) },
}
@@ -531,8 +522,6 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, err error
return nil, errTimeout

case <-d.bodyCh:
case <-d.receiptCh:
// Out of bounds delivery, ignore
}
}
}
@@ -741,7 +730,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
d.dropPeer(p.id)

// Finish the sync gracefully instead of dumping the gathered data though
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
for _, ch := range []chan bool{d.bodyWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -820,32 +809,6 @@ func (d *Downloader) fetchBodies(from uint64) error {
return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
log.Debug("Downloading transaction receipts", "origin", from)

var (
deliver = func(packet dataPack) (int, error) {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
}
expire = func() map[string]int { return d.queue.ExpireReceipts(d.peers.rates.TargetTimeout()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.peers.rates.TargetRoundTrip()) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetReceiptsIdle(accepted, deliveryTime)
}
)
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

log.Debug("Transaction receipt download terminated", "err", err)
return err
}
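
For context, the removed fetchReceipts was a thin parameterization of the generic fetchParts loop, and the block-body fetcher that survives this commit has the same shape. The sketch below is reconstructed by analogy with the removed receipt code and the upstream go-ethereum body fetcher, not copied from this diff, so the exact queue/peer method names and the DeliverBodies argument list in this fork are assumptions:

func (d *Downloader) fetchBodies(from uint64) error {
	log.Debug("Downloading block bodies", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*bodyPack)
			// Field set assumed to mirror Downloader.DeliverBodies below
			// (transactions, uncles, external transactions, manifests).
			return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles, pack.extTransactions, pack.manifests)
		}
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.peers.rates.TargetTimeout()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.peers.rates.TargetRoundTrip()) }
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
	)
	err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

	log.Debug("Block body download terminated", "err", err)
	return err
}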

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
@@ -1074,7 +1037,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
// Terminate header processing if we synced up
if len(headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
for _, ch := range []chan bool{d.bodyWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1115,7 +1078,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
// Unless we're doing light chains, schedule the headers for associated content retrieval
if mode == FullSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
for d.queue.PendingBlocks() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
@@ -1141,7 +1104,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
d.syncStatsLock.Unlock()

// Signal the content downloaders of the availability of new tasks
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
for _, ch := range []chan bool{d.bodyWakeCh} {
select {
case ch <- true:
default:
@@ -1215,11 +1178,6 @@ func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transactio
return d.deliver(d.bodyCh, &bodyPack{id, transactions, uncles, extTransactions, manifests}, bodyInMeter, bodyDropMeter)
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
return d.deliver(d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}
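
With DeliverReceipts gone, block bodies (alongside headers) are the only payloads the protocol handler still routes into the downloader. A hedged sketch of that surviving delivery path, using the DeliverBodies signature visible in the hunk above; the handler-side names (dl, peerID, and the slice variables) are illustrative, not from this commit:

// Route a decoded BlockBodies response into the downloader; receipt
// responses no longer have a delivery hook here.
if err := dl.DeliverBodies(peerID, transactions, uncles, extTransactions, manifests); err != nil {
	log.Debug("Failed to deliver bodies", "err", err)
}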

// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
