
Updated the appendqueue logic and added retry counter
gameofpointers committed Aug 23, 2023
1 parent 62ac48e commit 713d197
Showing 1 changed file with 75 additions and 37 deletions.
112 changes: 75 additions & 37 deletions core/core.go
@@ -25,13 +25,24 @@ import (
)

const (
c_maxAppendQueue = 1000000 // Maximum number of future headers we can store in cache
c_maxFutureTime = 30 // Max time into the future (in seconds) we will accept a block
c_appendQueueRetryPeriod = 1 // Time (in seconds) before retrying to append from AppendQueue
c_appendQueueThreshold = 1000 // Number of blocks to load from the disk to ram on every proc of append queue
c_processingCache = 10 // Number of block hashes held to prevent multi simultaneous appends on a single block hash
c_primeRetryThreshold = 900 // Number of times a block is retried for append before eviction from the append queue in Prime
c_regionRetryThreshold = 300 // Number of times a block is retried for append before eviction from the append queue in Region
c_zoneRetryThreshold = 100 // Number of times a block is retried for append before eviction from the append queue in Zone
c_maxFutureBlocks = 15 // Number of blocks ahead of the current block to be put in the hashNumberList
c_appendQueueRetryPriorityThreshold = 5 // If the retry counter for a block is less than this number, it is put in a special list that is tried first for appending
c_appendQueueRemoveThreshold = 10 // Number of blocks a block must be behind the current header to be eligible for removal from the append queue
)

type blockNumberAndRetryCounter struct {
number uint64
retry uint64
}

type Core struct {
sl *Slice
engine consensus.Engine
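To make the intent of the new constants and the blockNumberAndRetryCounter struct concrete, here is a minimal, self-contained sketch of the eviction rule they express. It is illustrative only and not part of the diff; shouldEvict is a hypothetical helper, not anything in core.go:

package main

import "fmt"

const c_appendQueueRemoveThreshold = 10 // mirrors the constant introduced above

type blockNumberAndRetryCounter struct {
	number uint64
	retry  uint64
}

// shouldEvict restates the condition serviceBlocks applies further down: an entry
// leaves the append queue only after its retries exceed the context's threshold
// AND the block has fallen far enough behind the current header.
func shouldEvict(entry blockNumberAndRetryCounter, retryThreshold, currentHeight uint64) bool {
	return entry.retry > retryThreshold && entry.number+c_appendQueueRemoveThreshold < currentHeight
}

func main() {
	entry := blockNumberAndRetryCounter{number: 100, retry: 101}
	fmt.Println(shouldEvict(entry, 100, 115)) // true: 101 > 100 retries and 100+10 < 115
	fmt.Println(shouldEvict(entry, 100, 105)) // false: the block is not far enough behind the head yet
}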
@@ -127,56 +138,81 @@ func (c *Core) InsertChain(blocks types.Blocks) (int, error) {

// procAppendQueue sorts the append queue and attempts to append
func (c *Core) procAppendQueue() {
// Sort the blocks by number and attempt to insert them
// Sort the blocks by number and retry attempts and try to insert them
// Blocks will be aged out of the append queue after the retry threshold
var hashNumberList []types.HashAndNumber
var hashNumberPriorityList []types.HashAndNumber
for _, hash := range c.appendQueue.Keys() {
if value, exist := c.appendQueue.Peek(hash); exist {
hashNumber := types.HashAndNumber{Hash: hash.(common.Hash), Number: value.(uint64)}
hashNumberList = append(hashNumberList, hashNumber)
hashNumber := types.HashAndNumber{Hash: hash.(common.Hash), Number: value.(blockNumberAndRetryCounter).number}
if value.(blockNumberAndRetryCounter).retry < c_appendQueueRetryPriorityThreshold {
hashNumberPriorityList = append(hashNumberPriorityList, hashNumber)
}
if hashNumber.Number < c.CurrentHeader().NumberU64()+c_maxFutureBlocks {
hashNumberList = append(hashNumberList, hashNumber)
}
}
}

c.serviceBlocks(hashNumberPriorityList)
if len(hashNumberPriorityList) > 0 {
log.Info("Size of hashNumberPriorityList", "len", len(hashNumberPriorityList), "first entry", hashNumberPriorityList[0].Number, "last entry", hashNumberPriorityList[len(hashNumberPriorityList)-1].Number)
}
c.serviceBlocks(hashNumberList)
if len(hashNumberList) > 0 {
log.Info("Size of hashNumberList", "len", len(hashNumberList), "first entry", hashNumberList[0].Number, "last entry", hashNumberList[len(hashNumberList)-1].Number)
}
}

func (c *Core) serviceBlocks(hashNumberList []types.HashAndNumber) {
sort.Slice(hashNumberList, func(i, j int) bool {
return hashNumberList[i].Number < hashNumberList[j].Number
})

// Only take c_appendQueueThreshold latest blocks out of the database because we know that the
// append will be interrupted once we reach the dom block, so no need to get
// all the blocks in the appendQueue and load them into RAM
var threshold int
if len(hashNumberList) > c_appendQueueThreshold {
threshold = c_appendQueueThreshold
} else {
threshold = len(hashNumberList)
var retryThreshold uint64
switch common.NodeLocation.Context() {
case common.PRIME_CTX:
retryThreshold = c_primeRetryThreshold
case common.REGION_CTX:
retryThreshold = c_regionRetryThreshold
case common.ZONE_CTX:
retryThreshold = c_zoneRetryThreshold
}

// Attempt to service the sorted list
for _, hashAndNumber := range hashNumberList[:threshold] {
block := c.GetBlockOrCandidateByHash(hashAndNumber.Hash)
if block != nil {
c.serviceFutureBlock(block)
for _, hashAndNumber := range hashNumberList {
header := c.GetHeaderOrCandidateByHash(hashAndNumber.Hash)
if header != nil {
var numberAndRetryCounter blockNumberAndRetryCounter
if value, exist := c.appendQueue.Peek(header.Hash()); exist {
numberAndRetryCounter = value.(blockNumberAndRetryCounter)
numberAndRetryCounter.retry += 1
if numberAndRetryCounter.retry > retryThreshold && numberAndRetryCounter.number+c_appendQueueRemoveThreshold < c.CurrentHeader().NumberU64() {
c.appendQueue.Remove(header.Hash())
} else {
c.appendQueue.Add(header.Hash(), numberAndRetryCounter)
}
}
parentHeader := c.GetHeader(header.ParentHash(), header.NumberU64()-1)
if parentHeader != nil {
// Using an empty block to append here because append only takes in a
// header and we read the block inside append again, so to save
// RAM, we are using the header
c.InsertChain([]*types.Block{types.NewBlockWithHeader(header)})
} else {
if !c.HasHeader(header.ParentHash(), header.NumberU64()-1) {
c.sl.missingParentFeed.Send(header.ParentHash())
}
}
} else {
log.Warn("Entry in the FH cache without being in the db: ", "Hash: ", hashAndNumber.Hash)
}
}
}

func (c *Core) serviceFutureBlock(block *types.Block) {
parentBlock := c.GetBlock(block.ParentHash(), block.NumberU64()-1)
if parentBlock != nil {
c.InsertChain([]*types.Block{block})
} else {
if !c.HasHeader(block.ParentHash(), block.NumberU64()-1) {
c.sl.missingParentFeed.Send(block.ParentHash())
}
}
}

// addToAppendQueue adds a block to the append queue
func (c *Core) addToAppendQueue(block *types.Block) error {
max := uint64(time.Now().Unix() + c_maxFutureTime)
if block.Time() > max {
return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max)
}
c.appendQueue.ContainsOrAdd(block.Hash(), block.NumberU64())
c.appendQueue.ContainsOrAdd(block.Hash(), blockNumberAndRetryCounter{block.NumberU64(), 0})
return nil
}
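As a side note on the reworked procAppendQueue above: entries are now fanned out into two lists, a priority list keyed purely on a low retry count, and a regular list capped to a window above the current header. The following is a minimal, illustrative sketch of that partitioning; entry and partition are hypothetical stand-ins, not code from this commit:

package main

import "fmt"

const (
	c_appendQueueRetryPriorityThreshold = 5  // mirrors the constant above
	c_maxFutureBlocks                   = 15 // mirrors the constant above
)

type entry struct {
	number uint64
	retry  uint64
}

// partition mirrors the loop in procAppendQueue: low-retry entries are always
// prioritized, while the regular list only accepts blocks within
// c_maxFutureBlocks of the current header.
func partition(entries []entry, currentHeight uint64) (priority, regular []entry) {
	for _, e := range entries {
		if e.retry < c_appendQueueRetryPriorityThreshold {
			priority = append(priority, e)
		}
		if e.number < currentHeight+c_maxFutureBlocks {
			regular = append(regular, e)
		}
	}
	return priority, regular
}

func main() {
	entries := []entry{{number: 110, retry: 1}, {number: 105, retry: 50}, {number: 200, retry: 2}}
	priority, regular := partition(entries, 100)
	// Block 200 has few retries, so it is prioritized, but it sits too far ahead
	// of the head (100+15) to enter the regular list.
	fmt.Println(len(priority), len(regular)) // 2 2
}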

@@ -269,7 +305,7 @@ func (c *Core) WriteBlock(block *types.Block) {
if c.sl.IsBlockHashABadHash(block.Hash()) {
return
}
if c.GetBlockByHash(block.Hash()) == nil {
if c.GetHeaderByHash(block.Hash()) == nil {
// Only add non dom blocks to the append queue
_, order, err := c.CalcOrder(block.Header())
if err != nil {
@@ -278,6 +314,8 @@ if order == common.NodeLocation.Context() {
if order == common.NodeLocation.Context() {
c.addToAppendQueue(block)
}
}
if c.GetHeaderOrCandidateByHash(block.Hash()) == nil {
c.sl.WriteBlock(block)
}
}
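Unrelated to the retry counter itself, addToAppendQueue keeps its future-timestamp guard. The sketch below restates that check in isolation, assuming block timestamps are Unix seconds; tooFarInFuture is a hypothetical helper, not part of core.go:

package main

import (
	"fmt"
	"time"
)

const c_maxFutureTime = 30 // seconds, mirrors the constant above

// tooFarInFuture restates the guard in addToAppendQueue: blocks stamped more
// than c_maxFutureTime seconds ahead of local time are rejected outright.
func tooFarInFuture(blockTime uint64) error {
	max := uint64(time.Now().Unix() + c_maxFutureTime)
	if blockTime > max {
		return fmt.Errorf("future block timestamp %v > allowed %v", blockTime, max)
	}
	return nil
}

func main() {
	now := uint64(time.Now().Unix())
	fmt.Println(tooFarInFuture(now + 10)) // <nil>: inside the allowed window
	fmt.Println(tooFarInFuture(now + 60)) // error: more than 30s into the future
}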
