Skip to content

Commit

Permalink
Add stats and diagnostics to the TSM engine
Browse files Browse the repository at this point in the history
Track the number of TSM files in the file store and keep engine
statistics related to the number of TSM compactions.
  • Loading branch information
jsternberg committed Jul 8, 2016
1 parent f4c663a commit 12a33fe
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#6938](https://github.com/influxdata/influxdb/issues/6938): Added favicon
- [#6507](https://github.com/influxdata/influxdb/issues/6507): Refactor monitor service to avoid expvar and write monitor statistics on a truncated time interval.
- [#6805](https://github.com/influxdata/influxdb/issues/6805): Allow any variant of the help option to trigger the help.
- [#5499](https://github.com/influxdata/influxdb/issues/5499): Add stats and diagnostics to the TSM engine.

### Bugfixes

Expand Down
74 changes: 73 additions & 1 deletion tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/influxdb/influxql"
Expand All @@ -36,6 +37,21 @@ const (
keyFieldSeparator = "#!~#"
)

// Statistics gathered by the engine.
const (
statCacheCompactions = "cacheCompactions"
statCacheCompactionError = "cacheCompactionErr"
statCacheCompactionDuration = "cacheCompactionDuration"
statTSMLevel1Compactions = "tsmLevel1Compactions"
statTSMLevel1CompactionDuration = "tsmLevel1CompactionDuration"
statTSMLevel2Compactions = "tsmLevel2Compactions"
statTSMLevel2CompactionDuration = "tsmLevel2CompactionDuration"
statTSMLevel3Compactions = "tsmLevel3Compactions"
statTSMLevel3CompactionDuration = "tsmLevel3CompactionDuration"
statTSMFullCompactions = "tsmFullCompactions"
statTSMFullCompactionDuration = "tsmFullCompactionDuration"
)

// Engine represents a storage engine with compressed blocks.
type Engine struct {
mu sync.RWMutex
Expand Down Expand Up @@ -70,6 +86,8 @@ type Engine struct {

// Controls whether to enabled compactions when the engine is open
enableCompactionsOnOpen bool

stats *EngineStatistics
}

// NewEngine returns a new instance of Engine.
Expand Down Expand Up @@ -105,6 +123,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize,
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
stats: &EngineStatistics{},
}
e.SetLogOutput(os.Stderr)

Expand Down Expand Up @@ -198,9 +217,38 @@ func (e *Engine) Format() tsdb.EngineFormat {
return tsdb.TSM1Format
}

// EngineStatistics maintains statistics for the engine.
type EngineStatistics struct {
CacheCompactions int64
CacheCompactionErrors int64
CacheCompactionDuration int64
TSMCompactions [3]int64
TSMCompactionErrors [3]int64
TSMCompactionDuration [3]int64
TSMFullCompactions int64
TSMFullCompactionErrors int64
TSMFullCompactionDuration int64
}

// Statistics returns statistics for periodic monitoring.
func (e *Engine) Statistics(tags map[string]string) []models.Statistic {
statistics := make([]models.Statistic, 0, 3)
statistics := make([]models.Statistic, 0, 4)
statistics = append(statistics, models.Statistic{
Name: "tsm1_engine",
Tags: tags,
Values: map[string]interface{}{
statCacheCompactions: atomic.LoadInt64(&e.stats.CacheCompactions),
statCacheCompactionDuration: atomic.LoadInt64(&e.stats.CacheCompactionDuration),
statTSMLevel1Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[0]),
statTSMLevel1CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[0]),
statTSMLevel2Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[1]),
statTSMLevel2CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[1]),
statTSMLevel3Compactions: atomic.LoadInt64(&e.stats.TSMCompactions[2]),
statTSMLevel3CompactionDuration: atomic.LoadInt64(&e.stats.TSMCompactionDuration[2]),
statTSMFullCompactions: atomic.LoadInt64(&e.stats.TSMFullCompactions),
statTSMFullCompactionDuration: atomic.LoadInt64(&e.stats.TSMFullCompactionDuration),
},
})
statistics = append(statistics, e.Cache.Statistics(tags)...)
statistics = append(statistics, e.FileStore.Statistics(tags)...)
statistics = append(statistics, e.WAL.Statistics(tags)...)
Expand Down Expand Up @@ -716,10 +764,15 @@ func (e *Engine) compactCache() {
default:
e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
start := time.Now()
err := e.WriteSnapshot()
if err != nil {
e.logger.Printf("error writing snapshot: %v", err)
atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
} else {
atomic.AddInt64(&e.stats.CacheCompactions, 1)
}
atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
}
}
time.Sleep(time.Second)
Expand Down Expand Up @@ -755,6 +808,9 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
continue
}

