Skip to content

Commit

Permalink
core, eth/downloader: commit block data using batches (ethereum#15115)
Browse files Browse the repository at this point in the history
* ethdb: add Putter interface and Has method

* ethdb: improve docs and add IdealBatchSize

* ethdb: remove memory batch lock

Batches are not safe for concurrent use.

* core: use ethdb.Putter for Write* functions

This covers the easy cases.

* core/state: simplify StateSync

* trie: optimize local node check

* ethdb: add ValueSize to Batch

* core: optimize HasHeader check

This avoids one random database read get the block number. For many uses
of HasHeader, the expectation is that it's actually there. Using Has
avoids a load + decode of the value.

* core: write fast sync block data in batches

Collect writes into batches up to the ideal size instead of issuing many
small, concurrent writes.

* eth/downloader: commit larger state batches

Collect nodes into a batch up to the ideal size instead of committing
whenever a node is received.

* core: optimize HasBlock check

This avoids a random database read to get the number.

* core: use numberCache in HasHeader

numberCache has higher capacity, increasing the odds of finding the
header without a database lookup.

* core: write imported block data using a batch

Restore batch writes of state and add blocks, tx entries, receipts to
the same batch. The change also simplifies the miner.

This commit also removes posting of logs when a forked block is imported.

* core: fix DB write error handling

* ethdb: use RLock for Has

* core: fix HasBlock comment
  • Loading branch information
fjl authored and karalabe committed Sep 9, 2017
1 parent ac193e3 commit 10181b5
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 280 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func ImportChain(chain *core.BlockChain, fn string) error {

func hasAllBlocks(chain *core.BlockChain, bs []*types.Block) bool {
for _, b := range bs {
if !chain.HasBlock(b.Hash()) {
if !chain.HasBlock(b.Hash(), b.NumberU64()) {
return false
}
}
Expand Down
224 changes: 97 additions & 127 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"io"
"math/big"
mrand "math/rand"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -515,10 +514,13 @@ func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
return body
}

// HasBlock checks if a block is fully present in the database or not, caching
// it if present.
func (bc *BlockChain) HasBlock(hash common.Hash) bool {
return bc.GetBlockByHash(hash) != nil
// HasBlock checks if a block is fully present in the database or not.
func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool {
if bc.blockCache.Contains(hash) {
return true
}
ok, _ := bc.chainDb.Has(blockBodyKey(hash, number))
return ok
}

// HasBlockAndState checks if a block and associated state trie is fully present
Expand Down Expand Up @@ -693,108 +695,73 @@ func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts ty

// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
// XXX should this be moved to the test?
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
bc.wg.Add(1)
defer bc.wg.Done()

// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(blockChain); i++ {
if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
// Chain broke ancestry, log a messge (programming error) and skip insertion
log.Error("Non contiguous receipt insert", "number", blockChain[i].Number(), "hash", blockChain[i].Hash(), "parent", blockChain[i].ParentHash(),
"prevnumber", blockChain[i-1].Number(), "prevhash", blockChain[i-1].Hash())

return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(),
blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])
}
}
// Pre-checks passed, start the block body and receipt imports
bc.wg.Add(1)
defer bc.wg.Done()

// Collect some import statistics to report on
stats := struct{ processed, ignored int32 }{}
start := time.Now()

// Create the block importing task queue and worker functions
tasks := make(chan int, len(blockChain))
for i := 0; i < len(blockChain) && i < len(receiptChain); i++ {
tasks <- i
}
close(tasks)

errs, failed := make([]error, len(tasks)), int32(0)
process := func(worker int) {
for index := range tasks {
block, receipts := blockChain[index], receiptChain[index]
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
bytes = 0
batch = bc.chainDb.NewBatch()
)
for i, block := range blockChain {
receipts := receiptChain[i]
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
}
// Skip if the entire data is already known
if bc.HasBlock(block.Hash(), block.NumberU64()) {
stats.ignored++
continue
}
// Compute all the non-consensus fields of the receipts
SetReceiptsData(bc.config, block, receipts)
// Write all the data out into the database
if err := WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()); err != nil {
return i, fmt.Errorf("failed to write block body: %v", err)
}
if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return i, fmt.Errorf("failed to write block receipts: %v", err)
}
if err := WriteTxLookupEntries(batch, block); err != nil {
return i, fmt.Errorf("failed to write lookup metadata: %v", err)
}
stats.processed++

// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return
}
if atomic.LoadInt32(&failed) > 0 {
return
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash()) {
errs[index] = fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
atomic.AddInt32(&failed, 1)
return
}
// Skip if the entire data is already known
if bc.HasBlock(block.Hash()) {
atomic.AddInt32(&stats.ignored, 1)
continue
}
// Compute all the non-consensus fields of the receipts
SetReceiptsData(bc.config, block, receipts)
// Write all the data out into the database
if err := WriteBody(bc.chainDb, block.Hash(), block.NumberU64(), block.Body()); err != nil {
errs[index] = fmt.Errorf("failed to write block body: %v", err)
atomic.AddInt32(&failed, 1)
log.Crit("Failed to write block body", "err", err)
return
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
}
if err := WriteBlockReceipts(bc.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
errs[index] = fmt.Errorf("failed to write block receipts: %v", err)
atomic.AddInt32(&failed, 1)
log.Crit("Failed to write block receipts", "err", err)
return
}
if err := WriteTxLookupEntries(bc.chainDb, block); err != nil {
errs[index] = fmt.Errorf("failed to write lookup metadata: %v", err)
atomic.AddInt32(&failed, 1)
log.Crit("Failed to write lookup metadata", "err", err)
return
}
atomic.AddInt32(&stats.processed, 1)
bytes += batch.ValueSize()
batch = bc.chainDb.NewBatch()
}
}
// Start as many worker threads as goroutines allowed
pending := new(sync.WaitGroup)
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
pending.Add(1)
go func(id int) {
defer pending.Done()
process(id)
}(i)
}
pending.Wait()

