Skip to content
This repository has been archived by the owner on May 25, 2020. It is now read-only.

Commit

Permalink
[Feat] implement cache for tracking finality of top 900 blocks in can…
Browse files Browse the repository at this point in the history
…onical path
  • Loading branch information
anhntbk08 committed Aug 7, 2019
1 parent 6ffca85 commit 386805d
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/**/*tx_database*
*/**/*dapps*
build/_vendor/pkg

/devnet
#*
.#*
*#
Expand Down
101 changes: 82 additions & 19 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

Expand All @@ -66,6 +66,9 @@ const (

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
BlockChainVersion = 3

// Maximum length of chain to cache by block's number
blocksByNumberCacheLimit = 900
)

// CacheConfig contains the configuration values for the trie caching/pruning
Expand Down Expand Up @@ -144,6 +147,10 @@ type BlockChain struct {
badBlocks *lru.Cache // Bad block cache
IPCEndpoint string
Client *ethclient.Client // Global ipc client instance.

// Blocks hash array by block number
// cache field for tracking finality purpose, can't use for tracking block vs block relationship
blocksByNumberCache *lru.Cache
}

// NewBlockChain returns a fully initialised block chain using information
Expand All @@ -159,28 +166,30 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
blocksByNumberCache, _ := lru.New(blocksByNumberCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit)
resultProcess, _ := lru.New(blockCacheLimit)
preparingBlock, _ := lru.New(blockCacheLimit)
downloadingBlock, _ := lru.New(blockCacheLimit)
bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
resultProcess: resultProcess,
calculatingBlock: preparingBlock,
downloadingBlock: downloadingBlock,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
resultProcess: resultProcess,
calculatingBlock: preparingBlock,
downloadingBlock: downloadingBlock,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
blocksByNumberCache: blocksByNumberCache,
}
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
Expand Down Expand Up @@ -300,6 +309,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.bodyRLPCache.Purge()
bc.blockCache.Purge()
bc.futureBlocks.Purge()
bc.blocksByNumberCache.Purge()

// Rewind the block chain, ensuring we don't end up with a stateless head block
if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() {
Expand Down Expand Up @@ -639,6 +649,30 @@ func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*type
return
}

// GetBlocksByNumber returns the hashes of all known blocks at the given
// height (the canonical block plus any forked siblings seen by the node).
// It only covers the most recent blocksByNumberCacheLimit heights tracked
// by blocksByNumberCache; older heights yield an empty slice.
func (bc *BlockChain) GetBlocksByNumber(number uint64) []common.Hash {
	// The cache must be keyed by plain uint64 block numbers: the caller in
	// eth/api_backend.go passes uint64(blockNr), so the original `uint`
	// parameter did not even compile, and a *big.Int key would compare by
	// pointer identity and never match.
	if cached, ok := bc.blocksByNumberCache.Get(number); ok {
		return cached.([]common.Hash)
	}
	return []common.Hash{}
}

// IsSamePathTwoBlocks reports whether the two blocks lie on the same chain
// path. bh1 is assumed to be at the same height as, or ahead of, bh2, so we
// walk bh1's parent links down to bh2's height and compare hashes there.
func (bc *BlockChain) IsSamePathTwoBlocks(bh1 common.Hash, bh2 common.Hash) bool {
	bl1 := bc.GetBlockByHash(bh1)
	bl2 := bc.GetBlockByHash(bh2)
	// Guard against unknown hashes: the original dereferenced the lookups
	// unconditionally and would panic on a cache/database miss.
	if bl1 == nil || bl2 == nil {
		return false
	}

	toBlockLevel := bl2.Number().Uint64()
	for bl1.Number().Uint64() > toBlockLevel {
		bl1 = bc.GetBlockByHash(bl1.ParentHash())
		if bl1 == nil {
			// Broken ancestry (pruned or unknown parent) — cannot be the
			// same path; the original would panic here instead.
			return false
		}
	}

	return bl1.Hash() == bl2.Hash()
}

// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
Expand Down Expand Up @@ -676,7 +710,6 @@ func (bc *BlockChain) Stop() {
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !bc.cacheConfig.Disabled {
triedb := bc.stateCache.TrieDB()

for _, offset := range []uint64{0, 1, triesInMemory - 1} {
if number := bc.CurrentBlock().NumberU64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
Expand Down Expand Up @@ -1369,6 +1402,35 @@ func (bc *BlockChain) getResultBlock(block *types.Block, verifiedM2 bool) (*Resu
return &ResultProcessBlock{receipts: receipts, logs: logs, state: statedb, proctime: proctime, usedGas: usedGas}, nil
}

/*
updateblocksByNumberCache records block's hash in blocksByNumberCache so
GetBlocksByNumber can later return every hash seen at that height
(canonical or forked).

Cache layout:

	{
		block_number (uint64): []common.Hash
	}

It returns the full, updated hash list for the block's height.
*/
func (bc *BlockChain) updateblocksByNumberCache(block *types.Block) []common.Hash {
	// Key by the plain uint64 value. The original keyed by block.Number()
	// (*big.Int), which the LRU compares by pointer identity, so reader
	// lookups by numeric value could never hit.
	blockNumber := block.Number().Uint64()

	var hashArr []common.Hash
	if cached, ok := bc.blocksByNumberCache.Get(blockNumber); ok {
		hashArr = cached.([]common.Hash)
		// Don't record the same hash twice if the block is re-inserted.
		for _, h := range hashArr {
			if h == block.Hash() {
				return hashArr
			}
		}
	}
	hashArr = append(hashArr, block.Hash())

	// lru.Cache.Add overwrites an existing key, so the original's explicit
	// Remove-then-Add was redundant. Cache for next time and return.
	bc.blocksByNumberCache.Add(blockNumber, hashArr)
	return hashArr
}

// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
Expand Down Expand Up @@ -1410,13 +1472,14 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L

// Only count canonical blocks for GC processing time
bc.gcproc += result.proctime

case SideStatTy:
log.Debug("Inserted forked block from fetcher", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
common.PrettyDuration(time.Since(block.ReceivedAt)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))

blockInsertTimer.Update(result.proctime)
events = append(events, ChainSideEvent{block})
default:
bc.updateblocksByNumberCache(block)
}
stats.processed++
stats.usedGas += result.usedGas
Expand Down
14 changes: 11 additions & 3 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ func (b *EthApiBackend) EventMux() *event.TypeMux {
}

func (b *EthApiBackend) AccountManager() *accounts.Manager {
return b.eth.AccountManager()
return b.eth.AccountManager()uint64
}

func (b *EthApiBackend) BloomStatus() (uint64, uint64) {
sections, _, _ := b.eth.bloomIndexer.Sections()
func (b *EthApiBackend) BloomStatuint64s() (uint64, uint64) {
sections, _, _ := b.eth.bloomuint64ndexer.Sections()
return params.BloomBitsBlocks, sections
}

Expand Down Expand Up @@ -260,3 +260,11 @@ func (s *EthApiBackend) GetRewardByHash(hash common.Hash) map[string]interface{}
}
return make(map[string]interface{})
}

func (b *EthApiBackend) GetBlocksByNumber(blockNr rpc.BlockNumber) []common.Hash{} {
return b.eth.blockchain.GetBlocksByNumber(uint64(blockNr))
}

func (b *EthApiBackend) IsSamePathTwoBlocks(bh1 common.Hash, bh2 common.Hash) bool {
return return b.eth.blockchain.IsSamePathTwoBlocks(bh1, bh2)
}
141 changes: 96 additions & 45 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,61 +1055,112 @@ func (s *PublicBlockChainAPI) rpcOutputBlock(b *types.Block, inclTx bool, fullTx
return fields, nil
}

func (s *PublicBlockChainAPI) rpcOutputBlockSigners(b *types.Block, ctx context.Context, masternodes []common.Address) ([]common.Address, error) {
// Get signers for block.
// findNearestSignedBlock finds the nearest checkpoint from input block
// the checkpoint block maybe not have relationship with the input block (in fork case).
//
// Returns (signedBlock, checkpointBlock): the block at the next
// MergeSignRange boundary covering b, and the epoch checkpoint block that
// carries the masternode set for that height. Both are nil for genesis or
// when the checkpoint block cannot be loaded.
func (s *PublicBlockChainAPI) findNearestSignedBlock(b *types.Block, ctx context.Context) (*types.Block, *types.Block) {
	// Genesis has no signed ancestor.
	if b.Number().Int64() <= 0 {
		return nil, nil
	}

	blockNumber := b.Number().Uint64()
	// Round blockNumber up to the next MergeSignRange boundary — the height
	// at which signatures covering this block are merged/recorded.
	signedBlockNumber := blockNumber + (common.MergeSignRange - (blockNumber % common.MergeSignRange))
	latestBlockNumber := s.b.CurrentBlock().Number()

	// If that boundary has not been produced yet, or the TIP2019 rule is
	// not active for this block, fall back to the block's own height.
	if signedBlockNumber >= latestBlockNumber.Uint64() || !s.b.ChainConfig().IsTIP2019(b.Number()) {
		signedBlockNumber = blockNumber
	}

	// Get block epoc latest.
	// The checkpoint is the start of the POSV epoch containing the signed
	// block; it carries the masternode set for that epoch.
	checkpointNumber := signedBlockNumber - (signedBlockNumber % s.b.ChainConfig().Posv.Epoch)
	checkpointBlock, _ := s.b.BlockByNumber(ctx, rpc.BlockNumber(checkpointNumber))

	if checkpointBlock != nil {
		signedBlock, _ := s.b.BlockByNumber(ctx, rpc.BlockNumber(signedBlockNumber))
		return signedBlock, checkpointBlock
	}

	return nil, nil
}

/*
findFinalityOfBlock returns the finality percentage (0-100) of a block:
the share of the epoch's masternodes whose signatures cover the nearest
signed block lying on the same chain path as the input block.

Use blocksByNumberCache (via Backend.GetBlocksByNumber) to keep track of
every block hash at the signed height — refer core/blockchain.go for more
detail. From the signed block's number we walk back down to the input
block to reject fork-path candidates.

NOTE(review): the original hunk is visibly truncated (unused isFound /
blockHash, no return statement, does not compile); this body reconstructs
the evident intent — confirm against the full file before merging.
*/
func (s *PublicBlockChainAPI) findFinalityOfBlock(b *types.Block, ctx context.Context, masternodes []common.Address) (uint, error) {
	engine, ok := s.b.GetEngine().(*posv.Posv)
	if !ok {
		log.Error("Undefined POSV consensus engine")
		return 0, nil
	}

	signedBlock, checkpointBlock := s.findNearestSignedBlock(b, ctx)
	if signedBlock == nil {
		return 0, nil
	}

	// Every block hash seen at the signed height, forks included.
	// (Original called log.Debug("Signed Blocks ", signedBlocks), which
	// violates the key/value pairing of the ethereum log package.)
	signedBlocks := s.b.GetBlocksByNumber(rpc.BlockNumber(signedBlock.Number().Uint64()))
	log.Debug("Blocks at signed height", "number", signedBlock.Number(), "count", len(signedBlocks))

	// Track down all the way to check which candidate shares the input
	// block's path; only that block's signatures count towards finality.
	// NOTE(review): assumes the Backend interface exposes
	// IsSamePathTwoBlocks (implemented in eth/api_backend.go) — confirm.
	samePathHash := common.Hash{}
	for _, blockHash := range signedBlocks {
		if s.b.IsSamePathTwoBlocks(blockHash, b.Hash()) {
			samePathHash = blockHash
			break
		}
	}
	if samePathHash == (common.Hash{}) {
		// Input block is not on the path of any signed block yet.
		return 0, nil
	}

	signedBlockOnPath := signedBlock
	if samePathHash != signedBlock.Hash() {
		// NOTE(review): assumes Backend.GetBlock(ctx, hash) is available,
		// as in upstream go-ethereum — confirm.
		if blk, err := s.b.GetBlock(ctx, samePathHash); err == nil && blk != nil {
			signedBlockOnPath = blk
		}
	}

	// getSigners already filters the raw signer list down to masternodes.
	signers, err := s.getSigners(signedBlockOnPath, checkpointBlock, engine)
	if err != nil {
		return 0, err
	}
	if len(masternodes) == 0 {
		return 0, nil
	}
	return uint(100 * len(signers) / len(masternodes)), nil
}

/*
Extract signers from block
Need checkpointBlock for querrying all masternodes
*/
func (s *PublicBlockChainAPI) getSigners(block *types.Block, checkpointBlock *types.Block, engine *posv.Posv) ([]common.Address, error) {
client, err := s.b.GetIPCClient()
var filterSigners []common.Address
var signers []common.Address
blockNumber := block.Number().Uint64()

masternodes := engine.GetMasternodesFromCheckpointHeader(checkpointBlock.Header(), blockNumber, s.b.ChainConfig().Posv.Epoch)
if s.b.ChainConfig().IsTIPSigning(checkpointBlock.Number()) {
signers, err = GetSignersFromBlocks(s.b, block.NumberU64(), block.Hash(), masternodes)
} else {
signers, err = contracts.GetSignersByExecutingEVM(common.HexToAddress(common.BlockSigners), client, block.Hash())
}
if err != nil {
log.Error("Fail to connect IPC client for block status", "error", err)
return []common.Address{}, err
log.Error("Fail to get signers from block signer SC.", "error", err)
return nil, err
}
validator, _ := engine.RecoverValidator(block.Header())
creator, _ := engine.RecoverSigner(block.Header())
signers = append(signers, validator)
signers = append(signers, creator)

var signers []common.Address
var filterSigners []common.Address
if b.Number().Int64() > 0 {
blockNumber := b.Number().Uint64()
signedBlockNumber := blockNumber + (common.MergeSignRange - (blockNumber % common.MergeSignRange))
latestBlockNumber := s.b.CurrentBlock().Number()
if signedBlockNumber >= latestBlockNumber.Uint64() || !s.b.ChainConfig().IsTIP2019(b.Number()) {
signedBlockNumber = blockNumber
}
if engine, ok := s.b.GetEngine().(*posv.Posv); ok {
// Get block epoc latest.
lastCheckpointNumber := signedBlockNumber - (signedBlockNumber % s.b.ChainConfig().Posv.Epoch)
prevCheckpointBlock, _ := s.b.BlockByNumber(ctx, rpc.BlockNumber(lastCheckpointNumber))
if prevCheckpointBlock != nil {
masternodes := engine.GetMasternodesFromCheckpointHeader(prevCheckpointBlock.Header(), blockNumber, s.b.ChainConfig().Posv.Epoch)
signedBlock, _ := s.b.BlockByNumber(ctx, rpc.BlockNumber(signedBlockNumber))
if s.b.ChainConfig().IsTIPSigning(latestBlockNumber) {
signers, err = GetSignersFromBlocks(s.b, signedBlock.NumberU64(), signedBlock.Hash(), masternodes)
} else {
signers, err = contracts.GetSignersByExecutingEVM(common.HexToAddress(common.BlockSigners), client, signedBlock.Hash())
}
if err != nil {
log.Error("Fail to get signers from block signer SC.", "error", err)
return nil, err
}
validator, _ := engine.RecoverValidator(b.Header())
creator, _ := engine.RecoverSigner(b.Header())
signers = append(signers, validator)
signers = append(signers, creator)
countFinality := 0
for _, masternode := range masternodes {
for _, signer := range signers {
if signer == masternode {
countFinality++
filterSigners = append(filterSigners, masternode)
break
}
}
}
for _, masternode := range masternodes {
for _, signer := range signers {
if signer == masternode {
filterSigners = append(filterSigners, masternode)
break
}
} else {
log.Error("Undefined POSV consensus engine")
}
}
return filterSigners, nil
}

// rpcOutputBlockSigners returns the masternode addresses that signed the
// nearest signed block covering b, or an empty slice when no signed block
// exists yet or the engine is not POSV. The masternodes parameter is
// unused here but kept for backward compatibility with existing callers.
func (s *PublicBlockChainAPI) rpcOutputBlockSigners(b *types.Block, ctx context.Context, masternodes []common.Address) ([]common.Address, error) {
	// Probe IPC connectivity up front; the client itself is obtained again
	// inside getSigners where it is actually used. The blank identifier
	// fixes the "client declared but not used" compile error in the
	// original.
	if _, err := s.b.GetIPCClient(); err != nil {
		log.Error("Fail to connect IPC client for block status", "error", err)
		return []common.Address{}, err
	}

	engine, ok := s.b.GetEngine().(*posv.Posv)
	if !ok {
		log.Error("Undefined POSV consensus engine")
		return []common.Address{}, nil
	}

	signedBlock, checkpointBlock := s.findNearestSignedBlock(b, ctx)
	if signedBlock == nil {
		return []common.Address{}, nil
	}

	return s.getSigners(signedBlock, checkpointBlock, engine)
}

// RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction
type RPCTransaction struct {
BlockHash common.Hash `json:"blockHash"`
Expand Down
2 changes: 2 additions & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type Backend interface {
GetIPCClient() (*ethclient.Client, error)
GetEngine() consensus.Engine
GetRewardByHash(hash common.Hash) map[string]interface{}

GetBlocksByNumber(blockNr rpc.BlockNumber) []common.Hash
}

func GetAPIs(apiBackend Backend) []rpc.API {
Expand Down

0 comments on commit 386805d

Please sign in to comment.