Skip to content

Commit

Permalink
cmd, core, eth, light, trie: dump clean cache periodically (ethereum#…
Browse files Browse the repository at this point in the history
…20391)

* cmd, core, eth, light, trie: dump clean cache periodically

* eth: update config

* trie: minor fix

* core, trie: address comments

* eth: remove useless

* trie: print clean cache dump start too

Co-authored-by: Péter Szilágyi <[email protected]>
  • Loading branch information
rjl493456442 and karalabe authored Jul 28, 2020
1 parent 79ce553 commit 93da0cf
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 24 deletions.
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ var (
utils.CacheFlag,
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheTrieJournalFlag,
utils.CacheTrieRejournalFlag,
utils.CacheGCFlag,
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.CacheFlag,
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheTrieJournalFlag,
utils.CacheTrieRejournalFlag,
utils.CacheGCFlag,
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
Expand Down
16 changes: 16 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,16 @@ var (
Usage: "Percentage of cache memory allowance to use for trie caching (default = 15% full mode, 30% archive mode)",
Value: 15,
}
CacheTrieJournalFlag = cli.StringFlag{
Name: "cache.trie.journal",
Usage: "Disk journal directory for trie cache to survive node restarts",
Value: eth.DefaultConfig.TrieCleanCacheJournal,
}
CacheTrieRejournalFlag = cli.DurationFlag{
Name: "cache.trie.rejournal",
Usage: "Time interval to regenerate the trie cache journal",
Value: eth.DefaultConfig.TrieCleanCacheRejournal,
}
CacheGCFlag = cli.IntFlag{
Name: "cache.gc",
Usage: "Percentage of cache memory allowance to use for trie pruning (default = 25% full mode, 0% archive mode)",
Expand Down Expand Up @@ -1537,6 +1547,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheTrieFlag.Name) {
cfg.TrieCleanCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheTrieFlag.Name) / 100
}
if ctx.GlobalIsSet(CacheTrieJournalFlag.Name) {
cfg.TrieCleanCacheJournal = ctx.GlobalString(CacheTrieJournalFlag.Name)
}
if ctx.GlobalIsSet(CacheTrieRejournalFlag.Name) {
cfg.TrieCleanCacheRejournal = ctx.GlobalDuration(CacheTrieRejournalFlag.Name)
}
if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheGCFlag.Name) {
cfg.TrieDirtyCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheGCFlag.Name) / 100
}
Expand Down
23 changes: 22 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ const (
// that's resident in a blockchain.
type CacheConfig struct {
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieCleanJournal string // Disk journal for saving clean cache entries.
TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
Expand Down Expand Up @@ -220,7 +222,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit, cacheConfig.TrieCleanJournal),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
Expand Down Expand Up @@ -328,6 +330,19 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bc.txLookupLimit = *txLookupLimit
go bc.maintainTxIndex(txIndexBlock)
}
// If periodic cache journal is required, spin it up.
if bc.cacheConfig.TrieCleanRejournal > 0 {
if bc.cacheConfig.TrieCleanRejournal < time.Minute {
log.Warn("Sanitizing invalid trie cache journal time", "provided", bc.cacheConfig.TrieCleanRejournal, "updated", time.Minute)
bc.cacheConfig.TrieCleanRejournal = time.Minute
}
triedb := bc.stateCache.TrieDB()
bc.wg.Add(1)
go func() {
defer bc.wg.Done()
triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
}()
}
return bc, nil
}

Expand Down Expand Up @@ -919,6 +934,12 @@ func (bc *BlockChain) Stop() {
log.Error("Dangling trie nodes after full cleanup")
}
}
// Ensure all live cached entries be saved into disk, so that we can skip
// cache warmup when node restarts.
if bc.cacheConfig.TrieCleanJournal != "" {
triedb := bc.stateCache.TrieDB()
triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
}
log.Info("Blockchain stopped")
}

