Skip to content

Commit

Permalink
Enabled snaps back into the codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed May 31, 2023
1 parent 0971356 commit eb557c7
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 34 deletions.
4 changes: 2 additions & 2 deletions cmd/go-quai/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func verifyState(ctx *cli.Context) error {
log.Error("Failed to load head block")
return errors.New("no head block")
}
snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, headBlock.Root(), false, false, false)
snaptree, err := snapshot.New(chaindb, trie.NewDatabase(chaindb), 256, headBlock.Root(), false, false)
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
Expand Down Expand Up @@ -456,7 +456,7 @@ func dumpState(ctx *cli.Context) error {
if err != nil {
return err
}
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, root, false, false, false)
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, root, false, false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (*core.Core, ethdb.Database)

// TODO(rjl493456442) disable snapshot generation/wiping if the chain is read only.
// Disable transaction indexing/unindexing by default.
protocol, err := core.NewCore(chainDb, nil, nil, nil, config, ctx.GlobalString(DomUrl.Name), makeSubUrls(ctx), engine, cache, vmcfg, &core.Genesis{})
protocol, err := core.NewCore(chainDb, nil, nil, nil, nil, config, ctx.GlobalString(DomUrl.Name), makeSubUrls(ctx), engine, cache, vmcfg, &core.Genesis{})
if err != nil {
Fatalf("Can't create BlockChain: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions core/bodydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type BodyDb struct {
processor *StateProcessor
}

func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, vmConfig vm.Config) (*BodyDb, error) {
func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, txLookupLimit *uint64, vmConfig vm.Config) (*BodyDb, error) {
nodeCtx := common.NodeLocation.Context()
blockCache, _ := lru.New(blockCacheLimit)
bodyCache, _ := lru.New(bodyCacheLimit)
Expand All @@ -59,7 +59,7 @@ func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chai

// only start the state processor in zone
if nodeCtx == common.ZONE_CTX {
bc.processor = NewStateProcessor(chainConfig, hc, engine, vmConfig, cacheConfig)
bc.processor = NewStateProcessor(chainConfig, hc, engine, vmConfig, cacheConfig, txLookupLimit)
}

return bc, nil
Expand Down
6 changes: 3 additions & 3 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type Core struct {
quit chan struct{} // core quit channel
}

func NewCore(db ethdb.Database, config *Config, isLocalBlock func(block *types.Header) bool, txConfig *TxPoolConfig, chainConfig *params.ChainConfig, domClientUrl string, subClientUrls []string, engine consensus.Engine, cacheConfig *CacheConfig, vmConfig vm.Config, genesis *Genesis) (*Core, error) {
slice, err := NewSlice(db, config, txConfig, isLocalBlock, chainConfig, domClientUrl, subClientUrls, engine, cacheConfig, vmConfig, genesis)
func NewCore(db ethdb.Database, config *Config, isLocalBlock func(block *types.Header) bool, txConfig *TxPoolConfig, txLookupLimit *uint64, chainConfig *params.ChainConfig, domClientUrl string, subClientUrls []string, engine consensus.Engine, cacheConfig *CacheConfig, vmConfig vm.Config, genesis *Genesis) (*Core, error) {
slice, err := NewSlice(db, config, txConfig, txLookupLimit, isLocalBlock, chainConfig, domClientUrl, subClientUrls, engine, cacheConfig, vmConfig, genesis)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -654,7 +654,7 @@ func (c *Core) State() (*state.StateDB, error) {

// StateAt returns a new mutable state based on a particular point in time.
func (c *Core) StateAt(root common.Hash) (*state.StateDB, error) {
return state.New(root, c.sl.hc.bc.processor.stateCache, nil)
return c.sl.hc.bc.processor.StateAt(root)
}

// StateCache returns the caching database underpinning the blockchain instance.
Expand Down
24 changes: 12 additions & 12 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type HeaderChain struct {

// NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points
// to the parent's interrupt semaphore.
func NewHeaderChain(db ethdb.Database, engine consensus.Engine, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, vmConfig vm.Config) (*HeaderChain, error) {
func NewHeaderChain(db ethdb.Database, engine consensus.Engine, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, txLookupLimit *uint64, vmConfig vm.Config) (*HeaderChain, error) {
headerCache, _ := lru.New(headerCacheLimit)
numberCache, _ := lru.New(numberCacheLimit)

Expand All @@ -82,12 +82,6 @@ func NewHeaderChain(db ethdb.Database, engine consensus.Engine, chainConfig *par
pendingEtxs, _ := lru.New(c_maxPendingEtxBatches)
hc.pendingEtxs = pendingEtxs

var err error
hc.bc, err = NewBodyDb(db, engine, hc, chainConfig, cacheConfig, vmConfig)
if err != nil {
return nil, err
}

hc.genesisHeader = hc.GetHeaderByNumber(0)
if hc.genesisHeader.Hash() != chainConfig.GenesisHash {
return nil, fmt.Errorf("genesis block mismatch: have %x, want %x", hc.genesisHeader.Hash(), chainConfig.GenesisHash)
Expand All @@ -96,15 +90,21 @@ func NewHeaderChain(db ethdb.Database, engine consensus.Engine, chainConfig *par
if hc.genesisHeader == nil {
return nil, ErrNoGenesis
}

// Initialize the heads slice
heads := make([]*types.Header, 0)
hc.heads = heads
//Load any state that is in our db
if err := hc.loadLastState(); err != nil {
return nil, err
}

var err error
hc.bc, err = NewBodyDb(db, engine, hc, chainConfig, cacheConfig, txLookupLimit, vmConfig)
if err != nil {
return nil, err
}

// Initialize the heads slice
heads := make([]*types.Header, 0)
hc.heads = heads

return hc, nil
}

Expand Down Expand Up @@ -476,7 +476,7 @@ func (hc *HeaderChain) Stop() {
hc.scope.Close()
hc.bc.scope.Close()
hc.wg.Wait()

hc.bc.processor.Stop()
log.Info("headerchain stopped")
}

Expand Down
4 changes: 2 additions & 2 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Slice struct {
validator Validator // Block and state validator interface
}

func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, isLocalBlock func(block *types.Header) bool, chainConfig *params.ChainConfig, domClientUrl string, subClientUrls []string, engine consensus.Engine, cacheConfig *CacheConfig, vmConfig vm.Config, genesis *Genesis) (*Slice, error) {
func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLookupLimit *uint64, isLocalBlock func(block *types.Header) bool, chainConfig *params.ChainConfig, domClientUrl string, subClientUrls []string, engine consensus.Engine, cacheConfig *CacheConfig, vmConfig vm.Config, genesis *Genesis) (*Slice, error) {
nodeCtx := common.NodeLocation.Context()
sl := &Slice{
config: chainConfig,
Expand All @@ -80,7 +80,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, isLocal
}

var err error
sl.hc, err = NewHeaderChain(db, engine, chainConfig, cacheConfig, vmConfig)
sl.hc, err = NewHeaderChain(db, engine, chainConfig, cacheConfig, txLookupLimit, vmConfig)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewPruner(db ethdb.Database, datadir, trieCachePath string, bloomSize uint6
if headBlock == nil {
return nil, errors.New("Failed to load head block")
}
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, false)
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false)
if err != nil {
return nil, err // The relevant snapshot(s) might not exist
}
Expand Down Expand Up @@ -363,7 +363,7 @@ func RecoverPruning(datadir string, db ethdb.Database, trieCachePath string) err
// - The state HEAD is rewound already because of multiple incomplete `prune-state`
// In this case, even the state HEAD is not exactly matched with snapshot, it
// still feasible to recover the pruning correctly.
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, false, true)
snaptree, err := snapshot.New(db, trie.NewDatabase(db), 256, headBlock.Root(), false, true)
if err != nil {
return err // The relevant snapshot(s) might not exist
}
Expand Down
6 changes: 2 additions & 4 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,15 @@ type Tree struct {
// This case happens when the snapshot is 'ahead' of the state trie.
// - otherwise, the entire snapshot is considered invalid and will be recreated on
// a background thread.
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, async bool, rebuild bool, recovery bool) (*Tree, error) {
func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash, rebuild bool, recovery bool) (*Tree, error) {
// Create a new, empty snapshot tree
snap := &Tree{
diskdb: diskdb,
triedb: triedb,
cache: cache,
layers: make(map[common.Hash]snapshot),
}
if !async {
defer snap.waitBuild()
}

// Attempt to load a previously persisted snapshot and rebuild one if failed
head, disabled, err := loadSnapshot(diskdb, triedb, cache, root, recovery)
if disabled {
Expand Down
88 changes: 83 additions & 5 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/dominant-strategies/go-quai/consensus"
"github.com/dominant-strategies/go-quai/core/rawdb"
"github.com/dominant-strategies/go-quai/core/state"
"github.com/dominant-strategies/go-quai/core/state/snapshot"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/core/vm"
"github.com/dominant-strategies/go-quai/crypto"
Expand Down Expand Up @@ -129,15 +130,18 @@ type StateProcessor struct {
prefetcher Prefetcher
vmConfig vm.Config

scope event.SubscriptionScope
wg sync.WaitGroup // chain processing wait group for shutting down
scope event.SubscriptionScope
wg sync.WaitGroup // chain processing wait group for shutting down
quit chan struct{} // state processor quit channel
txLookupLimit uint64

snaps *snapshot.Tree
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
}

// NewStateProcessor initialises a new StateProcessor.
func NewStateProcessor(config *params.ChainConfig, hc *HeaderChain, engine consensus.Engine, vmConfig vm.Config, cacheConfig *CacheConfig) *StateProcessor {
func NewStateProcessor(config *params.ChainConfig, hc *HeaderChain, engine consensus.Engine, vmConfig vm.Config, cacheConfig *CacheConfig, txLookupLimit *uint64) *StateProcessor {
receiptsCache, _ := lru.New(receiptsCacheLimit)
txLookupCache, _ := lru.New(txLookupCacheLimit)

Expand All @@ -159,8 +163,32 @@ func NewStateProcessor(config *params.ChainConfig, hc *HeaderChain, engine conse
}),
engine: engine,
triegc: prque.New(nil),
quit: make(chan struct{}),
}
sp.validator = NewBlockValidator(config, hc, engine)

// Load any existing snapshot, regenerating it if loading failed
if sp.cacheConfig.SnapshotLimit > 0 {
// TODO: If the state is not available, enable snapshot recovery
head := hc.CurrentHeader()
sp.snaps, _ = snapshot.New(hc.headerDb, sp.stateCache.TrieDB(), sp.cacheConfig.SnapshotLimit, head.Root(), true, false)
}
if txLookupLimit != nil {
sp.txLookupLimit = *txLookupLimit
}
// If periodic cache journal is required, spin it up.
if sp.cacheConfig.TrieCleanRejournal > 0 {
if sp.cacheConfig.TrieCleanRejournal < time.Minute {
log.Warn("Sanitizing invalid trie cache journal time", "provided", sp.cacheConfig.TrieCleanRejournal, "updated", time.Minute)
sp.cacheConfig.TrieCleanRejournal = time.Minute
}
triedb := sp.stateCache.TrieDB()
sp.wg.Add(1)
go func() {
defer sp.wg.Done()
triedb.SaveCachePeriodically(sp.cacheConfig.TrieCleanJournal, sp.cacheConfig.TrieCleanRejournal, sp.quit)
}()
}
return sp
}

Expand Down Expand Up @@ -188,7 +216,7 @@ func (p *StateProcessor) Process(block *types.Block, etxSet types.EtxSet) (types
}

// Initialize a statedb
statedb, err := state.New(parent.Header().Root(), p.stateCache, nil)
statedb, err := state.New(parent.Header().Root(), p.stateCache, p.snaps)
if err != nil {
return types.Receipts{}, []*types.Log{}, nil, 0, err
}
Expand Down Expand Up @@ -415,7 +443,7 @@ func (p *StateProcessor) State() (*state.StateDB, error) {

// StateAt returns a new mutable state based on a particular point in time.
func (p *StateProcessor) StateAt(root common.Hash) (*state.StateDB, error) {
return state.New(root, p.stateCache, nil)
return state.New(root, p.stateCache, p.snaps)
}

// StateCache returns the caching database underpinning the blockchain instance.
Expand Down Expand Up @@ -657,6 +685,56 @@ func (p *StateProcessor) StateAtTransaction(block *types.Block, txIndex int, ree
return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash())
}

func (p *StateProcessor) Stop() {
// Ensure that the entirety of the state snapshot is journalled to disk.
var snapBase common.Hash
if p.snaps != nil {
var err error
if snapBase, err = p.snaps.Journal(p.hc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
}
// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
// - HEAD: So we don't need to reprocess any blocks in the general case
// - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !p.cacheConfig.TrieDirtyDisabled {
triedb := p.stateCache.TrieDB()

for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
if number := p.hc.CurrentBlock().NumberU64(); number > offset {
recent := p.hc.GetBlockByNumber(number - offset)

log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
if err := triedb.Commit(recent.Root(), true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
}
if snapBase != (common.Hash{}) {
log.Info("Writing snapshot state to disk", "root", snapBase)
if err := triedb.Commit(snapBase, true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
for !p.triegc.Empty() {
triedb.Dereference(p.triegc.PopItem().(common.Hash))
}
if size, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
}
}
// Ensure all live cached entries be saved into disk, so that we can skip
// cache warmup when node restarts.
if p.cacheConfig.TrieCleanJournal != "" {
triedb := p.stateCache.TrieDB()
triedb.SaveCache(p.cacheConfig.TrieCleanJournal)
}
close(p.quit)
log.Info("State Processor stopped")
}

func prepareApplyETX(statedb *state.StateDB, tx *types.Transaction) *big.Int {
prevZeroBal := statedb.GetBalance(common.ZeroInternal) // Get current zero address balance
fee := big.NewInt(0).Add(tx.GasFeeCap(), tx.GasTipCap()) // Add gas price cap to miner tip cap
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Quai, error) {
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
}

eth.core, err = core.NewCore(chainDb, &config.Miner, eth.isLocalBlock, &config.TxPool, chainConfig, eth.config.DomUrl, eth.config.SubUrls, eth.engine, cacheConfig, vmConfig, config.Genesis)
eth.core, err = core.NewCore(chainDb, &config.Miner, eth.isLocalBlock, &config.TxPool, &config.TxLookupLimit, chainConfig, eth.config.DomUrl, eth.config.SubUrls, eth.engine, cacheConfig, vmConfig, config.Genesis)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit eb557c7

Please sign in to comment.