Created the AppendQueue with HashAndNumber of Header
gameofpointers committed Mar 13, 2023
1 parent 13a2592 commit c64a456
Showing 2 changed files with 64 additions and 48 deletions.
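In outline, the commit replaces the TTL-based futureHeaders cache with an LRU-backed append queue keyed by block hash and valued by block number, so queued blocks can be retried in ascending-number order. A minimal, self-contained sketch of that pattern follows (it assumes only hashicorp/golang-lru, which the diff imports, plus a stand-in 32-byte hash type; it is not go-quai code):

package main

import (
    "fmt"
    "sort"

    lru "github.com/hashicorp/golang-lru"
)

// blockHash stands in for go-quai's common.Hash in this sketch.
type blockHash [32]byte

type hashAndNumber struct {
    Hash   blockHash
    Number uint64
}

func main() {
    // Bounded queue: stale entries are evicted instead of growing forever.
    // Error ignored for brevity, as in the diff.
    queue, _ := lru.New(1024)

    // Blocks are queued as they arrive, keyed by hash, valued by number.
    queue.ContainsOrAdd(blockHash{0x02}, uint64(2))
    queue.ContainsOrAdd(blockHash{0x01}, uint64(1))

    // Draining: snapshot the entries, sort by number, then try each one.
    // A successful append would Remove() its entry from the queue.
    var list []hashAndNumber
    for _, k := range queue.Keys() {
        if v, ok := queue.Peek(k); ok {
            list = append(list, hashAndNumber{Hash: k.(blockHash), Number: v.(uint64)})
        }
    }
    sort.Slice(list, func(i, j int) bool { return list[i].Number < list[j].Number })
    for _, e := range list {
        fmt.Printf("would append block %d (hash %x)\n", e.Number, e.Hash)
    }
}

Unlike the old timedcache approach, entries here carry no TTL; they leave the queue only when a block is appended (Remove) or when the LRU bound evicts them.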
core/core.go: 59 additions & 48 deletions (107 changes)
@@ -8,7 +8,6 @@ import (
"time"

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/common/timedcache"
"github.com/dominant-strategies/go-quai/consensus"
"github.com/dominant-strategies/go-quai/core/rawdb"
"github.com/dominant-strategies/go-quai/core/state"
@@ -20,20 +19,20 @@ import (
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/params"
"github.com/dominant-strategies/go-quai/rlp"
lru "github.com/hashicorp/golang-lru"
)

const (
- maxFutureHeaders = 1800 // Maximum number of future headers we can store in cache
- maxFutureTime = 30 // Max time into the future (in seconds) we will accept a block
- futureHeaderTtl = 300 // Time (in seconds) a header is allowed to live in the futureHeader cache
- futureHeaderRetryPeriod = 3 // Time (in seconds) before retrying to append future headers
+ c_maxAppendQueue = 1000000 // Maximum number of future headers we can store in cache
+ c_maxFutureTime = 30 // Max time into the future (in seconds) we will accept a block
+ c_appendQueueRetryPeriod = 1 // Time (in seconds) before retrying to append from AppendQueue
)

type Core struct {
sl *Slice
engine consensus.Engine

- futureHeaders *timedcache.TimedCache
+ appendQueue *lru.Cache

quit chan struct{} // core quit channel
}
@@ -50,10 +49,10 @@ func NewCore(db ethdb.Database, config *Config, isLocalBlock func(block *types.H
quit: make(chan struct{}),
}

- futureHeaders, _ := timedcache.New(maxFutureHeaders, futureHeaderTtl)
- c.futureHeaders = futureHeaders
+ appendQueue, _ := lru.New(c_maxAppendQueue)
+ c.appendQueue = appendQueue

- go c.updateFutureHeaders()
+ go c.updateAppendQueue()
return c, nil
}

@@ -81,7 +80,7 @@ func (c *Core) InsertChain(blocks types.Blocks) (int, error) {
log.Error("failed to send ETXs to domclient", "block: ", block.Hash(), "err", err)
}
}
- c.removeFutureHeader(block.Header())
+ c.removeFromAppendQueue(block)
} else if err.Error() == consensus.ErrFutureBlock.Error() ||
err.Error() == ErrBodyNotFound.Error() ||
err.Error() == ErrPendingEtxNotFound.Error() ||
@@ -94,61 +93,71 @@
} else if err.Error() != ErrKnownBlock.Error() {
log.Info("Append failed.", "hash", block.Hash(), "err", err)
}
- c.removeFutureHeader(block.Header())
+ c.removeFromAppendQueue(block)
}
}
return len(blocks), nil
}

- // procfutureHeaders sorts the future block cache and attempts to append
- func (c *Core) procfutureHeaders() {
- // Reconstruct future headers into future blocks list to append
- blocks := make([]*types.Block, 0, c.futureHeaders.Len())
- for _, hash := range c.futureHeaders.Keys() {
- if header, exist := c.futureHeaders.Peek(hash); exist {
- header := header.(*types.Header)
- if block, err := c.sl.ConstructLocalBlock(header); err == nil {
- blocks = append(blocks, block)
- } else if err.Error() == ErrBodyNotFound.Error() {
- c.sl.missingBodyFeed.Send(header.Hash())
- } else if err.Error() == consensus.ErrUnknownAncestor.Error() {
- c.sl.missingParentFeed.Send(header.ParentHash())
- } else {
- log.Debug("could not construct block from future header", "err", err)
- c.removeFutureHeader(header)
- }
+ // procAppendQueue sorts the append queue and attempts to append
+ func (c *Core) procAppendQueue() {
+ // Sort the blocks by number and attempt to insert them
+ var hashNumberList []types.HashAndNumber
+ for _, hash := range c.appendQueue.Keys() {
+ if value, exist := c.appendQueue.Peek(hash); exist {
+ hashNumber := types.HashAndNumber{Hash: hash.(common.Hash), Number: value.(uint64)}
+ hashNumberList = append(hashNumberList, hashNumber)
}
}
- // Sort the blocks by number and attempt to insert them
- sort.Slice(blocks, func(i, j int) bool {
- return blocks[i].NumberU64() < blocks[j].NumberU64()
+ sort.Slice(hashNumberList, func(i, j int) bool {
+ return hashNumberList[i].Number < hashNumberList[j].Number
})
- c.InsertChain(blocks)

+ // Attempt to service the sorted list
+ for _, hashAndNumber := range hashNumberList {
+ block := c.GetBlockOrCandidateByHash(hashAndNumber.Hash)
+ if block != nil {
+ c.serviceFutureBlock(block)
+ } else {
+ log.Warn("Entry in the FH cache without being in the db: ", hashAndNumber.Hash)
+ }
+ }
}

- // addfutureHeader adds a header to the future header cache
- func (c *Core) addfutureHeader(header *types.Header) error {
- max := uint64(time.Now().Unix() + maxFutureTime)
- if header.Time() > max {
- return fmt.Errorf("future block timestamp %v > allowed %v", header.Time(), max)
+ func (c *Core) serviceFutureBlock(block *types.Block) {
+ parentBlock := c.GetBlockByHash(block.ParentHash())
+ if parentBlock != nil {
+ c.InsertChain([]*types.Block{block})
+ } else {
+ if !c.HasHeader(block.ParentHash(), block.NumberU64()-1) {
+ c.sl.missingParentFeed.Send(block.ParentHash())
}
+ }
- c.futureHeaders.ContainsOrAdd(header.Hash(), header)
+ }

+ // addToAppendQueue adds a block to the append queue
+ func (c *Core) addToAppendQueue(block *types.Block) error {
+ max := uint64(time.Now().Unix() + c_maxFutureTime)
+ if block.Time() > max {
+ return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max)
+ }
+ c.appendQueue.ContainsOrAdd(block.Hash(), block.NumberU64())
return nil
}

- // removeFutureHeader removes a header from the future header cache
- func (c *Core) removeFutureHeader(header *types.Header) {
- c.futureHeaders.Remove(header.Hash())
+ // removeFromAppendQueue removes a block from the append queue
+ func (c *Core) removeFromAppendQueue(block *types.Block) {
+ c.appendQueue.Remove(block.Hash())
}

- // updatefutureHeaders is a time to procfutureHeaders
- func (c *Core) updateFutureHeaders() {
- futureTimer := time.NewTicker(futureHeaderRetryPeriod * time.Second)
+ // updateAppendQueue is a time to procAppendQueue
+ func (c *Core) updateAppendQueue() {
+ futureTimer := time.NewTicker(c_appendQueueRetryPeriod * time.Second)
defer futureTimer.Stop()
for {
select {
case <-futureTimer.C:
- c.procfutureHeaders()
+ c.procAppendQueue()
case <-c.quit:
return
}
@@ -198,6 +207,11 @@ func (c *Core) Stop() {

// WriteBlock write the block to the bodydb database
func (c *Core) WriteBlock(block *types.Block) {
+ isDomCoincident := c.sl.engine.IsDomCoincident(block.Header())
+ // Only add non dom blocks to the append queue
+ if !isDomCoincident {
+ c.addToAppendQueue(block)
+ }
c.sl.WriteBlock(block)
}

@@ -211,9 +225,6 @@ func (c *Core) Append(header *types.Header, domPendingHeader *types.Header, domT
c.sl.missingParentFeed.Send(header.ParentHash())
}
}
- // If dom tries to append the block and sub is not in sync.
- // proc the future header cache.
- go c.procfutureHeaders()
return newPendingEtxs, err
}
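The queue is drained on a timer rather than on demand: updateAppendQueue ticks every c_appendQueueRetryPeriod (1 second) and calls procAppendQueue until the core's quit channel closes. A minimal sketch of that ticker-plus-quit-channel retry pattern (the retryLoop name and the drain callback are illustrative, not part of the repository):

package main

import (
    "fmt"
    "time"
)

// retryLoop loosely mirrors the updateAppendQueue pattern: call drain() on
// every tick until quit is closed.
func retryLoop(period time.Duration, drain func(), quit <-chan struct{}) {
    ticker := time.NewTicker(period)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            drain()
        case <-quit:
            return
        }
    }
}

func main() {
    quit := make(chan struct{})
    go retryLoop(1*time.Second, func() { fmt.Println("procAppendQueue would run here") }, quit)
    time.Sleep(3 * time.Second)
    close(quit)
}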

core/types/block.go: 5 additions & 0 deletions (5 changes)
@@ -923,3 +923,8 @@ func (m BlockManifest) EncodeIndex(i int, w *bytes.Buffer) {
func (m BlockManifest) Size() common.StorageSize {
return common.StorageSize(m.Len() * common.HashLength)
}

+ type HashAndNumber struct {
+ Hash common.Hash
+ Number uint64
+ }
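The new HashAndNumber type lets the append queue be snapshotted and ordered without holding full headers; procAppendQueue only needs to sort by Number. A short usage sketch (assuming common.Hash is the usual 32-byte array type; the sample values are placeholders):

package main

import (
    "fmt"
    "sort"

    "github.com/dominant-strategies/go-quai/common"
    "github.com/dominant-strategies/go-quai/core/types"
)

func main() {
    // Entries as procAppendQueue would collect them from the LRU cache.
    // Hashes and numbers here are made up for illustration.
    list := []types.HashAndNumber{
        {Hash: common.Hash{0x0a}, Number: 7},
        {Hash: common.Hash{0x0b}, Number: 3},
        {Hash: common.Hash{0x0c}, Number: 5},
    }

    // Ascending block number, so parents are attempted before children.
    sort.Slice(list, func(i, j int) bool { return list[i].Number < list[j].Number })

    for _, e := range list {
        fmt.Println(e.Number, e.Hash)
    }
}

Storing the number alongside the hash avoids a header lookup just to order the queue.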
