Skip to content

Commit

Permalink
Bloom Indexer uses stored bloom to calculate and store indexed bloom …
Browse files Browse the repository at this point in the history
…filter instead of header
  • Loading branch information
jdowning100 committed Jun 20, 2023
1 parent 8e31ff0 commit cd507f4
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 12 deletions.
4 changes: 2 additions & 2 deletions core/bloom_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHea

// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error {
b.gen.AddBloom(uint(header.Number().Uint64()-b.section*b.size), header.Bloom())
func (b *BloomIndexer) Process(ctx context.Context, header *types.Header, bloom types.Bloom) error {
b.gen.AddBloom(uint(header.Number().Uint64()-b.section*b.size), bloom)
b.head = header.Hash()
return nil
}
Expand Down
23 changes: 14 additions & 9 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ChainIndexerBackend interface {

// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(ctx context.Context, header *types.Header) error
Process(ctx context.Context, header *types.Header, bloom types.Bloom) error

// Commit finalizes the section metadata and stores it into the database.
Commit() error
Expand All @@ -55,7 +55,8 @@ type ChainIndexerBackend interface {
type ChainIndexerChain interface {
// CurrentHeader retrieves the latest locally known header.
CurrentHeader() *types.Header

// GetBloom retrieves the bloom for the given block hash.
GetBloom(blockhash common.Hash) (*types.Bloom, error)
// SubscribeChainHeadEvent subscribes to new head header notifications.
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
}
Expand All @@ -70,11 +71,11 @@ type ChainIndexerChain interface {
// after an entire section has been finished or in case of rollbacks that might
// affect already finished sections.
type ChainIndexer struct {
chainDb ethdb.Database // Chain database to index the data from
indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to

chainDb ethdb.Database // Chain database to index the data from
indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to
GetBloom func(common.Hash) (*types.Bloom, error)
active uint32 // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines
Expand Down Expand Up @@ -124,7 +125,7 @@ func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend Cha
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
events := make(chan ChainHeadEvent, 10)
sub := chain.SubscribeChainHeadEvent(events)

c.GetBloom = chain.GetBloom
go c.eventLoop(chain.CurrentHeader(), events, sub)
}

Expand Down Expand Up @@ -363,7 +364,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
} else if header.ParentHash() != lastHead {
return common.Hash{}, fmt.Errorf("chain reorged during section processing")
}
if err := c.backend.Process(c.ctx, header); err != nil {
bloom, err := c.GetBloom(header.Hash())
if err != nil {
return common.Hash{}, err
}
if err := c.backend.Process(c.ctx, header, *bloom); err != nil {
return common.Hash{}, err
}
lastHead = header.Hash()
Expand Down
6 changes: 6 additions & 0 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ var (
// ErrPendingEtxAlreadyKnown is returned received pending etx already in the cache/db
ErrPendingEtxAlreadyKnown = errors.New("pending etx already known")

// ErrBloomAlreadyKnown is returned if received bloom is already in the cache/db
ErrBloomAlreadyKnown = errors.New("bloom already known")

// ErrBodyNotFound is returned when body data for a given header hash cannot be found.
ErrBodyNotFound = errors.New("could not find the body data to match the header root hash")

Expand All @@ -53,6 +56,9 @@ var (
//ErrPendingEtxNotFound is returned when pendingEtxs cannot be found for a hash given in the submanifest
ErrPendingEtxNotFound = errors.New("pending etx not found")

//ErrBloomNotFound is returned when bloom cannot be found for a hash
ErrBloomNotFound = errors.New("bloom not found")

//ErrPendingEtxRollupNotFound is returned when pendingEtxsRollup cannot be found for a hash given in the submanifest
ErrPendingEtxRollupNotFound = errors.New("pending etx rollup not found")

Expand Down
32 changes: 32 additions & 0 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type HeaderChain struct {

pendingEtxsRollup *lru.Cache
pendingEtxs *lru.Cache
blooms *lru.Cache
missingPendingEtxsFeed event.Feed
missingPendingEtxsRollupFeed event.Feed

Expand Down Expand Up @@ -82,6 +83,9 @@ func NewHeaderChain(db ethdb.Database, engine consensus.Engine, chainConfig *par
pendingEtxs, _ := lru.New(c_maxPendingEtxBatches)
hc.pendingEtxs = pendingEtxs

blooms, _ := lru.New(c_maxBloomFilters)
hc.blooms = blooms

hc.genesisHeader = hc.GetHeaderByNumber(0)
if hc.genesisHeader.Hash() != chainConfig.GenesisHash {
return nil, fmt.Errorf("genesis block mismatch: have %x, want %x", hc.genesisHeader.Hash(), chainConfig.GenesisHash)
Expand Down Expand Up @@ -186,6 +190,21 @@ func (hc *HeaderChain) GetPendingEtxsRollup(hash common.Hash) (*types.PendingEtx
return &rollups, nil
}

// GetBloom gets the bloom from the cache or database
func (hc *HeaderChain) GetBloom(hash common.Hash) (*types.Bloom, error) {
var bloom types.Bloom
// Look for bloom first in bloom cache, then in database
if res, ok := hc.blooms.Get(hash); ok && res != nil {
bloom = res.(types.Bloom)
} else if res := rawdb.ReadBloom(hc.headerDb, hash); res != nil {
bloom = *res
} else {
log.Debug("unable to find bloom for hash in database", "hash:", hash.String())
return nil, ErrBloomNotFound
}
return &bloom, nil
}

// backfillPETXs collects any missing PendingETX objects needed to process the
// given header. This is done by informing the fetcher of any pending ETXs we do
// not have, so that they can be fetched from our peers.
Expand Down Expand Up @@ -394,6 +413,19 @@ func (hc *HeaderChain) AddPendingEtxs(pEtxs types.PendingEtxs) error {
return nil
}

func (hc *HeaderChain) AddBloom(bloom types.Bloom, hash common.Hash) error {
// Only write the bloom if we have not seen it before
if !hc.blooms.Contains(hash) {
// Write to bloom database
rawdb.WriteBloom(hc.headerDb, hash, bloom)
// Also write to cache for faster access
hc.blooms.Add(hash, bloom)
} else {
return ErrBloomAlreadyKnown
}
return nil
}

// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (hc *HeaderChain) loadLastState() error {
Expand Down
47 changes: 47 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,3 +1304,50 @@ func DeleteManifest(db ethdb.KeyValueWriter, hash common.Hash) {
log.Fatal("Failed to delete manifest", "err", err)
}
}

// ReadBloomRLP retrieves the bloom for the given block, in RLP encoding
func ReadBloomRLP(db ethdb.Reader, hash common.Hash) rlp.RawValue {
// Try to look up the data in leveldb.
data, _ := db.Get(bloomKey(hash))
if len(data) > 0 {
return data
}
return nil // Can't find the data anywhere.
}

// WriteBloomRLP stores the bloom corresponding to a given block, in RLP encoding.
func WriteBloomRLP(db ethdb.KeyValueWriter, hash common.Hash, rlp rlp.RawValue) {
if err := db.Put(bloomKey(hash), rlp); err != nil {
log.Fatal("Failed to store block bloom filter", "err", err)
}
}

// ReadBloom retreives the bloom corresponding to a given block
func ReadBloom(db ethdb.Reader, hash common.Hash) *types.Bloom {
data := ReadBloomRLP(db, hash)
if len(data) == 0 {
return nil
}
bloom := types.Bloom{}
if err := rlp.Decode(bytes.NewReader(data), &bloom); err != nil {
log.Error("Invalid bloom RLP", "hash", hash, "err", err)
return nil
}
return &bloom
}

// WriteBloom stores the bloom corresponding to a given block
func WriteBloom(db ethdb.KeyValueWriter, hash common.Hash, bloom types.Bloom) {
data, err := rlp.EncodeToBytes(bloom)
if err != nil {
log.Fatal("Failed to RLP encode pending etxs", "err", err)
}
WriteBloomRLP(db, hash, data)
}

// DeleteBloom removes all bloom data associated with a block.
func DeleteBloom(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
if err := db.Delete(bloomKey(hash)); err != nil {
log.Fatal("Failed to delete bloom", "err", err)
}
}
5 changes: 5 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ var (
pendingEtxsPrefix = []byte("pe") // pendingEtxsPrefix + hash -> PendingEtxs at block
pendingEtxsRollupPrefix = []byte("pr") // pendingEtxsRollupPrefix + hash -> PendingEtxsRollup at block
manifestPrefix = []byte("ma") // manifestPrefix + hash -> Manifest at block
bloomPrefix = []byte("bl") // bloomPrefix + hash -> bloom at block

txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata
bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits
Expand Down Expand Up @@ -297,3 +298,7 @@ func pendingEtxsRollupKey(hash common.Hash) []byte {
func manifestKey(hash common.Hash) []byte {
return append(manifestPrefix, hash.Bytes()...)
}

func bloomKey(hash common.Hash) []byte {
return append(bloomPrefix, hash.Bytes()...)
}
5 changes: 5 additions & 0 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
const (
c_maxPendingEtxBatches = 1024
c_maxPendingEtxsRollup = 256
c_maxBloomFilters = 1024
c_pendingHeaderCacheLimit = 100
c_pendingHeaderChacheBufferFactor = 2
pendingHeaderGCTime = 5
Expand Down Expand Up @@ -661,6 +662,10 @@ func (sl *Slice) init(genesis *Genesis) error {
if err != nil {
return err
}
err = sl.hc.AddBloom(types.Bloom{}, genesisHeader.Hash())
if err != nil {
return err
}
rawdb.WriteEtxSet(sl.sliceDb, genesisHash, 0, types.NewEtxSet())

if common.NodeLocation.Context() == common.PRIME_CTX {
Expand Down
6 changes: 5 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ func (p *StateProcessor) Apply(batch ethdb.Batch, block *types.Block, newInbound
}
time4 := common.PrettyDuration(time.Since(start))
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
time4_5 := common.PrettyDuration(time.Since(start))
// Create bloom filter and write it to cache/db
bloom := types.CreateBloom(receipts)
p.hc.AddBloom(bloom, block.Hash())
time5 := common.PrettyDuration(time.Since(start))
rawdb.WritePreimages(batch, statedb.Preimages())
time6 := common.PrettyDuration(time.Since(start))
Expand Down Expand Up @@ -405,7 +409,7 @@ func (p *StateProcessor) Apply(batch ethdb.Batch, block *types.Block, newInbound
rawdb.WriteEtxSet(p.hc.bc.db, block.Hash(), block.NumberU64(), etxSet)
time12 := common.PrettyDuration(time.Since(start))

log.Info("times during state processor apply:", "t1:", time1, "t2:", time2, "t3:", time3, "t4:", time4, "t5:", time5, "t6:", time6, "t7:", time7, "t8:", time8, "t9:", time9, "t10:", time10, "t11:", time11, "t12:", time12)
log.Info("times during state processor apply:", "t1:", time1, "t2:", time2, "t3:", time3, "t4:", time4, "t4.5:", time4_5, "t5:", time5, "t6:", time6, "t7:", time7, "t8:", time8, "t9:", time9, "t10:", time10, "t11:", time11, "t12:", time12)
return logs, nil
}

Expand Down

0 comments on commit cd507f4

Please sign in to comment.