Skip to content

Commit

Permalink
Make feed subscriptions optionally blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
wizeguyy committed May 17, 2023
1 parent 48cf1e1 commit a2b5099
Show file tree
Hide file tree
Showing 22 changed files with 93 additions and 76 deletions.
8 changes: 4 additions & 4 deletions core/bodydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,23 @@ func (bc *BodyDb) Config() *params.ChainConfig { return bc.chainConfig }

// SubscribeChainEvent registers a subscription of ChainEvent.
func (bc *BodyDb) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
return bc.scope.Track(bc.chainFeed.Subscribe(ch))
return bc.scope.Track(bc.chainFeed.Subscribe(ch, true))
}

// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
func (bc *BodyDb) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch, true))
}

// SubscribeLogsEvent registers a subscription of []*types.Log.
func (bc *BodyDb) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return bc.scope.Track(bc.logsFeed.Subscribe(ch))
return bc.scope.Track(bc.logsFeed.Subscribe(ch, true))
}

// SubscribeBlockProcessingEvent registers a subscription of bool where true means
// block processing has started while false means it has stopped.
func (bc *BodyDb) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch, true))
}

func (bc *BodyDb) HasBlockAndState(hash common.Hash, number uint64) bool {
Expand Down
4 changes: 2 additions & 2 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ChainIndexerChain interface {
CurrentHeader() *types.Header

// SubscribeChainHeadEvent subscribes to new head header notifications.
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent, blocking bool) event.Subscription
}

// ChainIndexer does a post-processing job for equally sized sections of the
Expand Down Expand Up @@ -123,7 +123,7 @@ func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend Cha
// are notified about new events by their parents.
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
events := make(chan ChainHeadEvent, 10)
sub := chain.SubscribeChainHeadEvent(events)
sub := chain.SubscribeChainHeadEvent(events, true)

go c.eventLoop(chain.CurrentHeader(), events, sub)
}
Expand Down
8 changes: 4 additions & 4 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,8 @@ func (c *Core) Genesis() *types.Block {
}

// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
func (c *Core) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
return c.sl.hc.SubscribeChainHeadEvent(ch)
func (c *Core) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent, blocking bool) event.Subscription {
return c.sl.hc.SubscribeChainHeadEvent(ch, blocking)
}

// GetBody retrieves a block body (transactions and uncles) from the database by
Expand Down Expand Up @@ -488,7 +488,7 @@ func (c *Core) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
return c.sl.hc.bc.SubscribeChainEvent(ch)
}

// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
// SubscribeRemovedLogsEvent registers a subscription of ChainHeadEvent.
func (c *Core) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
return c.sl.hc.bc.SubscribeRemovedLogsEvent(ch)
}
Expand Down Expand Up @@ -607,7 +607,7 @@ func (c *Core) SetEtherbase(addr common.Address) {
// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (c *Core) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
return c.sl.miner.worker.pendingLogsFeed.Subscribe(ch)
return c.sl.miner.worker.pendingLogsFeed.Subscribe(ch, true)
}

// SubscribePendingBlock starts delivering the pending block to the given channel.
Expand Down
10 changes: 5 additions & 5 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,21 +876,21 @@ func (hc *HeaderChain) Engine() consensus.Engine {
}

// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
func (hc *HeaderChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
return hc.scope.Track(hc.chainHeadFeed.Subscribe(ch))
func (hc *HeaderChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent, blocking bool) event.Subscription {
return hc.scope.Track(hc.chainHeadFeed.Subscribe(ch, blocking))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (hc *HeaderChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return hc.scope.Track(hc.chainSideFeed.Subscribe(ch))
return hc.scope.Track(hc.chainSideFeed.Subscribe(ch, true))
}

func (hc *HeaderChain) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription {
return hc.scope.Track(hc.missingPendingEtxsFeed.Subscribe(ch))
return hc.scope.Track(hc.missingPendingEtxsFeed.Subscribe(ch, true))
}

func (hc *HeaderChain) SubscribeMissingPendingEtxsRollupEvent(ch chan<- common.Hash) event.Subscription {
return hc.scope.Track(hc.missingPendingEtxsRollupFeed.Subscribe(ch))
return hc.scope.Track(hc.missingPendingEtxsRollupFeed.Subscribe(ch, true))
}

func (hc *HeaderChain) StateAt(root common.Hash) (*state.StateDB, error) {
Expand Down
4 changes: 2 additions & 2 deletions core/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ func (miner *Miner) DisablePreseal() {
// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
return miner.worker.pendingLogsFeed.Subscribe(ch)
return miner.worker.pendingLogsFeed.Subscribe(ch, true)
}

// SubscribePendingBlock starts delivering the pending block to the given channel.
func (miner *Miner) SubscribePendingHeader(ch chan<- *types.Header) event.Subscription {
return miner.worker.pendingHeaderFeed.Subscribe(ch)
return miner.worker.pendingHeaderFeed.Subscribe(ch, true)
}
8 changes: 4 additions & 4 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (sl *Slice) GetPendingBlockBody(header *types.Header) *types.Body {
}

func (sl *Slice) SubscribeMissingParentEvent(ch chan<- common.Hash) event.Subscription {
return sl.scope.Track(sl.missingParentFeed.Subscribe(ch))
return sl.scope.Track(sl.missingParentFeed.Subscribe(ch, true))
}

// MakeDomClient creates the quaiclient for the given domurl
Expand Down Expand Up @@ -831,15 +831,15 @@ func (sl *Slice) TxPool() *TxPool { return sl.txPool }
func (sl *Slice) Miner() *Miner { return sl.miner }

func (sl *Slice) SubscribeMissingBody(ch chan<- *types.Header) event.Subscription {
return sl.scope.Track(sl.missingBodyFeed.Subscribe(ch))
return sl.scope.Track(sl.missingBodyFeed.Subscribe(ch, true))
}

func (sl *Slice) SubscribePendingEtxs(ch chan<- types.PendingEtxs) event.Subscription {
return sl.scope.Track(sl.pendingEtxsFeed.Subscribe(ch))
return sl.scope.Track(sl.pendingEtxsFeed.Subscribe(ch, true))
}

func (sl *Slice) SubscribePendingEtxsRollup(ch chan<- types.PendingEtxsRollup) event.Subscription {
return sl.scope.Track(sl.pendingEtxsRollupFeed.Subscribe(ch))
return sl.scope.Track(sl.pendingEtxsRollupFeed.Subscribe(ch, true))
}

func (sl *Slice) CurrentInfo(header *types.Header) bool {
Expand Down
6 changes: 3 additions & 3 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ type blockChain interface {
GetBlockFromCacheOrDb(hash common.Hash, number uint64) *types.Block
StateAt(root common.Hash) (*state.StateDB, error)

SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent, blocking bool) event.Subscription
}

// TxPoolConfig are the configuration parameters of the transaction pool.
Expand Down Expand Up @@ -314,7 +314,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
}

// Subscribe events from blockchain and start the main event loop.
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh, false)
pool.wg.Add(1)
go pool.loop()

Expand Down Expand Up @@ -414,7 +414,7 @@ func (pool *TxPool) Stop() {
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
return pool.scope.Track(pool.txFeed.Subscribe(ch, true))
}

// GasPrice returns the current gas price enforced by the transaction pool.
Expand Down
4 changes: 2 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ func (b *QuaiAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Su
return b.eth.Core().SubscribeChainEvent(ch)
}

func (b *QuaiAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
return b.eth.Core().SubscribeChainHeadEvent(ch)
func (b *QuaiAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent, blocking bool) event.Subscription {
return b.eth.Core().SubscribeChainHeadEvent(ch, blocking)
}

func (b *QuaiAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
Expand Down
2 changes: 1 addition & 1 deletion eth/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (e ethEntry) ENRKey() string {
// startEthEntryUpdate starts the ENR updater loop.
func (eth *Ethereum) startEthEntryUpdate(ln *enode.LocalNode) {
var newHead = make(chan core.ChainHeadEvent, 10)
sub := eth.core.SubscribeChainHeadEvent(newHead)
sub := eth.core.SubscribeChainHeadEvent(newHead, true)

go func() {
defer sub.Unsubscribe()
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,12 @@ func newPeerSet() *peerSet {

// SubscribeNewPeers subscribes to peer arrival events.
func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription {
return ps.newPeerFeed.Subscribe(ch)
return ps.newPeerFeed.Subscribe(ch, true)
}

// SubscribePeerDrops subscribes to peer departure events.
func (ps *peerSet) SubscribePeerDrops(ch chan<- *peerConnection) event.Subscription {
return ps.peerDropFeed.Subscribe(ch)
return ps.peerDropFeed.Subscribe(ch, true)
}

// Reset iterates over the current peer set, and resets each of the known peers
Expand Down
10 changes: 5 additions & 5 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,23 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types
}

func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.txFeed.Subscribe(ch)
return b.txFeed.Subscribe(ch, true)
}

func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return b.rmLogsFeed.Subscribe(ch)
return b.rmLogsFeed.Subscribe(ch, true)
}

func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.logsFeed.Subscribe(ch)
return b.logsFeed.Subscribe(ch, true)
}

func (b *testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.pendingLogsFeed.Subscribe(ch)
return b.pendingLogsFeed.Subscribe(ch, true)
}

func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return b.chainFeed.Subscribe(ch)
return b.chainFeed.Subscribe(ch, true)
}

func (b *testBackend) BloomStatus() (uint64, uint64) {
Expand Down
6 changes: 3 additions & 3 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,11 @@ func testSendTransactions(t *testing.T, protocol uint) {
backend := new(testEthHandler)

anns := make(chan []common.Hash)
annSub := backend.txAnnounces.Subscribe(anns)
annSub := backend.txAnnounces.Subscribe(anns, true)
defer annSub.Unsubscribe()

bcasts := make(chan []*types.Transaction)
bcastSub := backend.txBroadcasts.Subscribe(bcasts)
bcastSub := backend.txBroadcasts.Subscribe(bcasts, true)
defer bcastSub.Unsubscribe()

go eth.Handle(backend, sink)
Expand Down Expand Up @@ -702,7 +702,7 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
backend := new(testEthHandler)

blocks := make(chan *types.Block, 1)
sub := backend.blockBroadcasts.Subscribe(blocks)
sub := backend.blockBroadcasts.Subscribe(blocks, true)
defer sub.Unsubscribe()

go eth.Handle(backend, sink)
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package eth

import (
sync "github.com/sasha-s/go-deadlock"
"math/big"
"sort"
sync "github.com/sasha-s/go-deadlock"

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/consensus/blake3pow"
Expand Down Expand Up @@ -109,7 +109,7 @@ func (p *testTxPool) Pending(enforceTips bool) (map[common.Address]types.Transac
// SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return p.txFeed.Subscribe(ch)
return p.txFeed.Subscribe(ch, true)
}

// testHandler is a live implementation of the Ethereum protocol handler, just
Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/eth/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (e enrEntry) ENRKey() string {
// head events and updates the requested node record whenever a fork is passed.
func StartENRUpdater(chain *core.Core, ln *enode.LocalNode) {
var newHead = make(chan core.ChainHeadEvent, 10)
sub := chain.SubscribeChainHeadEvent(newHead)
sub := chain.SubscribeChainHeadEvent(newHead, true)

go func() {
defer sub.Unsubscribe()
Expand Down
2 changes: 1 addition & 1 deletion event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestSubCloseUnsub(t *testing.T) {
// the point of this test is **not** to panic
var mux TypeMux
mux.Stop()
sub := mux.Subscribe(0)
sub := mux.Subscribe(0, true)
sub.Unsubscribe()
}

Expand Down
2 changes: 1 addition & 1 deletion event/example_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func ExampleFeed_acknowledgedEvents() {
defer close(done)
for i := 0; i < 3; i++ {
ch := make(chan ackedEvent, 100)
sub := feed.Subscribe(ch)
sub := feed.Subscribe(ch, true)
go func() {
defer sub.Unsubscribe()
for {
Expand Down
4 changes: 2 additions & 2 deletions event/example_scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (s *App) Calc(op byte, a, b int) int {
func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
switch op {
case '/':
return s.scope.Track(s.divServer.results.Subscribe(ch))
return s.scope.Track(s.divServer.results.Subscribe(ch, true))
case '*':
return s.scope.Track(s.mulServer.results.Subscribe(ch))
return s.scope.Track(s.mulServer.results.Subscribe(ch, true))
default:
panic("invalid op")
}
Expand Down
Loading

0 comments on commit a2b5099

Please sign in to comment.