Skip to content

Commit

Permalink
Revert "Revert "bugfix: send the uncles to chainSideFeed for recording""
Browse files Browse the repository at this point in the history
This reverts commit 50fd2e0.
  • Loading branch information
wizeguyy committed May 15, 2023
1 parent 4f3e1ba commit 263e6ea
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
5 changes: 0 additions & 5 deletions core/bodydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,6 @@ func (bc *BodyDb) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
return bc.scope.Track(bc.chainFeed.Subscribe(ch))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BodyDb) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
}

// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
func (bc *BodyDb) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
Expand Down
10 changes: 5 additions & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,11 @@ func (c *Core) SubscribeMissingPendingEtxsRollupEvent(ch chan<- common.Hash) eve
return c.sl.hc.SubscribeMissingPendingEtxsRollupEvent(ch)
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (c *Core) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return c.sl.hc.SubscribeChainSideEvent(ch)
}

//--------------------//
// BlockChain methods //
//--------------------//
Expand All @@ -483,11 +488,6 @@ func (c *Core) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
return c.sl.hc.bc.SubscribeChainEvent(ch)
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (c *Core) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return c.sl.hc.bc.SubscribeChainSideEvent(ch)
}

// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
func (c *Core) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
return c.sl.hc.bc.SubscribeRemovedLogsEvent(ch)
Expand Down
5 changes: 5 additions & 0 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ func (sl *Slice) updatePhCacheFromDom(pendingHeader types.PendingHeader, termini

// writePhCache dom writes a given pendingHeaderWithTermini to the cache with the terminus used as the key.
func (sl *Slice) writeToPhCacheAndPickPhHead(pendingHeaderWithTermini types.PendingHeader, appendTime *time.Duration) bool {
nodeCtx := common.NodeLocation.Context()
bestPh, exist := sl.phCache[sl.bestPhKey]
if !exist {
log.Error("BestPh Key does not exist for", "key", sl.bestPhKey)
Expand Down Expand Up @@ -558,6 +559,10 @@ func (sl *Slice) writeToPhCacheAndPickPhHead(pendingHeaderWithTermini types.Pend
}
log.Debug("Choosing new pending header", "Ph Number:", pendingHeaderWithTermini.Header.NumberArray())
return true
} else {
if nodeCtx == common.ZONE_CTX && newPhEntropy.Cmp(oldBestPhEntropy) != 0 {
sl.hc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
}
return false
}
Expand Down
2 changes: 1 addition & 1 deletion core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, db ethdb.Databas
// Subscribe NewTxsEvent for tx pool
worker.txsSub = txPool.SubscribeNewTxsEvent(worker.txsCh)
// Subscribe events for blockchain
worker.chainSideSub = headerchain.bc.SubscribeChainSideEvent(worker.chainSideCh)
worker.chainSideSub = headerchain.SubscribeChainSideEvent(worker.chainSideCh)
}

// Sanitize recommit interval if the user-specified one is too short.
Expand Down
37 changes: 35 additions & 2 deletions quaistats/quaistats.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (

// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10
chainSideChanSize = 10

// reportInterval is the time interval between two reports.
reportInterval = 15
Expand All @@ -69,6 +70,7 @@ const (
// backend encompasses the bare-minimum functionality needed for quaistats reporting
type backend interface {
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription
CurrentHeader() *types.Header
HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
Expand Down Expand Up @@ -100,6 +102,7 @@ type Service struct {
pongCh chan struct{} // Pong notifications are fed into this channel
histCh chan []uint64 // History request block numbers are fed into this channel
headSub event.Subscription
sideSub event.Subscription

tpsLookupCache *lru.Cache
gasLookupCache *lru.Cache
Expand Down Expand Up @@ -212,8 +215,12 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string)
func (s *Service) Start() error {
// Subscribe to chain events to execute updates on
chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
chainSideCh := make(chan core.ChainSideEvent, chainSideChanSize)

s.headSub = s.backend.SubscribeChainHeadEvent(chainHeadCh)
go s.loop(chainHeadCh)
s.sideSub = s.backend.SubscribeChainSideEvent(chainSideCh)

go s.loop(chainHeadCh, chainSideCh)

log.Info("Stats daemon started")
return nil
Expand All @@ -222,18 +229,20 @@ func (s *Service) Start() error {
// Stop implements node.Lifecycle, terminating the monitoring and reporting daemon.
func (s *Service) Stop() error {
s.headSub.Unsubscribe()
s.sideSub.Unsubscribe()
log.Info("Stats daemon stopped")
return nil
}

// loop keeps trying to connect to the netstats server, reporting chain events
// until termination.
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent) {
func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, chainSideCh chan core.ChainSideEvent) {
nodeCtx := common.NodeLocation.Context()
// Start a goroutine that exhausts the subscriptions to avoid events piling up
var (
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
sideCh = make(chan *types.Block, 1)
)
go func() {
HandleLoop:
Expand All @@ -245,6 +254,12 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent) {
case headCh <- head.Block:
default:
}
// Notify of chain side events, but drop if too frequent
case sideEvent := <-chainSideCh:
select {
case sideCh <- sideEvent.Block:
default:
}
case <-s.headSub.Err():
break HandleLoop
}
Expand Down Expand Up @@ -334,6 +349,10 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent) {
log.Warn("Post-block transaction stats report failed", "err", err)
}
}
case sideEvent := <-sideCh:
if err = s.reportSideBlock(conn, sideEvent); err != nil {
log.Warn("Block stats report failed", "err", err)
}
}
}
fullReport.Stop()
Expand Down Expand Up @@ -704,6 +723,20 @@ func (s *Service) computeTps(block *types.Block) int64 {
return blockTps
}

// reportSideBlock retrieves the current chain side event and reports it to the stats server.
func (s *Service) reportSideBlock(conn *connWrapper, block *types.Block) error {
log.Trace("Sending new side block to quaistats", "number", block.Number(), "hash", block.Hash())

stats := map[string]interface{}{
"id": s.node,
"sideBlock": block,
}
report := map[string][]interface{}{
"emit": {"sideBlock", stats},
}
return conn.WriteJSON(report)
}

// reportBlock retrieves the current chain head and reports it to the stats server.
func (s *Service) reportBlock(conn *connWrapper, block *types.Block) error {
// Gather the block details from the header or block chain
Expand Down

0 comments on commit 263e6ea

Please sign in to comment.