Reorg subscriptions (dominant-strategies#140)
* Reorg subscription done

* made ReOrgRollup a core type

* fetching the headers done

* sending the data in reorg channel

* ReOrgRollBack implementation

* ReOrgRollBack implementation

* reorg rollback is left

* reorg data process in manager

* reorg rollback done

* reorg rollback done

* reorg rollback done

* reorg rollback done

* reorg rollback done

* reorg rollback done

Co-authored-by: shreekarashastry <[email protected]>
shreekarashastry authored Jan 23, 2022
1 parent 82beaf1 commit 4689a86
Showing 14 changed files with 275 additions and 9 deletions.
5 changes: 4 additions & 1 deletion accounts/abi/bind/backends/simulated.go
@@ -24,7 +24,7 @@ import (
"sync"
"time"

"github.com/spruce-solutions/go-quai"
ethereum "github.com/spruce-solutions/go-quai"
"github.com/spruce-solutions/go-quai/accounts/abi"
"github.com/spruce-solutions/go-quai/accounts/abi/bind"
"github.com/spruce-solutions/go-quai/common"
@@ -875,6 +875,9 @@ func (fb *filterBackend) SubscribePendingBlockEvent(ch chan<- *types.Header) eve
return nullSubscription()
}

func (fb *filterBackend) SubscribeReOrgEvent(ch chan<- core.ReOrgRollup) event.Subscription {
return nullSubscription()
}
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }

func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
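
The new SubscribeReOrgEvent stub mirrors the other subscriptions in the simulated backend: it satisfies the updated filter Backend interface by returning the file's existing nullSubscription helper, so a simulated chain never delivers reorg events. That helper is not part of this diff; a minimal sketch, assuming the usual go-ethereum-style implementation, would be:

func nullSubscription() event.Subscription {
	return event.NewSubscription(func(quit <-chan struct{}) error {
		// Never send a value; simply block until Unsubscribe is called.
		<-quit
		return nil
	})
}
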
125 changes: 125 additions & 0 deletions core/blockchain.go
@@ -183,6 +183,7 @@ type BlockChain struct {
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
reOrgFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
@@ -1619,6 +1620,110 @@ func (bc *BlockChain) AddExternalBlock(block *types.ExternalBlock) error {
return nil
}

// ReOrgRollBack rolls the current header back to the position where the reorg
// took place in a higher context, deleting the orphaned canonical indexes and
// announcing the removed logs.
func (bc *BlockChain) ReOrgRollBack(header *types.Header) error {
var (
deletedTxs types.Transactions
deletedLogs [][]*types.Log

// collectLogs collects the logs that were generated or removed during
// the processing of the block that corresponds with the given hash.
// These logs are later announced as deleted or reborn
collectLogs = func(hash common.Hash) {
number := bc.hc.GetBlockNumber(hash)
if number == nil {
return
}
receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)

var logs []*types.Log
for _, receipt := range receipts {
for _, log := range receipt.Logs {
l := *log
l.Removed = true
logs = append(logs, &l)
}
}
if len(logs) > 0 {
deletedLogs = append(deletedLogs, logs)
}
}
// mergeLogs returns a merged log slice with specified sort order.
mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log {
var ret []*types.Log
if reverse {
for i := len(logs) - 1; i >= 0; i-- {
ret = append(ret, logs[i]...)
}
} else {
for i := 0; i < len(logs); i++ {
ret = append(ret, logs[i]...)
}
}
return ret
}
)

if header != nil {
// get the commonBlock
commonBlock := bc.GetBlockByHash(header.Hash())
// if a block with this hash does not exist then we don't have to roll back
if commonBlock == nil {
return nil
}
// get the current head in this chain
currentBlock := bc.CurrentBlock()

for {
if currentBlock.NumberU64() == commonBlock.NumberU64()-1 {
break
}
deletedTxs = append(deletedTxs, currentBlock.Transactions()...)
collectLogs(currentBlock.Hash())

currentBlock = bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1)
if currentBlock == nil {
return fmt.Errorf("invalid current chain")
}
}
// set the head back to the block before the rollback point
if err := bc.SetHead(commonBlock.NumberU64() - 1); err != nil {
return err
}

log.Info("Rolled back the chain to below the reorg point", "newHead", bc.CurrentBlock().NumberU64())

// Delete useless indexes right now, which includes the non-canonical
// transaction indexes and any canonical chain indexes above the new head.
indexesBatch := bc.db.NewBatch()
for _, tx := range deletedTxs {
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
}

// Delete any canonical number assignments above the new head
number := bc.CurrentBlock().NumberU64()
for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(bc.db, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(indexesBatch, i)
}
if err := indexesBatch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err)
}

// send the deleted logs to the removed logs feed
if len(deletedLogs) > 0 {
bc.rmLogsFeed.Send(RemovedLogsEvent{mergeLogs(deletedLogs, true)})
}
} else {
return fmt.Errorf("reorg header was null")
}
return nil
}

// InsertChain attempts to insert the given batch of blocks in to the canonical
// chain or, otherwise, create a fork. If an error is returned it will return
// the index number of the failing block as well an error describing what went
@@ -2109,6 +2214,17 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
return 0, nil
}

func (bc *BlockChain) getAllHeaders(blocks []*types.Block) []*types.Header {
// Initialize the headers array
var headers []*types.Header

// Collect the header of every block in the slice
for i := 0; i < len(blocks); i++ {
headers = append(headers, blocks[i].Header())
}
return headers
}

// reorg takes two blocks, an old chain and a new chain and will reconstruct the
// blocks and inserts them to be part of the new canonical chain and accumulates
// potential missing transactions and post an event about them.
@@ -2167,6 +2283,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return ret
}
)

// Reduce the longer chain to the same number as the shorter one
if oldBlock.NumberU64() > newBlock.NumberU64() {
// Old chain is longer, gather all transactions and logs as deleted ones
@@ -2193,6 +2310,9 @@
// If the common ancestor was found, bail out
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock

// Once the common block is found, the reorg data is sent to the reOrg feed
bc.reOrgFeed.Send(ReOrgRollup{ReOrgHeader: commonBlock.Header(), OldChainHeaders: bc.getAllHeaders(oldChain), NewChainHeaders: bc.getAllHeaders(newChain)})
break
}
// Remove an old block as well as stash away a new block
@@ -2633,6 +2753,11 @@ func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscripti
return bc.scope.Track(bc.chainFeed.Subscribe(ch))
}

// SubscribeReOrgEvent registers a subscription of ReOrgRollup events.
func (bc *BlockChain) SubscribeReOrgEvent(ch chan<- ReOrgRollup) event.Subscription {
return bc.scope.Track(bc.reOrgFeed.Subscribe(ch))
}

// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
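
Taken together, the new reOrgFeed, SubscribeReOrgEvent, and ReOrgRollBack form a producer/consumer pair: reorg() publishes a ReOrgRollup once the common block is found, and a subscriber (the manager, per the commit messages) reacts by rolling a dependent chain back to that header. The sketch below shows one way such a consumer could be wired; the package, function name, and two-chain wiring are illustrative assumptions, not part of this commit.

package manager // hypothetical package, for illustration only

import (
	"github.com/spruce-solutions/go-quai/core"
	"github.com/spruce-solutions/go-quai/log"
)

// relayReOrgs subscribes to reorg rollups from one chain context and rolls a
// dependent chain back to the reported common header.
func relayReOrgs(source, dependent *core.BlockChain) {
	reOrgCh := make(chan core.ReOrgRollup, 10)
	sub := source.SubscribeReOrgEvent(reOrgCh)
	defer sub.Unsubscribe()

	for {
		select {
		case rollup := <-reOrgCh:
			// ReOrgHeader is the common block found by reorg(); ReOrgRollBack
			// resets the dependent chain's head to the block just below it.
			if err := dependent.ReOrgRollBack(rollup.ReOrgHeader); err != nil {
				log.Error("reorg rollback failed", "err", err)
			}
		case err := <-sub.Err():
			if err != nil {
				log.Error("reorg subscription dropped", "err", err)
			}
			return
		}
	}
}
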
5 changes: 5 additions & 0 deletions core/events.go
@@ -36,6 +36,11 @@ type ChainEvent struct {
Logs []*types.Log
}

// ReOrgRollup is sent on the reorg feed when a chain reorganization is
// detected, carrying the common (rollback) header together with the headers
// of the dropped old chain and the adopted new chain.
type ReOrgRollup struct {
ReOrgHeader *types.Header
OldChainHeaders []*types.Header
NewChainHeaders []*types.Header
}

type ChainSideEvent struct {
Block *types.Block
}
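
The rollup payload is header-only: ReOrgHeader is the common block at which the chains diverge, while OldChainHeaders and NewChainHeaders carry the dropped and adopted branches in the order reorg() collects them (walking back from each head toward the fork point). A purely illustrative helper, assuming imports of fmt and this core package, shows how a subscriber might summarize one for logging:

// describeRollup is a hypothetical helper, not part of this commit.
func describeRollup(r core.ReOrgRollup) string {
	return fmt.Sprintf("reorg at %s: %d headers dropped, %d headers adopted",
		r.ReOrgHeader.Hash().Hex(),
		len(r.OldChainHeaders), len(r.NewChainHeaders))
}
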
10 changes: 9 additions & 1 deletion eth/api_backend.go
@@ -21,7 +21,7 @@ import (
"errors"
"math/big"

"github.com/spruce-solutions/go-quai"
ethereum "github.com/spruce-solutions/go-quai"
"github.com/spruce-solutions/go-quai/accounts"
"github.com/spruce-solutions/go-quai/common"
"github.com/spruce-solutions/go-quai/consensus"
@@ -124,6 +124,10 @@ func (b *EthAPIBackend) AddExternalBlock(block *types.ExternalBlock) error {
return b.eth.blockchain.AddExternalBlock(block)
}

func (b *EthAPIBackend) ReOrgRollBack(header *types.Header) error {
return b.eth.blockchain.ReOrgRollBack(header)
}

func (b *EthAPIBackend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
return b.eth.blockchain.GetBlockByHash(hash), nil
}
@@ -235,6 +239,10 @@ func (b *EthAPIBackend) SubscribePendingBlockEvent(ch chan<- *types.Header) even
return b.eth.miner.SubscribePendingBlock(ch)
}

func (b *EthAPIBackend) SubscribeReOrgEvent(ch chan<- core.ReOrgRollup) event.Subscription {
return b.eth.BlockChain().SubscribeReOrgEvent(ch)
}

func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainEvent(ch)
}
33 changes: 32 additions & 1 deletion eth/filters/api.go
@@ -25,9 +25,10 @@ import (
"sync"
"time"

"github.com/spruce-solutions/go-quai"
ethereum "github.com/spruce-solutions/go-quai"
"github.com/spruce-solutions/go-quai/common"
"github.com/spruce-solutions/go-quai/common/hexutil"
"github.com/spruce-solutions/go-quai/core"
"github.com/spruce-solutions/go-quai/core/types"
"github.com/spruce-solutions/go-quai/ethdb"
"github.com/spruce-solutions/go-quai/event"
@@ -270,6 +271,36 @@ func (api *PublicFilterAPI) PendingBlock(ctx context.Context) (*rpc.Subscription
return rpcSub, nil
}

// ReOrg creates a subscription that fires each time a chain reorg rollup is produced by the blockchain.
func (api *PublicFilterAPI) ReOrg(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
reOrg := make(chan core.ReOrgRollup)
reOrgSub := api.events.SubscribeReOrg(reOrg)

for {
select {
case b := <-reOrg:
notifier.Notify(rpcSub.ID, b)
case <-rpcSub.Err():
reOrgSub.Unsubscribe()
return
case <-notifier.Closed():
reOrgSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}

// Logs creates a subscription that fires for all new logs that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
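
Since ReOrg is registered on the public filter API, a client connected over WebSocket should be able to reach it through eth_subscribe using the lower-camel-cased method name ("reOrg"), following the usual go-ethereum RPC convention; the subscription name, endpoint URL, and the loosely typed payload in the sketch below are assumptions, as the RPC wiring is not shown in this diff.

package main // illustrative client, not part of this commit

import (
	"context"
	"fmt"
	"log"

	"github.com/spruce-solutions/go-quai/rpc"
)

func main() {
	client, err := rpc.Dial("ws://127.0.0.1:8546") // assumed WS endpoint
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Decode the rollup loosely; the exact JSON shape of core.ReOrgRollup is
	// not pinned down by this diff.
	rollups := make(chan map[string]interface{})
	sub, err := client.EthSubscribe(context.Background(), rollups, "reOrg")
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	for {
		select {
		case r := <-rollups:
			fmt.Println("reorg rollup received:", r)
		case err := <-sub.Err():
			log.Fatal(err)
		}
	}
}
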
1 change: 1 addition & 0 deletions eth/filters/filter.go
@@ -43,6 +43,7 @@ type Backend interface {
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingBlockEvent(ch chan<- *types.Header) event.Subscription
SubscribeReOrgEvent(ch chan<- core.ReOrgRollup) event.Subscription

BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
