Skip to content

Commit

Permalink
metrics:add rocksdb metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
fengzi committed Apr 21, 2018
1 parent 6ec2563 commit 0a32356
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 9 deletions.
2 changes: 1 addition & 1 deletion common/mvccdb/mvccdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func TestMVCCDB_ParallelsExeTowTx(t *testing.T) {
}

func TestPerformance(t *testing.T) {
store, _ := storage.NewRocksStorage("test.db")
store, _ := storage.NewRocksStorage("test.db", false)
defer os.RemoveAll("test.db")
db, _ := NewMVCCDB(store, true)

Expand Down
2 changes: 1 addition & 1 deletion neblet/neblet.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (n *Neblet) Setup() {
// storage
// n.storage, err = storage.NewDiskStorage(n.config.Chain.Datadir)
// n.storage, err = storage.NewMemoryStorage()
n.storage, err = storage.NewRocksStorage(n.config.Chain.Datadir)
n.storage, err = storage.NewRocksStorage(n.config.Chain.Datadir, n.config.Stats.EnableMetrics)
if err != nil {
logging.CLog().WithFields(logrus.Fields{
"dir": n.config.Chain.Datadir,
Expand Down
5 changes: 5 additions & 0 deletions storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ var (
// rocksdb metrics
metricsRocksdbFlushTime = metrics.NewGauge("neb.rocksdb.flushtime")
metricsRocksdbFlushLen = metrics.NewGauge("neb.rocksdb.flushlen")

metricsBlocksdbCacheSize = metrics.NewGauge("neb.rocksdb.cache.size") //block_cache->GetUsage()
metricsBlocksdbCachePinnedSize = metrics.NewGauge("neb.rocksdb.cachepinned.size") //block_cache->GetPinnedUsage()
metricsBlocksdbTableReaderMem = metrics.NewGauge("neb.rocksdb.tablereader.mem") //estimate-table-readers-mem
metricsBlocksdbAllMemTables = metrics.NewGauge("neb.rocksdb.alltables.mem") //cur-size-all-mem-tables
)
49 changes: 45 additions & 4 deletions storage/rocks_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ type RocksStorage struct {

ro *gorocksdb.ReadOptions
wo *gorocksdb.WriteOptions

cache *gorocksdb.Cache
}

// NewRocksStorage init a storage
func NewRocksStorage(path string) (*RocksStorage, error) {
func NewRocksStorage(path string, enableMetrics bool) (*RocksStorage, error) {

filter := gorocksdb.NewBloomFilter(10)
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetFilterPolicy(filter)
bbto.SetBlockCache(gorocksdb.NewLRUCache(512 << 20))

cache := gorocksdb.NewLRUCache(512 << 20)
bbto.SetBlockCache(cache)
opts := gorocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
opts.SetCreateIfMissing(true)
Expand All @@ -39,13 +43,19 @@ func NewRocksStorage(path string) (*RocksStorage, error) {
return nil, err
}

return &RocksStorage{
storage := &RocksStorage{
db: db,
cache: cache,
enableBatch: false,
batchOpts: make(map[string]*batchOpt),
ro: gorocksdb.NewDefaultReadOptions(),
wo: gorocksdb.NewDefaultWriteOptions(),
}, nil
}

if enableMetrics {
go RecordMetrics(storage)
}
return storage, nil
}

// Get return value to the key in Storage
Expand Down Expand Up @@ -151,3 +161,34 @@ func (storage *RocksStorage) DisableBatch() {

storage.enableBatch = false
}

// RecordMetrics record rocksdb metrics
func RecordMetrics(storage *RocksStorage) {
metricsUpdateChan := time.NewTicker(5 * time.Second).C

for {
select {
case <-metricsUpdateChan:

readersMemStr := storage.db.GetProperty("rocksdb.estimate-table-readers-mem")
allMemTablesStr := storage.db.GetProperty("rocksdb.cur-size-all-mem-tables")
cacheSize := storage.cache.GetUsage()
pinnedSize := storage.cache.GetPinnedUsage()

readersMemByte, err := byteutils.FromHex(readersMemStr)
if err != nil {
break
}

allMemTablesByte, err := byteutils.FromHex(allMemTablesStr)
if err != nil {
break
}

metricsBlocksdbAllMemTables.Update(byteutils.Int64(allMemTablesByte))
metricsBlocksdbTableReaderMem.Update(byteutils.Int64(readersMemByte))
metricsBlocksdbCacheSize.Update(int64(cacheSize))
metricsBlocksdbCachePinnedSize.Update(int64(pinnedSize))
}
}
}
6 changes: 3 additions & 3 deletions storage/rocks_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ func TestNewRocksStorage(t *testing.T) {
want *RocksStorage
wantErr bool
}{
// TODO: Add test cases.
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewRocksStorage(tt.args.path)
got, err := NewRocksStorage(tt.args.path, false)
if (err != nil) != tt.wantErr {
t.Errorf("NewRocksStorage() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -32,7 +32,7 @@ func TestNewRocksStorage(t *testing.T) {
})
}

s, err := NewRocksStorage("./rock.db/")
s, err := NewRocksStorage("./rock.db/", false)
assert.Nil(t, err)

key := []byte("key")
Expand Down

0 comments on commit 0a32356

Please sign in to comment.