Skip to content

Commit

Permalink
core, light: send chain events using event.Feed (ethereum#14865)
Browse files Browse the repository at this point in the history
  • Loading branch information
tailingchen authored and fjl committed Aug 18, 2017
1 parent a4da841 commit bf1e263
Show file tree
Hide file tree
Showing 37 changed files with 787 additions and 373 deletions.
3 changes: 1 addition & 2 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
)

Expand Down Expand Up @@ -61,7 +60,7 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
database, _ := ethdb.NewMemDatabase()
genesis := core.Genesis{Config: params.AllProtocolChanges, Alloc: alloc}
genesis.MustCommit(database)
blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), vm.Config{})
backend := &SimulatedBackend{database: database, blockchain: blockchain, config: genesis.Config}
backend.rollback()
return backend
Expand Down
3 changes: 1 addition & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethstats"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand Down Expand Up @@ -1103,7 +1102,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
Fatalf("%v", err)
}
vmcfg := vm.Config{EnablePreimageRecording: ctx.GlobalBool(VMEnableDebugFlag.Name)}
chain, err = core.NewBlockChain(chainDb, config, engine, new(event.TypeMux), vmcfg)
chain, err = core.NewBlockChain(chainDb, config, engine, vmcfg)
if err != nil {
Fatalf("Can't create BlockChain: %v", err)
}
Expand Down
6 changes: 2 additions & 4 deletions core/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
)

Expand Down Expand Up @@ -175,8 +174,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {

// Time the insertion of the new chain.
// State and blocks are stored in the same DB.
evmux := new(event.TypeMux)
chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), evmux, vm.Config{})
chainman, _ := NewBlockChain(db, gspec.Config, ethash.NewFaker(), vm.Config{})
defer chainman.Stop()
b.ReportAllocs()
b.ResetTimer()
Expand Down Expand Up @@ -286,7 +284,7 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
if err != nil {
b.Fatalf("error opening database at %v: %v", dir, err)
}
chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
chain, err := NewBlockChain(db, params.TestChainConfig, ethash.NewFaker(), vm.Config{})
if err != nil {
b.Fatalf("error creating chain: %v", err)
}
Expand Down
9 changes: 4 additions & 5 deletions core/block_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
)

Expand All @@ -43,7 +42,7 @@ func TestHeaderVerification(t *testing.T) {
headers[i] = block.Header()
}
// Run the header checker for blocks one-by-one, checking for both valid and invalid nonces
chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{})
defer chain.Stop()

for i := 0; i < len(blocks); i++ {
Expand Down Expand Up @@ -107,11 +106,11 @@ func testHeaderConcurrentVerification(t *testing.T, threads int) {
var results <-chan error

if valid {
chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux), vm.Config{})
chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFaker(), vm.Config{})
_, results = chain.engine.VerifyHeaders(chain, headers, seals)
chain.Stop()
} else {
chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), new(event.TypeMux), vm.Config{})
chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeFailer(uint64(len(headers)-1)), vm.Config{})
_, results = chain.engine.VerifyHeaders(chain, headers, seals)
chain.Stop()
}
Expand Down Expand Up @@ -174,7 +173,7 @@ func testHeaderConcurrentAbortion(t *testing.T, threads int) {
defer runtime.GOMAXPROCS(old)

// Start the verifications and immediately abort
chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), new(event.TypeMux), vm.Config{})
chain, _ := NewBlockChain(testdb, params.TestChainConfig, ethash.NewFakeDelayer(time.Millisecond), vm.Config{})
defer chain.Stop()

abort, results := chain.engine.VerifyHeaders(chain, headers, seals)
Expand Down
92 changes: 69 additions & 23 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,16 @@ const (
type BlockChain struct {
config *params.ChainConfig // chain & network configuration

hc *HeaderChain
chainDb ethdb.Database
eventMux *event.TypeMux
genesisBlock *types.Block
hc *HeaderChain
chainDb ethdb.Database
rmTxFeed event.Feed
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block

mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock
Expand Down Expand Up @@ -115,7 +121,7 @@ type BlockChain struct {
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum Validator and
// Processor.
func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux, vmConfig vm.Config) (*BlockChain, error) {
func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
Expand All @@ -126,7 +132,6 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co
config: config,
chainDb: chainDb,
stateCache: state.NewDatabase(chainDb),
eventMux: mux,
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
Expand Down Expand Up @@ -594,6 +599,8 @@ func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
}
// Unsubscribe all subscriptions registered from blockchain
bc.scope.Close()
close(bc.quit)
atomic.StoreInt32(&bc.procInterrupt, 1)

Expand Down Expand Up @@ -1000,6 +1007,12 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {

blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs})
// We need some control over the mining operation. Acquiring locks and waiting
// for the miner to create new block takes too long and in most cases isn't
// even necessary.
if bc.LastBlockHash() == block.Hash() {
events = append(events, ChainHeadEvent{block})
}

// Write the positional metadata for transaction and receipt lookups
if err := WriteTxLookupEntries(bc.chainDb, block); err != nil {
Expand All @@ -1024,7 +1037,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
stats.usedGas += usedGas.Uint64()
stats.report(chain, i)
}
go bc.postChainEvents(events, coalescedLogs)
go bc.PostChainEvents(events, coalescedLogs)

return 0, nil
}
Expand Down Expand Up @@ -1184,39 +1197,42 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// Must be posted in a goroutine because of the transaction pool trying
// to acquire the chain manager lock
if len(diff) > 0 {
go bc.eventMux.Post(RemovedTransactionEvent{diff})
go bc.rmTxFeed.Send(RemovedTransactionEvent{diff})
}
if len(deletedLogs) > 0 {
go bc.eventMux.Post(RemovedLogsEvent{deletedLogs})
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}

if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
bc.eventMux.Post(ChainSideEvent{Block: block})
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
}()
}

return nil
}

// postChainEvents iterates over the events generated by a chain insertion and
// posts them into the event mux.
func (bc *BlockChain) postChainEvents(events []interface{}, logs []*types.Log) {
// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
// post event logs for further processing
bc.eventMux.Post(logs)
if logs != nil {
bc.logsFeed.Send(logs)
}
for _, event := range events {
if event, ok := event.(ChainEvent); ok {
// We need some control over the mining operation. Acquiring locks and waiting
// for the miner to create new block takes too long and in most cases isn't
// even necessary.
if bc.LastBlockHash() == event.Hash {
bc.eventMux.Post(ChainHeadEvent{event.Block})
}
switch ev := event.(type) {
case ChainEvent:
bc.chainFeed.Send(ev)

case ChainHeadEvent:
bc.chainHeadFeed.Send(ev)

case ChainSideEvent:
bc.chainSideFeed.Send(ev)
}
// Fire the insertion events individually too
bc.eventMux.Post(event)
}
}

Expand Down Expand Up @@ -1384,3 +1400,33 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config }

// Engine retrieves the blockchain's consensus engine.
func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }

// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent.
func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription {
return bc.scope.Track(bc.rmTxFeed.Subscribe(ch))
}

// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
}

// SubscribeChainEvent registers a subscription of ChainEvent.
func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
return bc.scope.Track(bc.chainFeed.Subscribe(ch))
}

// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
}

// SubscribeLogsEvent registers a subscription of []*types.Log.
func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return bc.scope.Track(bc.logsFeed.Subscribe(ch))
}
Loading

0 comments on commit bf1e263

Please sign in to comment.