Worker only ever runs one async process to generate pending header
gameofpointers committed May 23, 2023
1 parent fb93e0c commit 1f80b10
Showing 2 changed files with 127 additions and 53 deletions.
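The heart of the change: generating a pending header with an updated state root now happens in a goroutine spawned per chain-head event, and the worker keeps at most one such round in flight by closing an interrupt channel before starting the next. A minimal, self-contained sketch of that single-flight pattern (types and names are simplified stand-ins; the real code below operates on *types.Block, *types.Header, and event.Feed):

package main

import "fmt"

type worker struct {
	interrupt chan struct{}
	results   chan string
}

// interruptAsyncGen kills any in-flight generation round by closing its channel.
func (w *worker) interruptAsyncGen() {
	if w.interrupt != nil {
		close(w.interrupt)
		w.interrupt = nil
	}
}

// onChainHead supersedes the previous round and starts a new one.
func (w *worker) onChainHead(head string) {
	w.interruptAsyncGen()
	w.interrupt = make(chan struct{})
	stop := w.interrupt
	go func() {
		select {
		case <-stop:
			return // a newer head arrived before this round started
		default:
			// stands in for GeneratePendingHeader(block, true)
			w.results <- "pending header on " + head
		}
	}()
}

func main() {
	w := &worker{results: make(chan string, 2)}
	w.onChainHead("block-1")
	w.onChainHead("block-2") // best-effort interrupt of round 1
	fmt.Println(<-w.results)
}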
69 changes: 48 additions & 21 deletions core/slice.go
@@ -32,6 +32,7 @@ const (
c_startingPrintLimit = 10
c_regionRelayProc = 3
c_primeRelayProc = 10
c_asyncPhUpdateChanSize = 10
)

type Slice struct {
@@ -50,12 +51,16 @@ type Slice struct {
domUrl string
subClients []*quaiclient.Client

wg sync.WaitGroup
scope event.SubscriptionScope
missingBodyFeed event.Feed
pendingEtxsFeed event.Feed
pendingEtxsRollupFeed event.Feed
missingParentFeed event.Feed

asyncPhCh chan *types.Header
asyncPhSub event.Subscription

phCachemu sync.RWMutex

bestPhKey common.Hash
@@ -106,6 +111,10 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, isLocal
return nil, err
}

if nodeCtx == common.ZONE_CTX {
go sl.asyncPendingHeaderLoop()
}

return sl, nil
}

@@ -263,31 +272,13 @@ func (sl *Slice) relayPh(block *types.Block, appendTime *time.Duration, reorg bo
nodeCtx := common.NodeLocation.Context()

if nodeCtx == common.ZONE_CTX {

// Send an empty header to miner
bestPh, exists := sl.readPhCache(sl.bestPhKey)
if exists {
bestPh.Header.SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.Header)
return
}

// Only if reorg is true invoke the worker to update the state root
if reorg {
localPendingHeader, err := sl.miner.worker.GeneratePendingHeader(block, true)
if err != nil {
return
} else {
pendingHeaderWithTermini.Header = sl.combinePendingHeader(localPendingHeader, pendingHeaderWithTermini.Header, nodeCtx, true)
sl.updatePhCache(pendingHeaderWithTermini, true)
}
bestPh, exists = sl.readPhCache(sl.bestPhKey)
if exists {
bestPh.Header.SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.Header)
return
}
}

} else if !domOrigin && reorg {
for i := range sl.subClients {
if sl.subClients[i] != nil {
@@ -297,6 +288,41 @@
}
}

// asyncPendingHeaderLoop waits for pending header updates from the worker and updates the phCache
func (sl *Slice) asyncPendingHeaderLoop() {
nodeCtx := common.NodeLocation.Context()

// Subscribe to the AsyncPh updates from the worker
sl.asyncPhCh = make(chan *types.Header, c_asyncPhUpdateChanSize)
sl.asyncPhSub = sl.miner.worker.SubscribeAsyncPendingHeader(sl.asyncPhCh)

for {
select {
case asyncPh := <-sl.asyncPhCh:
// Read the termini for the given header
termini := sl.hc.GetTerminiByHash(asyncPh.ParentHash())
pendingHeaderWithTermini, exists := sl.readPhCache(termini[c_terminusIndex])
if exists {
localPendingHeader := asyncPh
pendingHeaderWithTermini.Header = sl.combinePendingHeader(localPendingHeader, pendingHeaderWithTermini.Header, nodeCtx, true)
sl.updatePhCache(pendingHeaderWithTermini, true)

bestPh, exists := sl.readPhCache(sl.bestPhKey)
if exists {
bestPh.Header.SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.Header)
}
}

case <-sl.asyncPhSub.Err():
return

case <-sl.quit:
return
}
}
}
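The loop above is wired up with the event.Feed primitive: the worker Sends each freshly generated header, and every tracked subscriber receives it on its own buffered channel (c_asyncPhUpdateChanSize deep here). A minimal sketch of that send/subscribe contract, assuming the upstream go-ethereum event package (go-quai's copy behaves the same):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/event"
)

func main() {
	var (
		feed  event.Feed
		scope event.SubscriptionScope
	)
	defer scope.Close() // tears down everything tracked by the scope

	ch := make(chan int, 10) // buffered, like asyncPhCh above
	sub := scope.Track(feed.Subscribe(ch))
	defer sub.Unsubscribe()

	// Send blocks until every subscriber has taken (or buffered) the value.
	feed.Send(42)
	fmt.Println(<-ch)
}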

// Read the phCache
func (sl *Slice) readPhCache(hash common.Hash) (types.PendingHeader, bool) {
sl.phCachemu.RLock()
@@ -318,7 +344,7 @@ func (sl *Slice) writePhCache(hash common.Hash, pendingHeader types.PendingHeade
// Generate a slice pending header
func (sl *Slice) generateSlicePendingHeader(block *types.Block, newTermini []common.Hash, domPendingHeader *types.Header, domOrigin bool, fill bool) (types.PendingHeader, error) {
// Update the local pending header
localPendingHeader, err := sl.miner.worker.GeneratePendingHeader(block, fill)
localPendingHeader, err := sl.miner.worker.GeneratePendingHeader(block, true)
if err != nil {
return types.PendingHeader{}, err
}
@@ -765,7 +791,7 @@ func (sl *Slice) NewGenesisPendingHeader(domPendingHeader *types.Header) {
nodeCtx := common.NodeLocation.Context()
genesisHash := sl.config.GenesisHash
// Update the local pending header
localPendingHeader, err := sl.miner.worker.GeneratePendingHeader(sl.hc.GetBlockByHash(genesisHash), true)
localPendingHeader, err := sl.miner.worker.GeneratePendingHeader(sl.hc.GetBlockByHash(genesisHash), false)
if err != nil {
return
}
@@ -851,6 +877,7 @@ func (sl *Slice) Stop() {

sl.hc.Stop()
if nodeCtx == common.ZONE_CTX {
sl.asyncPhSub.Unsubscribe()
sl.txPool.Stop()
}
sl.miner.Stop()
111 changes: 79 additions & 32 deletions core/worker.go
@@ -177,9 +177,10 @@ type worker struct {
pendingHeaderFeed event.Feed

// Subscriptions
txsCh chan NewTxsEvent
txsSub event.Subscription
chainHeadCh chan ChainHeadEvent
txsCh chan NewTxsEvent
txsSub event.Subscription
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription

// Channels
taskCh chan *task
@@ -188,6 +189,10 @@ type worker struct {
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust

interrupt chan struct{}
asyncPhFeed event.Feed // asyncPhFeed sends an event after each state root update
scope event.SubscriptionScope

wg sync.WaitGroup

current *environment // An environment for current running cycle.
@@ -242,6 +247,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
interrupt: make(chan struct{}),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
}
@@ -250,6 +256,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
phBodyCache, _ := lru.New(pendingBlockBodyLimit)
worker.pendingBlockBody = phBodyCache

worker.chainHeadSub = worker.hc.SubscribeChainHeadEvent(worker.chainHeadCh)
if nodeCtx == common.ZONE_CTX {
// Subscribe NewTxsEvent for tx pool
worker.txsSub = txPool.SubscribeNewTxsEvent(worker.txsCh)
@@ -265,6 +272,9 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
worker.wg.Add(1)
go worker.mainLoop()

worker.wg.Add(1)
go worker.asyncStateLoop()

return worker
}

@@ -338,6 +348,7 @@ func (w *worker) start() {

// stop sets the running status as 0.
func (w *worker) stop() {
w.chainHeadSub.Unsubscribe()
atomic.StoreInt32(&w.running, 0)
}

@@ -351,6 +362,7 @@ func (w *worker) isRunning() bool {
func (w *worker) close() {
atomic.StoreInt32(&w.running, 0)
close(w.exitCh)
w.scope.Close()
w.wg.Wait()
}

@@ -383,34 +395,56 @@ func (w *worker) StorePendingBlockBody() {
rawdb.WritePbBodyKeys(w.workerDb, pendingBlockBodyKeys)
}

// asyncStateLoop updates the state root for a block and sends the state update on a channel
func (w *worker) asyncStateLoop() {
defer w.wg.Done() // decrement the wait group after the close of the loop

for {
select {
case head := <-w.chainHeadCh:

w.interruptAsyncPhGen()

go func() {
select {
case <-w.interrupt:
w.interrupt = make(chan struct{})
return
default:
block := head.Block
header, err := w.GeneratePendingHeader(block, true)
if err != nil {
log.Error("Error generating pending header with state", "err", err)
return
}
// Send the updated pendingHeader in the asyncPhFeed
w.asyncPhFeed.Send(header)
return
}
}()
case <-w.exitCh:
return
case <-w.chainHeadSub.Err():
return
}
}
}

// GeneratePendingHeader generates a pending header given a committed block.
func (w *worker) GeneratePendingHeader(block *types.Block, fill bool) (*types.Header, error) {
nodeCtx := common.NodeLocation.Context()

// Sanitize recommit interval if the user-specified one is too short.
recommit := w.config.Recommit
if recommit < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}
w.interruptAsyncPhGen()

var (
interrupt *int32
timestamp int64 // timestamp for each round of sealing.
)

timer := time.NewTimer(0)
defer timer.Stop()
<-timer.C // discard the initial tick

timestamp = time.Now().Unix()
if interrupt != nil {
atomic.StoreInt32(interrupt, commitInterruptNewHead)
}
interrupt = new(int32)

// reset the timer and update the newTx to zero.
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)

start := time.Now()
Expand Down Expand Up @@ -438,8 +472,6 @@ func (w *worker) GeneratePendingHeader(block *types.Block, fill bool) (*types.He
}
}

env := work.copy()

// Swap out the old work with the new one, terminating any leftover
// prefetcher processes in the meantime and starting a new one.
if w.current != nil {
@@ -449,29 +481,40 @@ func (w *worker) GeneratePendingHeader(block *types.Block, fill bool) (*types.He

// Create a local environment copy, avoid the data race with snapshot state.
// https://github.com/ethereum/go-ethereum/issues/24299
block, err = w.FinalizeAssembleAndBroadcast(w.hc, env.header, block, env.state, env.txs, env.unclelist(), env.etxs, env.subManifest, env.receipts)
block, err = w.FinalizeAssemble(w.hc, work.header, block, work.state, work.txs, work.unclelist(), work.etxs, work.subManifest, work.receipts)
if err != nil {
return nil, err
}
env.header = block.Header()
work.header = block.Header()
w.printPendingHeaderInfo(work, block, start)
w.updateSnapshot(work)

env.uncleMu.RLock()
return work.header, nil
}

// printPendingHeaderInfo logs the pending header information
func (w *worker) printPendingHeaderInfo(work *environment, block *types.Block, start time.Time) {
work.uncleMu.RLock()
if w.CurrentInfo(block.Header()) {
log.Info("Commit new sealing work", "number", block.Number(), "sealhash", block.Header().SealHash(),
"uncles", len(env.uncles), "txs", env.tcount, "etxs", len(block.ExtTransactions()),
"gas", block.GasUsed(), "fees", totalFees(block, env.receipts),
"uncles", len(work.uncles), "txs", work.tcount, "etxs", len(block.ExtTransactions()),
"gas", block.GasUsed(), "fees", totalFees(block, work.receipts),
"elapsed", common.PrettyDuration(time.Since(start)))
} else {
log.Debug("Commit new sealing work", "number", block.Number(), "sealhash", block.Header().SealHash(),
"uncles", len(env.uncles), "txs", env.tcount, "etxs", len(block.ExtTransactions()),
"gas", block.GasUsed(), "fees", totalFees(block, env.receipts),
"uncles", len(work.uncles), "txs", work.tcount, "etxs", len(block.ExtTransactions()),
"gas", block.GasUsed(), "fees", totalFees(block, work.receipts),
"elapsed", common.PrettyDuration(time.Since(start)))
}
env.uncleMu.RUnlock()

w.updateSnapshot(env)
work.uncleMu.RUnlock()
}

return w.snapshotBlock.Header(), nil
// interruptAsyncPhGen kills any async ph generation running
func (w *worker) interruptAsyncPhGen() {
if w.interrupt != nil {
close(w.interrupt)
w.interrupt = nil
}
}
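interruptAsyncPhGen leans on a standard Go idiom: a receive from a closed channel completes immediately, so a single close broadcasts cancellation to every goroutine selecting on that channel, and nil-ing the field makes repeated kills idempotent until a new round recreates it. A small illustration of the idiom:

package main

import (
	"fmt"
	"sync"
)

func main() {
	stop := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // a receive on a closed channel returns immediately
			fmt.Println("round", id, "cancelled")
		}(i)
	}
	close(stop) // one close unblocks all three receivers
	wg.Wait()
}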

// mainLoop is responsible for generating and submitting sealing work based on
Expand Down Expand Up @@ -911,7 +954,7 @@ func (w *worker) adjustGasLimit(interrupt *int32, env *environment, parent *type
env.header.SetGasLimit(CalcGasLimit(parent.GasLimit(), gasUsed))
}

func (w *worker) FinalizeAssembleAndBroadcast(chain consensus.ChainHeaderReader, header *types.Header, parent *types.Block, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, etxs []*types.Transaction, subManifest types.BlockManifest, receipts []*types.Receipt) (*types.Block, error) {
func (w *worker) FinalizeAssemble(chain consensus.ChainHeaderReader, header *types.Header, parent *types.Block, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, etxs []*types.Transaction, subManifest types.BlockManifest, receipts []*types.Receipt) (*types.Block, error) {
nodeCtx := common.NodeLocation.Context()
block, err := w.engine.FinalizeAndAssemble(chain, header, state, txs, uncles, etxs, subManifest, receipts)
if err != nil {
@@ -969,7 +1012,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
// https://github.com/ethereum/go-ethereum/issues/24299
env := env.copy()
parent := w.hc.GetBlock(env.header.ParentHash(), env.header.NumberU64()-1)
block, err := w.FinalizeAssembleAndBroadcast(w.hc, env.header, parent, env.state, env.txs, env.unclelist(), env.etxs, env.subManifest, env.receipts)
block, err := w.FinalizeAssemble(w.hc, env.header, parent, env.state, env.txs, env.unclelist(), env.etxs, env.subManifest, env.receipts)
if err != nil {
return err
}
@@ -1020,6 +1063,10 @@ func (w *worker) GetPendingBlockBody(header *types.Header) *types.Body {
return nil
}

func (w *worker) SubscribeAsyncPendingHeader(ch chan *types.Header) event.Subscription {
return w.scope.Track(w.asyncPhFeed.Subscribe(ch))
}
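For completeness, a sketch of the consumer side of SubscribeAsyncPendingHeader, mirroring asyncPendingHeaderLoop in slice.go: the receive loop drains the feed until the subscription's Err channel closes, which is what w.scope.Close() in worker.close() triggers. Plain ints stand in for *types.Header:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/event"
)

func main() {
	var (
		feed  event.Feed
		scope event.SubscriptionScope
	)

	ch := make(chan int) // unbuffered so Send returns only after the consumer has the value
	sub := scope.Track(feed.Subscribe(ch))

	done := make(chan struct{})
	go func() {
		for {
			select {
			case h := <-ch:
				fmt.Println("new pending header update:", h)
			case <-sub.Err(): // closes when the scope (or this sub) is torn down
				close(done)
				return
			}
		}
	}()

	feed.Send(1)  // delivered to the loop above
	scope.Close() // mirrors w.scope.Close(); the loop exits via sub.Err()
	<-done
}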

// copyReceipts makes a deep copy of the given receipts.
func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
result := make([]*types.Receipt, len(receipts))
