Skip to content

Commit

Permalink
les, light: LES/2 protocol version (ethereum#14970)
Browse files Browse the repository at this point in the history
This PR implements the new LES protocol version extensions:

* new and more efficient Merkle proofs reply format (when replying to
  a multiple Merkle proofs request, we just send a single set of trie
  nodes containing all necessary nodes)
* BBT (BloomBitsTrie) works similarly to the existing CHT and contains
  the bloombits search data to speed up log searches
* GetTxStatusMsg returns the inclusion position or the
  pending/queued/unknown state of a transaction referenced by hash
* an optional signature of new block data (number/hash/td) can be
  included in AnnounceMsg to provide an option for "very light
  clients" (mobile/embedded devices) to skip expensive Ethash check
  and accept multiple signatures of somewhat trusted servers (still a
  lot better than trusting a single server completely and retrieving
  everything through RPC). The new client mode is not implemented in
  this PR, just the protocol extension.
  • Loading branch information
zsfelfoldi authored and fjl committed Oct 24, 2017
1 parent 6d6a5a9 commit ca376ea
Show file tree
Hide file tree
Showing 34 changed files with 2,049 additions and 481 deletions.
51 changes: 44 additions & 7 deletions core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bloombits

import (
"bytes"
"context"
"errors"
"math"
"sort"
Expand Down Expand Up @@ -60,6 +61,8 @@ type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Error error
Context context.Context
}

// Matcher is a pipelined system of schedulers and logic matchers which perform
Expand Down Expand Up @@ -137,7 +140,7 @@ func (m *Matcher) addScheduler(idx uint) {
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) {
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 {
return nil, errors.New("matcher already running")
Expand All @@ -149,6 +152,7 @@ func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession
matcher: m,
quit: make(chan struct{}),
kill: make(chan struct{}),
ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
Expand Down Expand Up @@ -502,15 +506,28 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher

quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown
pend sync.WaitGroup
quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown
ctx context.Context
err error
stopping bool
lock sync.Mutex
pend sync.WaitGroup
}

// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close(timeout time.Duration) {
func (s *MatcherSession) Close() {
s.lock.Lock()
stopping := s.stopping
s.stopping = true
s.lock.Unlock()
// ensure that we only close the session once
if stopping {
return
}

// Bail out if the matcher is not running
select {
case <-s.quit:
Expand All @@ -519,10 +536,26 @@ func (s *MatcherSession) Close(timeout time.Duration) {
}
// Signal termination and wait for all goroutines to tear down
close(s.quit)
time.AfterFunc(timeout, func() { close(s.kill) })
time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
}

// setError sets an error and stops the session
func (s *MatcherSession) setError(err error) {
s.lock.Lock()
s.err = err
s.lock.Unlock()
s.Close()
}

// Error returns an error if one has happened during the session
func (s *MatcherSession) Error() error {
s.lock.Lock()
defer s.lock.Unlock()

return s.err
}

// AllocateRetrieval assigns a bloom bit index to a client process that can either
// immediately reuest and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
Expand Down Expand Up @@ -618,9 +651,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan

case mux <- request:
// Retrieval accepted, something must arrive before we're aborting
request <- &Retrieval{Bit: bit, Sections: sections}
request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}

result := <-request
if result.Error != nil {
s.setError(result.Error)
}

s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
Expand Down
9 changes: 5 additions & 4 deletions core/bloombits/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package bloombits

import (
"context"
"math/rand"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -144,7 +145,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
quit := make(chan struct{})
matches := make(chan uint64, 16)

session, err := matcher.Start(0, blocks-1, matches)
session, err := matcher.Start(context.Background(), 0, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
Expand All @@ -163,13 +164,13 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
}
// If we're testing intermittent mode, abort and restart the pipeline
if intermittent {
session.Close(time.Second)
session.Close()
close(quit)

quit = make(chan struct{})
matches = make(chan uint64, 16)

session, err = matcher.Start(i+1, blocks-1, matches)
session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
Expand All @@ -183,7 +184,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
}
// Clean up the session and ensure we match the expected retrieval count
session.Close(time.Second)
session.Close()
close(quit)

if retrievals != 0 && requested != retrievals {
Expand Down
69 changes: 50 additions & 19 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset(section uint64)
Reset(section uint64, lastSectionHead common.Hash) error

// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)

// Commit finalizes the section metadata and stores it into the database.
// Commit finalizes the section metadata and stores it into the database. This
// interface will usually be a batch writer.
Commit() error
}

Expand Down Expand Up @@ -100,11 +101,34 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
return c
}

// AddKnownSectionHead marks a new section head as known/processed if it is newer
// than the already known best section head
func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()

if section < c.storedSections {
return
}
c.setSectionHead(section, shead)
c.setValidSections(section + 1)
}

// IndexerChain interface is used for connecting the indexer to a blockchain
type IndexerChain interface {
CurrentHeader() *types.Header
SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
}

// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
go c.eventLoop(currentHeader, chainEventer)
func (c *ChainIndexer) Start(chain IndexerChain) {
ch := make(chan ChainEvent, 10)
sub := chain.SubscribeChainEvent(ch)
currentHeader := chain.CurrentHeader()

go c.eventLoop(currentHeader, ch, sub)
}

// Close tears down all goroutines belonging to the indexer and returns any error
Expand All @@ -125,12 +149,14 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}

// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}

// Return any failures
switch {
case len(errs) == 0:
Expand All @@ -147,12 +173,10 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)

events := make(chan ChainEvent, 10)
sub := chainEventer(events)
defer sub.Unsubscribe()

// Fire the initial new head event to start any outstanding processing
Expand All @@ -169,7 +193,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
errc <- nil
return

case ev, ok := <-events:
case ev, ok := <-ch:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
Expand All @@ -178,7 +202,9 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
}
c.newHead(header.Number.Uint64(), false)

Expand Down Expand Up @@ -233,9 +259,10 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
updating bool
updated time.Time
updated time.Time
updateMsg bool
)

for {
select {
case errc := <-c.quit:
Expand All @@ -250,7 +277,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updating = true
updateMsg = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
Expand All @@ -259,7 +286,7 @@ func (c *ChainIndexer) updateLoop() {
section := c.storedSections
var oldHead common.Hash
if section > 0 {
oldHead = c.sectionHead(section - 1)
oldHead = c.SectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
Expand All @@ -270,11 +297,11 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Lock()

// If processing succeeded and no reorgs occcurred, mark the section completed
if err == nil && oldHead == c.sectionHead(section-1) {
if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updating {
updating = false
if c.storedSections == c.knownSections && updateMsg {
updateMsg = false
c.log.Info("Finished upgrading chain index")
}

Expand Down Expand Up @@ -311,7 +338,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section)

// Reset and partial processing
c.backend.Reset(section)

if err := c.backend.Reset(section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
}

for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
Expand Down Expand Up @@ -341,7 +372,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()

return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1)
return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}

// AddChildIndexer adds a child ChainIndexer that can use the output of this one
Expand Down Expand Up @@ -383,7 +414,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {

// sectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) sectionHead(section uint64) common.Hash {
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)

Expand Down
4 changes: 3 additions & 1 deletion core/chain_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
)
Expand Down Expand Up @@ -208,9 +209,10 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}

func (b *testChainIndexBackend) Reset(section uint64) {
func (b *testChainIndexBackend) Reset(section uint64, lastSectionHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
}

func (b *testChainIndexBackend) Process(header *types.Header) {
Expand Down
Loading

0 comments on commit ca376ea

Please sign in to comment.