// If anything failed, report
if failed > 0 {
for i, err := range errs {
if err != nil {
return i, err
}
if batch.ValueSize() > 0 {
bytes += batch.ValueSize()
if err := batch.Write(); err != nil {
return 0, err
}
}
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during receipts processing")
return 0, nil
}

// Update the head fast sync block if better
bc.mu.Lock()

head := blockChain[len(errs)-1]
head := blockChain[len(blockChain)-1]
if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
if bc.GetTd(bc.currentFastBlock.Hash(), bc.currentFastBlock.NumberU64()).Cmp(td) < 0 {
if err := WriteHeadFastBlockHash(bc.chainDb, head.Hash()); err != nil {
Expand All @@ -805,16 +772,18 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
bc.mu.Unlock()

// Report some public statistics so the user has a clue what's going on
last := blockChain[len(blockChain)-1]
log.Info("Imported new block receipts", "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", last.Number(), "hash", last.Hash(), "ignored", stats.ignored)

log.Info("Imported new block receipts",
"count", stats.processed,
"elapsed", common.PrettyDuration(time.Since(start)),
"bytes", bytes,
"number", head.Number(),
"hash", head.Hash(),
"ignored", stats.ignored)
return 0, nil
}

// WriteBlock writes the block to the chain.
func (bc *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err error) {
func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()

Expand All @@ -827,7 +796,7 @@ func (bc *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err er
bc.mu.Lock()
defer bc.mu.Unlock()

if bc.HasBlock(block.Hash()) {
if bc.HasBlock(block.Hash(), block.NumberU64()) {
log.Trace("Block existed", "hash", block.Hash())
return
}
Expand All @@ -837,10 +806,18 @@ func (bc *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err er

// Irrelevant of the canonical status, write the block itself to the database
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
log.Crit("Failed to write block total difficulty", "err", err)
return NonStatTy, err
}
if err := WriteBlock(bc.chainDb, block); err != nil {
log.Crit("Failed to write block contents", "err", err)
// Write other block data using a batch.
batch := bc.chainDb.NewBatch()
if err := WriteBlock(batch, block); err != nil {
return NonStatTy, err
}
if _, err := state.CommitTo(batch, bc.config.IsEIP158(block.Number())); err != nil {
return NonStatTy, err
}
if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return NonStatTy, err
}

// If the total difficulty is higher than our known, add it to the canonical chain
Expand All @@ -853,15 +830,28 @@ func (bc *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err er
return NonStatTy, err
}
}
bc.insert(block) // Insert the block as the new head of the chain
// Write the positional metadata for transaction and receipt lookups
if err := WriteTxLookupEntries(batch, block); err != nil {
return NonStatTy, err
}
// Write hash preimages
if err := WritePreimages(bc.chainDb, block.NumberU64(), state.Preimages()); err != nil {
return NonStatTy, err
}
status = CanonStatTy
} else {
status = SideStatTy
}
if err := batch.Write(); err != nil {
return NonStatTy, err
}

// Set new head.
if status == CanonStatTy {
bc.insert(block)
}
bc.futureBlocks.Remove(block.Hash())

return
return status, nil
}

// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. If an error is returned
Expand Down Expand Up @@ -975,29 +965,18 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
bc.reportBlock(block, receipts, err)
return i, err
}
// Write state changes to database
if _, err = state.CommitTo(bc.chainDb, bc.config.IsEIP158(block.Number())); err != nil {
return i, err
}

// coalesce logs for later processing
coalescedLogs = append(coalescedLogs, logs...)

if err = WriteBlockReceipts(bc.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
return i, err
}

// write the block to the chain and get the status
status, err := bc.WriteBlock(block)
// Write the block to the chain and get the status.
status, err := bc.WriteBlockAndState(block, receipts, state)
if err != nil {
return i, err
}

switch status {
case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))

// coalesce logs for later processing
coalescedLogs = append(coalescedLogs, logs...)
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs})
// We need some control over the mining operation. Acquiring locks and waiting
Expand All @@ -1006,15 +985,6 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if bc.LastBlockHash() == block.Hash() {
events = append(events, ChainHeadEvent{block})
}

// Write the positional metadata for transaction and receipt lookups
if err := WriteTxLookupEntries(bc.chainDb, block); err != nil {
return i, err
}
// Write hash preimages
if err := WritePreimages(bc.chainDb, block.NumberU64(), state.Preimages()); err != nil {
return i, err
}
case SideStatTy:
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
Expand Down Expand Up @@ -1357,8 +1327,8 @@ func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {

// HasHeader checks if a block header is present in the database or not, caching
// it if present.
func (bc *BlockChain) HasHeader(hash common.Hash) bool {
return bc.hc.HasHeader(hash)
func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool {
return bc.hc.HasHeader(hash, number)
}

// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
Expand Down
Loading

0 comments on commit 10181b5

Please sign in to comment.