// Keep track of the start time for statistics.
start := time.Now()

var wg sync.WaitGroup
for i, group := range tsmFiles {
wg.Add(1)
Expand All @@ -773,32 +829,39 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
files, err = e.Compactor.CompactFast(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
} else {
files, err = e.Compactor.CompactFull(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
}

if err := e.FileStore.Replace(group, files); err != nil {
e.logger.Printf("error replacing new TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}

for i, f := range files {
e.logger.Printf("compacted level %d group (%d) into %s (#%d)", level, groupNum, f, i)
}
atomic.AddInt64(&e.stats.TSMCompactions[level-1], 1)
e.logger.Printf("compacted level %d group %d of %d files into %d files in %s",
level, groupNum, len(group), len(files), time.Since(start))
}(i, group)
}
wg.Wait()

// Track the amount of time spent compacting the groups.
atomic.AddInt64(&e.stats.TSMCompactionDuration[level-1], time.Since(start).Nanoseconds())
}
}
}
Expand All @@ -819,6 +882,9 @@ func (e *Engine) compactTSMFull() {
continue
}

// Keep track of the start time for statistics.
start := time.Now()

var wg sync.WaitGroup
for i, group := range tsmFiles {
wg.Add(1)
Expand All @@ -833,24 +899,30 @@ func (e *Engine) compactTSMFull() {
files, err := e.Compactor.CompactFull(group)
if err != nil {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)
time.Sleep(time.Second)
return
}

if err := e.FileStore.Replace(group, files); err != nil {
e.logger.Printf("error replacing new TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)
time.Sleep(time.Second)
return
}

for i, f := range files {
e.logger.Printf("compacted full group (%d) into %s (#%d)", groupNum, f, i)
}
atomic.AddInt64(&e.stats.TSMFullCompactions, 1)
e.logger.Printf("compacted full %d files into %d files in %s",
len(group), len(files), time.Since(start))
}(i, group)
}
wg.Wait()

// Track the amount of time spent compacting the groups.
atomic.AddInt64(&e.stats.TSMFullCompactionDuration, time.Since(start).Nanoseconds())
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type TSMFile interface {
// Statistics gathered by the FileStore.
const (
statFileStoreBytes = "diskBytes"
statFileStoreCount = "numFiles"
)

type FileStore struct {
Expand Down Expand Up @@ -156,6 +157,7 @@ func (f *FileStore) SetLogOutput(w io.Writer) {
// FileStoreStatistics keeps statistics about the file store.
type FileStoreStatistics struct {
DiskBytes int64
FileCount int64
}

// Statistics returns statistics for periodic monitoring.
Expand All @@ -165,6 +167,7 @@ func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
Tags: tags,
Values: map[string]interface{}{
statFileStoreBytes: atomic.LoadInt64(&f.stats.DiskBytes),
statFileStoreCount: atomic.LoadInt64(&f.stats.FileCount),
},
}}
}
Expand Down Expand Up @@ -206,6 +209,7 @@ func (f *FileStore) Add(files ...TSMFile) {
}
f.files = append(f.files, files...)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
}

// Remove removes the files with matching paths from the set of active files. It does
Expand All @@ -232,6 +236,7 @@ func (f *FileStore) Remove(paths ...string) {
}
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
}

// WalkKeys calls fn for every key in every TSM file known to the FileStore. If the key
Expand Down Expand Up @@ -384,6 +389,7 @@ func (f *FileStore) Open() error {
close(readerC)

sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
return nil
}

Expand All @@ -396,6 +402,7 @@ func (f *FileStore) Close() error {
}

f.files = nil
atomic.StoreInt64(&f.stats.FileCount, 0)
return nil
}

Expand Down Expand Up @@ -506,6 +513,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {

f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))

// Recalculate the disk size stat
var totalSize int64
Expand Down

0 comments on commit 12a33fe

Please sign in to comment.