Skip to content

Commit

Permalink
Removed the pending header subscription from filter systems as it is …
Browse files Browse the repository at this point in the history
…not needed
  • Loading branch information
gameofpointers committed Aug 21, 2023
1 parent 8d610a5 commit 409f056
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 58 deletions.
2 changes: 1 addition & 1 deletion eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func (api *PublicFilterAPI) PendingHeader(ctx context.Context) (*rpc.Subscriptio

go func() {
header := make(chan *types.Header, c_pendingHeaderChSize)
headerSub := api.events.SubscribePendingHeader(header)
headerSub := api.backend.SubscribePendingHeaderEvent(header)

for {
select {
Expand Down
79 changes: 22 additions & 57 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ const (
LogsSubscription
// PendingLogsSubscription queries for logs in pending blocks
PendingLogsSubscription
// PendingHeaderSubscription queries for pending headers.
PendingHeaderSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending
Expand All @@ -68,8 +66,6 @@ const (
logsChanSize = 10
// chainEvChanSize is the size of channel listening to ChainEvent.
chainEvChanSize = 10
// pendingHeaderChSize is the size of channel listening to PendingHeaderEvent.
pendingHeaderChSize = 20
)

type subscription struct {
Expand All @@ -93,22 +89,20 @@ type EventSystem struct {
lastHead *types.Header

// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
pendingHeaderSub event.Subscription // Subscription for pending header event
txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event

// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
pendingHeaderCh chan *types.Header // Channel to receive new pending header event
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -119,16 +113,15 @@ type EventSystem struct {
// or by stopping the given mux.
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
pendingHeaderCh: make(chan *types.Header, pendingHeaderChSize),
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
}

nodeCtx := common.NodeLocation.Context()
Expand All @@ -140,11 +133,10 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
}
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.pendingHeaderSub = m.backend.SubscribePendingHeaderEvent(m.pendingHeaderCh)

// Make sure none of the subscriptions are empty
if nodeCtx == common.ZONE_CTX {
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil || m.pendingHeaderSub == nil {
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
log.Fatal("Subscribe for event system failed")
}
} else {
Expand Down Expand Up @@ -324,22 +316,6 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
return es.subscribe(sub)
}

// SubscribePendingHeader creates a subscription that writes pending header that are created in the miner.
func (es *EventSystem) SubscribePendingHeader(header chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingHeaderSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
header: header,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

type filterIndex map[Type]map[rpc.ID]*subscription

func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
Expand Down Expand Up @@ -477,12 +453,6 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
return nil
}

func (es *EventSystem) handlePendingHeader(filters filterIndex, ev *types.Header) {
for _, f := range filters[PendingHeaderSubscription] {
f.header <- ev
}
}

// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
nodeCtx := common.NodeLocation.Context()
Expand All @@ -495,7 +465,6 @@ func (es *EventSystem) eventLoop() {
es.pendingLogsSub.Unsubscribe()
}
es.chainSub.Unsubscribe()
es.pendingHeaderSub.Unsubscribe()
}()

index := make(filterIndex)
Expand All @@ -511,8 +480,6 @@ func (es *EventSystem) eventLoop() {
select {
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)
case ev := <-es.pendingHeaderCh:
es.handlePendingHeader(index, ev)

case f := <-es.install:
if f.typ != MinedAndPendingLogsSubscription {
Expand All @@ -528,8 +495,6 @@ func (es *EventSystem) eventLoop() {

case <-es.chainSub.Err():
return
case <-es.pendingHeaderSub.Err():
return
}
}
}
Expand Down

0 comments on commit 409f056

Please sign in to comment.