Merge pull request influxdata#9552 from influxdata/jw-wal-disable
Add option to disable WAL
jwilder authored Mar 12, 2018
2 parents 9cafd4b + 444ad74 commit 15de887
Showing 2 changed files with 20 additions and 9 deletions.
tsdb/engine.go: 2 additions, 0 deletions
@@ -155,6 +155,7 @@ type EngineOptions struct {

 	CompactionLimiter           limiter.Fixed
 	CompactionThroughputLimiter limiter.Rate
+	WALEnabled                  bool

 	Config       Config
 	SeriesIDSets SeriesIDSets
@@ -166,6 +167,7 @@ func NewEngineOptions() EngineOptions {
 		EngineVersion: DefaultEngine,
 		IndexVersion:  DefaultIndex,
 		Config:        NewConfig(),
+		WALEnabled:    true,
 	}
 }
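For callers, the new knob is the WALEnabled field on EngineOptions, which NewEngineOptions defaults to true. A minimal sketch, not part of this diff, of how calling code could opt out of the WAL; the tsdb.Store wiring and the data path are illustrative assumptions, not something this commit adds:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	// NewEngineOptions defaults WALEnabled to true (see the hunk above).
	opt := tsdb.NewEngineOptions()

	// With the WAL off, points exist only in the in-memory cache until a
	// snapshot writes them to TSM files, so an unclean shutdown can lose data.
	opt.WALEnabled = false

	// Hand the options to whatever constructs the shards, e.g. a tsdb.Store;
	// the data path here is a placeholder.
	store := tsdb.NewStore("/var/lib/influxdb/data")
	store.EngineOptions = opt

	fmt.Println("WAL enabled:", store.EngineOptions.WALEnabled)
}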
tsdb/engine/tsm1/engine.go: 18 additions, 9 deletions
@@ -172,6 +172,10 @@ type Engine struct {
 	// a snapshot of the cache to a TSM file
 	CacheFlushWriteColdDuration time.Duration

+	// WALEnabled determines whether writes to the WAL are enabled. If this is false,
+	// writes will only exist in the cache and can be lost if a snapshot has not occurred.
+	WALEnabled bool
+
 	// Controls whether to enabled compactions when the engine is open
 	enableCompactionsOnOpen bool

@@ -222,10 +226,11 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
 		CacheFlushMemorySizeThreshold: uint64(opt.Config.CacheSnapshotMemorySize),
 		CacheFlushWriteColdDuration:   time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
 		enableCompactionsOnOpen:       true,
-		stats:                         stats,
-		compactionLimiter:             opt.CompactionLimiter,
-		scheduler:                     newScheduler(stats, opt.CompactionLimiter.Capacity()),
-		seriesIDSets:                  opt.SeriesIDSets,
+		WALEnabled:                    opt.WALEnabled,
+		stats:                         stats,
+		compactionLimiter:             opt.CompactionLimiter,
+		scheduler:                     newScheduler(stats, opt.CompactionLimiter.Capacity()),
+		seriesIDSets:                  opt.SeriesIDSets,
 	}

 	if e.traceLogging {
@@ -1235,7 +1240,9 @@ func (e *Engine) WritePoints(points []models.Point) error {
 		return err
 	}

-	_, err = e.WAL.WriteMulti(values)
+	if e.WALEnabled {
+		_, err = e.WAL.WriteMulti(values)
+	}
 	return err
 }
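The effect of the guard above: with the WAL disabled, a write touches only the in-memory cache, and durability is deferred until the next cache snapshot. A stand-alone illustration of that control flow; the function and parameters are placeholders, not engine APIs:

package walsketch

// writeValues mirrors the guarded write path above in simplified form:
// the cache write always happens; the WAL write is skipped when disabled.
func writeValues(walEnabled bool, cacheWrite, walWrite func() error) error {
	if err := cacheWrite(); err != nil {
		return err
	}
	if !walEnabled {
		// Data is only in memory now; it becomes durable at the next snapshot.
		return nil
	}
	return walWrite()
}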

@@ -1402,8 +1409,10 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
 	e.Cache.DeleteRange(deleteKeys, min, max)

 	// delete from the WAL
-	if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
-		return err
+	if e.WALEnabled {
+		if _, err := e.WAL.DeleteRange(deleteKeys, min, max); err != nil {
+			return err
+		}
 	}

 	// The series are deleted on disk, but the index may still say they exist.
@@ -1766,7 +1775,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
 	}

 	return sz > e.CacheFlushMemorySizeThreshold ||
-		time.Since(lastWriteTime) > e.CacheFlushWriteColdDuration
+		(e.WALEnabled && time.Since(lastWriteTime) > e.CacheFlushWriteColdDuration)
 }

 func (e *Engine) compact(wg *sync.WaitGroup) {
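With this change, the write-cold trigger only applies while the WAL is enabled; with it disabled, only the size threshold forces a snapshot (plausibly to avoid producing tiny TSM files from small, idle caches; the commit itself does not state the rationale). A simplified, stand-alone restatement of the new trigger, with placeholder names:

package walsketch

import "time"

// shouldSnapshotCache restates the updated ShouldCompactCache predicate with
// plain parameters: size always counts, write-cold only counts when the WAL is on.
func shouldSnapshotCache(size, sizeThreshold uint64, walEnabled bool, lastWrite time.Time, coldDuration time.Duration) bool {
	return size > sizeThreshold ||
		(walEnabled && time.Since(lastWrite) > coldDuration)
}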
@@ -1788,7 +1797,7 @@ func (e *Engine) compact(wg *sync.WaitGroup) {
 		level1Groups := e.CompactionPlan.PlanLevel(1)
 		level2Groups := e.CompactionPlan.PlanLevel(2)
 		level3Groups := e.CompactionPlan.PlanLevel(3)
-		level4Groups := e.CompactionPlan.Plan(e.WAL.LastWriteTime())
+		level4Groups := e.CompactionPlan.Plan(e.FileStore.LastModified())
 		atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, int64(len(level4Groups)))

 		// If no full compactions are need, see if an optimize is needed
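The final hunk changes the reference clock for full-compaction planning from the WAL's last write time to the file store's last modification time, which stays meaningful when no WAL is being written. A hedged sketch of how a planner might use that timestamp; the function and parameter names are simplified illustrations, not the engine's CompactionPlanner interface:

package walsketch

import "time"

// fullCompactionDue reports whether writes have been cold long enough,
// measured from the last on-disk change, to plan a full compaction.
func fullCompactionDue(fileStoreLastModified time.Time, fullWriteColdDuration time.Duration) bool {
	return time.Since(fileStoreLastModified) > fullWriteColdDuration
}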
