Skip to content

Commit

Permalink
Merge pull request ethereum#20152 from karalabe/snapshot-5
Browse files Browse the repository at this point in the history
Dynamic state snapshots
  • Loading branch information
karalabe authored Mar 23, 2020
2 parents 93ffb85 + 074efe6 commit 613af7c
Show file tree
Hide file tree
Showing 59 changed files with 5,690 additions and 154 deletions.
6 changes: 3 additions & 3 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (b *SimulatedBackend) rollback() {
statedb, _ := b.blockchain.State()

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database())
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database(), nil)
}

// stateByBlockNumber retrieves a state by a given blocknumber.
Expand Down Expand Up @@ -480,7 +480,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
statedb, _ := b.blockchain.State()

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database())
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database(), nil)
return nil
}

Expand Down Expand Up @@ -593,7 +593,7 @@ func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
statedb, _ := b.blockchain.State()

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database())
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database(), nil)

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/evm/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ func runCmd(ctx *cli.Context) error {
genesisConfig = gen
db := rawdb.NewMemoryDatabase()
genesis := gen.ToBlock(db)
statedb, _ = state.New(genesis.Root(), state.NewDatabase(db))
statedb, _ = state.New(genesis.Root(), state.NewDatabase(db), nil)
chainConfig = gen.Config
} else {
statedb, _ = state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()))
statedb, _ = state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
genesisConfig = new(core.Genesis)
}
if ctx.GlobalString(SenderFlag.Name) != "" {
Expand Down
2 changes: 1 addition & 1 deletion cmd/evm/staterunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func stateTestCmd(ctx *cli.Context) error {
for _, st := range test.Subtests() {
// Run the test and aggregate the result
result := &StatetestResult{Name: key, Fork: st.Fork, Pass: true}
state, err := test.Run(st, cfg)
state, err := test.Run(st, cfg, false)
// print state root for evmlab tracing
if ctx.GlobalBool(MachineFlag.Name) && state != nil {
fmt.Fprintf(os.Stderr, "{\"stateRoot\": \"%x\"}\n", state.IntermediateRoot(false))
Expand Down
3 changes: 2 additions & 1 deletion cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ The dumpgenesis command dumps the genesis block configuration in JSON format to
utils.CacheFlag,
utils.SyncModeFlag,
utils.GCModeFlag,
utils.SnapshotFlag,
utils.CacheDatabaseFlag,
utils.CacheGCFlag,
},
Expand Down Expand Up @@ -544,7 +545,7 @@ func dump(ctx *cli.Context) error {
fmt.Println("{}")
utils.Fatalf("block not found")
} else {
state, err := state.New(block.Root(), state.NewDatabase(chainDb))
state, err := state.New(block.Root(), state.NewDatabase(chainDb), nil)
if err != nil {
utils.Fatalf("could not create new state: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var (
utils.SyncModeFlag,
utils.ExitWhenSyncedFlag,
utils.GCModeFlag,
utils.SnapshotFlag,
utils.LightServeFlag,
utils.LightLegacyServFlag,
utils.LightIngressFlag,
Expand All @@ -106,6 +107,7 @@ var (
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheGCFlag,
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheGCFlag,
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
},
},
Expand Down
23 changes: 21 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ var (
Usage: `Blockchain garbage collection mode ("full", "archive")`,
Value: "full",
}
SnapshotFlag = cli.BoolFlag{
Name: "snapshot",
Usage: `Enables snapshot-database mode -- experimental work in progress feature`,
}
LightKDFFlag = cli.BoolFlag{
Name: "lightkdf",
Usage: "Reduce key-derivation RAM & CPU usage at some expense of KDF strength",
Expand Down Expand Up @@ -383,14 +387,19 @@ var (
}
CacheTrieFlag = cli.IntFlag{
Name: "cache.trie",
Usage: "Percentage of cache memory allowance to use for trie caching (default = 25% full mode, 50% archive mode)",
Value: 25,
Usage: "Percentage of cache memory allowance to use for trie caching (default = 15% full mode, 30% archive mode)",
Value: 15,
}
CacheGCFlag = cli.IntFlag{
Name: "cache.gc",
Usage: "Percentage of cache memory allowance to use for trie pruning (default = 25% full mode, 0% archive mode)",
Value: 25,
}
CacheSnapshotFlag = cli.IntFlag{
Name: "cache.snapshot",
Usage: "Percentage of cache memory allowance to use for snapshot caching (default = 10% full mode, 20% archive mode)",
Value: 10,
}
CacheNoPrefetchFlag = cli.BoolFlag{
Name: "cache.noprefetch",
Usage: "Disable heuristic state prefetch during block import (less CPU and disk IO, more time waiting for data)",
Expand Down Expand Up @@ -1463,6 +1472,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) {
cfg.TrieDirtyCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100
}
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheSnapshotFlag.Name) {
cfg.SnapshotCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheSnapshotFlag.Name) / 100
}
if !ctx.GlobalIsSet(SnapshotFlag.Name) {
cfg.SnapshotCache = 0 // Disabled
}
if ctx.GlobalIsSet(DocRootFlag.Name) {
cfg.DocRoot = ctx.GlobalString(DocRootFlag.Name)
}
Expand Down Expand Up @@ -1724,6 +1739,10 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
TrieDirtyLimit: eth.DefaultConfig.TrieDirtyCache,
TrieDirtyDisabled: ctx.GlobalString(GCModeFlag.Name) == "archive",
TrieTimeLimit: eth.DefaultConfig.TrieTimeout,
SnapshotLimit: eth.DefaultConfig.SnapshotCache,
}
if !ctx.GlobalIsSet(SnapshotFlag.Name) {
cache.SnapshotLimit = 0 // Disabled
}
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) {
cache.TrieCleanLimit = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100
Expand Down
68 changes: 52 additions & 16 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
Expand Down Expand Up @@ -61,6 +62,10 @@ var (
storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)

snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil)
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)

blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
Expand Down Expand Up @@ -115,6 +120,9 @@ type CacheConfig struct {
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}

// BlockChain represents the canonical chain given a database with a genesis
Expand All @@ -136,6 +144,7 @@ type BlockChain struct {
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping

Expand Down Expand Up @@ -188,6 +197,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
}
}
bodyCache, _ := lru.New(bodyCacheLimit)
Expand Down Expand Up @@ -293,6 +304,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
}
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait)
}
// Take ownership of this particular state
go bc.update()
return bc, nil
Expand Down Expand Up @@ -339,7 +354,7 @@ func (bc *BlockChain) loadLastState() error {
return bc.Reset()
}
// Make sure the state associated with the block is available
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Dangling block without a state associated, init from scratch
log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
if err := bc.repair(&currentBlock); err != nil {
Expand Down Expand Up @@ -401,7 +416,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
if newHeadBlock == nil {
newHeadBlock = bc.genesisBlock
} else {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil {
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
// Rewound state missing, rolled back to before pivot, reset to genesis
newHeadBlock = bc.genesisBlock
}
Expand Down Expand Up @@ -486,6 +501,10 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()

// Destroy any existing state snapshot and regenerate it in the background
if bc.snaps != nil {
bc.snaps.Rebuild(block.Root())
}
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
}
Expand Down Expand Up @@ -524,7 +543,7 @@ func (bc *BlockChain) State() (*state.StateDB, error) {

// StateAt returns a new mutable state based on a particular point in time.
func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
return state.New(root, bc.stateCache)
return state.New(root, bc.stateCache, bc.snaps)
}

// StateCache returns the caching database underpinning the blockchain instance.
Expand Down Expand Up @@ -576,7 +595,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
func (bc *BlockChain) repair(head **types.Block) error {
for {
// Abort if we've rewound to a head block that does have associated state
if _, err := state.New((*head).Root(), bc.stateCache); err == nil {
if _, err := state.New((*head).Root(), bc.stateCache, bc.snaps); err == nil {
log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
return nil
}
Expand Down Expand Up @@ -839,6 +858,14 @@ func (bc *BlockChain) Stop() {

bc.wg.Wait()

// Ensure that the entirety of the state snapshot is journalled to disk.
var snapBase common.Hash
if bc.snaps != nil {
var err error
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
}
// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
// - HEAD: So we don't need to reprocess any blocks in the general case
Expand All @@ -857,6 +884,12 @@ func (bc *BlockChain) Stop() {
}
}
}
if snapBase != (common.Hash{}) {
log.Info("Writing snapshot state to disk", "root", snapBase)
if err := triedb.Commit(snapBase, true); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
}
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
}
Expand Down Expand Up @@ -1647,7 +1680,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache)
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}
Expand All @@ -1656,9 +1689,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
var followupInterrupt uint32
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache)
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, interrupt)
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if atomic.LoadUint32(interrupt) == 1 {
Expand All @@ -1676,14 +1709,16 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
return it.index, err
}
// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them

triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.StorageReads + statedb.StorageUpdates
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates

blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)

Expand Down Expand Up @@ -1712,10 +1747,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
atomic.StoreUint32(&followupInterrupt, 1)

// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them

blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits)
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockInsertTimer.UpdateSince(start)

switch status {
Expand Down
Loading

0 comments on commit 613af7c

Please sign in to comment.