Skip to content

Commit

Permalink
Moved composition of new worker states into updatePhCache, added a mu…
Browse files Browse the repository at this point in the history
…tex aroudn updatePhCache, and logical branch to allow root updates
  • Loading branch information
kiltsonfire authored and gameofpointers committed Jun 14, 2023
1 parent 7e965f4 commit 79a8d0e
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Slice struct {
phCache *lru.Cache

validator Validator // Block and state validator interface
phCacheMu sync.RWMutex
}

func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLookupLimit *uint64, isLocalBlock func(block *types.Header) bool, chainConfig *params.ChainConfig, domClientUrl string, subClientUrls []string, engine consensus.Engine, cacheConfig *CacheConfig, vmConfig vm.Config, genesis *Genesis) (*Slice, error) {
Expand Down Expand Up @@ -238,7 +239,7 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do

oldBestPhEntropy := new(big.Int).Set(bestPh.Header.CalcPhS())

sl.updatePhCache(pendingHeaderWithTermini, true)
sl.updatePhCache(pendingHeaderWithTermini, true, nil)

if nodeCtx == common.ZONE_CTX {
subReorg = sl.pickPhHead(pendingHeaderWithTermini, oldBestPhEntropy)
Expand Down Expand Up @@ -290,7 +291,6 @@ func (sl *Slice) relayPh(block *types.Block, appendTime *time.Duration, reorg bo

// asyncPendingHeaderLoop waits for the pendingheader updates from the worker and updates the phCache
func (sl *Slice) asyncPendingHeaderLoop() {
nodeCtx := common.NodeLocation.Context()

// Subscribe to the AsyncPh updates from the worker
sl.asyncPhCh = make(chan *types.Header, c_asyncPhUpdateChanSize)
Expand All @@ -299,21 +299,13 @@ func (sl *Slice) asyncPendingHeaderLoop() {
for {
select {
case asyncPh := <-sl.asyncPhCh:
// Read the termini for the given header
termini := sl.hc.GetTerminiByHash(asyncPh.ParentHash())
pendingHeaderWithTermini, exists := sl.readPhCache(termini[c_terminusIndex])
sl.updatePhCache(types.PendingHeader{}, true, asyncPh)

bestPh, exists := sl.readPhCache(sl.bestPhKey)
if exists {
localPendingHeader := asyncPh
pendingHeaderWithTermini.Header = sl.combinePendingHeader(localPendingHeader, pendingHeaderWithTermini.Header, nodeCtx, true)
sl.updatePhCache(pendingHeaderWithTermini, true)

bestPh, exists := sl.readPhCache(sl.bestPhKey)
if exists {
bestPh.Header.SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.Header)
}
bestPh.Header.SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.Header)
}

case <-sl.asyncPhSub.Err():
return

Expand Down Expand Up @@ -577,7 +569,7 @@ func (sl *Slice) updatePhCacheFromDom(pendingHeader types.PendingHeader, termini
}

oldBestPhEntropy := new(big.Int).Set(bestPh.Header.CalcPhS())
sl.updatePhCache(types.PendingHeader{Header: combinedPendingHeader, Termini: localPendingHeader.Termini}, false)
sl.updatePhCache(types.PendingHeader{Header: combinedPendingHeader, Termini: localPendingHeader.Termini}, false, nil)
sl.pickPhHead(types.PendingHeader{Header: combinedPendingHeader, Termini: localPendingHeader.Termini}, oldBestPhEntropy)
return nil
}
Expand All @@ -586,18 +578,33 @@ func (sl *Slice) updatePhCacheFromDom(pendingHeader types.PendingHeader, termini
}

// updatePhCache updates cache given a pendingHeaderWithTermini with the terminus used as the key.
func (sl *Slice) updatePhCache(pendingHeaderWithTermini types.PendingHeader, inSlice bool) {
func (sl *Slice) updatePhCache(pendingHeaderWithTermini types.PendingHeader, inSlice bool, localHeader *types.Header) {
sl.phCacheMu.Lock()
defer sl.phCacheMu.Unlock()

var exists bool
if localHeader != nil {
termini := sl.hc.GetTerminiByHash(localHeader.ParentHash())
pendingHeaderWithTermini, exists = sl.readPhCache(termini[c_terminusIndex])
if exists {
pendingHeaderWithTermini.Header = sl.combinePendingHeader(localHeader, pendingHeaderWithTermini.Header, common.ZONE_CTX, true)
}
}

// Update the pendingHeader Cache
oldPh, exist := sl.readPhCache(pendingHeaderWithTermini.Termini[c_terminusIndex])
var deepCopyPendingHeaderWithTermini types.PendingHeader
newPhEntropy := pendingHeaderWithTermini.Header.CalcPhS()
deepCopyPendingHeaderWithTermini = types.PendingHeader{Header: types.CopyHeader(pendingHeaderWithTermini.Header), Termini: pendingHeaderWithTermini.Termini}
deepCopyPendingHeaderWithTermini.Header.SetLocation(common.NodeLocation)
deepCopyPendingHeaderWithTermini.Header.SetTime(uint64(time.Now().Unix()))

if exist {
// If we are inslice we will only update the cache if the entropy is better
// Simultaneously we have to allow for the state root update
// asynchronously, to do this equal check is added to the inSlice case
if (!inSlice && newPhEntropy.Cmp(oldPh.Header.CalcPhS()) >= 0) ||
(inSlice && pendingHeaderWithTermini.Header.ParentEntropy().Cmp(oldPh.Header.ParentEntropy()) > 0) {
(inSlice && pendingHeaderWithTermini.Header.ParentEntropy().Cmp(oldPh.Header.ParentEntropy()) >= 0) {
sl.writePhCache(pendingHeaderWithTermini.Termini[c_terminusIndex], deepCopyPendingHeaderWithTermini)
log.Info("PhCache update:", "inSlice:", inSlice, "Ph Number:", deepCopyPendingHeaderWithTermini.Header.NumberArray(), "Termini:", deepCopyPendingHeaderWithTermini.Termini[c_terminusIndex])
}
Expand Down

0 comments on commit 79a8d0e

Please sign in to comment.