Expand Down
2 changes: 1 addition & 1 deletion core/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func SetupGenesisBlock(db ethdb.Database, genesis *Genesis) (*params.ChainConfig
// We have the genesis block in database(perhaps in ancient database)
// but the corresponding state is missing.
header := rawdb.ReadHeader(db, stored, 0)
if _, err := state.New(header.Root, state.NewDatabaseWithCache(db, 0), nil); err != nil {
if _, err := state.New(header.Root, state.NewDatabaseWithCache(db, 0, ""), nil); err != nil {
if genesis == nil {
genesis = DefaultGenesisBlock()
}
Expand Down
6 changes: 3 additions & 3 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ type Trie interface {
// concurrent use, but does not retain any recent trie nodes in memory. To keep some
// historical state in memory, use the NewDatabaseWithCache constructor.
func NewDatabase(db ethdb.Database) Database {
return NewDatabaseWithCache(db, 0)
return NewDatabaseWithCache(db, 0, "")
}

// NewDatabaseWithCache creates a backing store for state. The returned database
// is safe for concurrent use and retains a lot of collapsed RLP trie nodes in a
// large memory cache.
func NewDatabaseWithCache(db ethdb.Database, cache int) Database {
func NewDatabaseWithCache(db ethdb.Database, cache int, journal string) Database {
csc, _ := lru.New(codeSizeCacheSize)
return &cachingDB{
db: trie.NewDatabaseWithCache(db, cache),
db: trie.NewDatabaseWithCache(db, cache, journal),
codeSizeCache: csc,
}
}
Expand Down
4 changes: 2 additions & 2 deletions eth/api_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl

// Ensure we have a valid starting state before doing any work
origin := start.NumberU64()
database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16) // Chain tracing will probably start at genesis
database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16, "") // Chain tracing will probably start at genesis

if number := start.NumberU64(); number > 0 {
start = api.eth.blockchain.GetBlock(start.ParentHash(), start.NumberU64()-1)
Expand Down Expand Up @@ -641,7 +641,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
}
// Otherwise try to reexec blocks until we find a state or reach our limit
origin := block.NumberU64()
database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16)
database := state.NewDatabaseWithCache(api.eth.ChainDb(), 16, "")

for i := uint64(0); i < reexec; i++ {
block = api.eth.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1)
Expand Down
2 changes: 2 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: ctx.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
Expand Down
28 changes: 16 additions & 12 deletions eth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ var DefaultConfig = Config{
DatasetsOnDisk: 2,
DatasetsLockMmap: false,
},
NetworkId: 1,
LightPeers: 100,
UltraLightFraction: 75,
DatabaseCache: 512,
TrieCleanCache: 256,
TrieDirtyCache: 256,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 256,
NetworkId: 1,
LightPeers: 100,
UltraLightFraction: 75,
DatabaseCache: 512,
TrieCleanCache: 256,
TrieCleanCacheJournal: "triecache",
TrieCleanCacheRejournal: 60 * time.Minute,
TrieDirtyCache: 256,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 256,
Miner: miner.Config{
GasFloor: 8000000,
GasCeil: 8000000,
Expand Down Expand Up @@ -139,10 +141,12 @@ type Config struct {
DatabaseCache int
DatabaseFreezer string

TrieCleanCache int
TrieDirtyCache int
TrieTimeout time.Duration
SnapshotCache int
TrieCleanCache int
TrieCleanCacheJournal string `toml:",omitempty"` // Disk journal directory for trie cache to survive node restarts
TrieCleanCacheRejournal time.Duration `toml:",omitempty"` // Time interval to regenerate the journal for clean cache
TrieDirtyCache int
TrieTimeout time.Duration
SnapshotCache int

// Mining options
Miner miner.Config
Expand Down
12 changes: 12 additions & 0 deletions eth/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions light/postprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64, dis
diskdb: db,
odr: odr,
trieTable: trieTable,
triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
triedb: trie.NewDatabaseWithCache(trieTable, 1, ""), // Use a tiny cache only to keep memory down
trieset: mapset.NewSet(),
sectionSize: size,
disablePruning: disablePruning,
Expand Down Expand Up @@ -340,7 +340,7 @@ func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uin
diskdb: db,
odr: odr,
trieTable: trieTable,
triedb: trie.NewDatabaseWithCache(trieTable, 1), // Use a tiny cache only to keep memory down
triedb: trie.NewDatabaseWithCache(trieTable, 1, ""), // Use a tiny cache only to keep memory down
trieset: mapset.NewSet(),
parentSize: parentSize,
size: size,
Expand Down
51 changes: 48 additions & 3 deletions trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"reflect"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -278,16 +279,20 @@ func expandNode(hash hashNode, n node) node {
// its written out to disk or garbage collected. No read cache is created, so all
// data retrievals will hit the underlying disk database.
func NewDatabase(diskdb ethdb.KeyValueStore) *Database {
return NewDatabaseWithCache(diskdb, 0)
return NewDatabaseWithCache(diskdb, 0, "")
}

// NewDatabaseWithCache creates a new trie database to store ephemeral trie content
// before its written out to disk or garbage collected. It also acts as a read cache
// for nodes loaded from disk.
func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database {
func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int, journal string) *Database {
var cleans *fastcache.Cache
if cache > 0 {
cleans = fastcache.New(cache * 1024 * 1024)
if journal == "" {
cleans = fastcache.New(cache * 1024 * 1024)
} else {
cleans = fastcache.LoadFromFileOrNew(journal, cache*1024*1024)
}
}
return &Database{
diskdb: diskdb,
Expand Down Expand Up @@ -867,3 +872,43 @@ func (db *Database) Size() (common.StorageSize, common.StorageSize) {
var metarootRefs = common.StorageSize(len(db.dirties[common.Hash{}].children) * (common.HashLength + 2))
return db.dirtiesSize + db.childrenSize + metadataSize - metarootRefs, db.preimagesSize
}

// saveCache saves clean state cache to given directory path
// using specified CPU cores.
func (db *Database) saveCache(dir string, threads int) error {
if db.cleans == nil {
return nil
}
log.Info("Writing clean trie cache to disk", "path", dir, "threads", threads)

start := time.Now()
err := db.cleans.SaveToFileConcurrent(dir, threads)
if err != nil {
log.Error("Failed to persist clean trie cache", "error", err)
return err
}
log.Info("Persisted the clean trie cache", "path", dir, "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}

// SaveCache atomically saves fast cache data to the given dir using all
// available CPU cores.
func (db *Database) SaveCache(dir string) error {
return db.saveCache(dir, runtime.GOMAXPROCS(0))
}

// SaveCachePeriodically atomically saves fast cache data to the given dir with
// the specified interval. All dump operation will only use a single CPU core.
func (db *Database) SaveCachePeriodically(dir string, interval time.Duration, stopCh <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
db.saveCache(dir, 1)
case <-stopCh:
return
}
}
}

0 comments on commit 93da0cf

Please sign in to comment.