Skip to content

Commit

Permalink
Merge pull request influxdata#8629 from influxdata/jw-compaction-abort
Browse files Browse the repository at this point in the history
Interrupt in progress TSM compactions
  • Loading branch information
jwilder authored Jul 27, 2017
2 parents ac2a969 + 18a02d5 commit 6e6cc99
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
- [#8097](https://github.com/influxdata/influxdb/pull/8097): Return query parsing errors in CSV formats.
- [#8607](https://github.com/influxdata/influxdb/issues/8607): Fix time zone shifts when the shift happens on a time zone boundary.

## v1.3.2 [unreleased]

### Bugfixes

- [#8629](https://github.com/influxdata/influxdb/pull/8629): Interrupt in progress TSM compactions

## v1.3.1 [unreleased]

### Bugfixes
Expand Down
79 changes: 65 additions & 14 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,11 @@ type Compactor struct {
snapshotsEnabled bool
compactionsEnabled bool

// The channel to signal that any in progress snapshots should be aborted.
snapshotsInterrupt chan struct{}
// The channel to signal that any in progress level compactions should be aborted.
compactionsInterrupt chan struct{}

files map[string]struct{}
}

Expand All @@ -604,6 +609,9 @@ func (c *Compactor) Open() {

c.snapshotsEnabled = true
c.compactionsEnabled = true
c.snapshotsInterrupt = make(chan struct{})
c.compactionsInterrupt = make(chan struct{})

c.files = make(map[string]struct{})
}

Expand All @@ -616,47 +624,68 @@ func (c *Compactor) Close() {
}
c.snapshotsEnabled = false
c.compactionsEnabled = false
if c.compactionsInterrupt != nil {
close(c.compactionsInterrupt)
}
if c.snapshotsInterrupt != nil {
close(c.snapshotsInterrupt)
}
}

// DisableSnapshots disables the compactor from performing snapshots.
func (c *Compactor) DisableSnapshots() {
c.mu.Lock()
c.snapshotsEnabled = false
if c.snapshotsInterrupt != nil {
close(c.snapshotsInterrupt)
c.snapshotsInterrupt = nil
}
c.mu.Unlock()
}

// EnableSnapshots allows the compactor to perform snapshots.
func (c *Compactor) EnableSnapshots() {
c.mu.Lock()
c.snapshotsEnabled = true
if c.snapshotsInterrupt == nil {
c.snapshotsInterrupt = make(chan struct{})
}
c.mu.Unlock()
}

// DisableSnapshots disables the compactor from performing compactions.
func (c *Compactor) DisableCompactions() {
c.mu.Lock()
c.compactionsEnabled = false
if c.compactionsInterrupt != nil {
close(c.compactionsInterrupt)
c.compactionsInterrupt = nil
}
c.mu.Unlock()
}

// EnableCompactions allows the compactor to perform compactions.
func (c *Compactor) EnableCompactions() {
c.mu.Lock()
c.compactionsEnabled = true
if c.compactionsInterrupt == nil {
c.compactionsInterrupt = make(chan struct{})
}
c.mu.Unlock()
}

// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
c.mu.RLock()
enabled := c.snapshotsEnabled
intC := c.snapshotsInterrupt
c.mu.RUnlock()

if !enabled {
return nil, errSnapshotsDisabled
}

iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock)
iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock, intC)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)

// See if we were disabled while writing a snapshot
Expand Down Expand Up @@ -717,7 +746,11 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, nil
}

tsm, err := NewTSMKeyIterator(size, fast, trs...)
c.mu.RLock()
intC := c.compactionsInterrupt
c.mu.RUnlock()

tsm, err := NewTSMKeyIterator(size, fast, intC, trs...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -994,7 +1027,8 @@ type tsmKeyIterator struct {

// merged are encoded blocks that have been combined or used as is
// without decode
merged blocks
merged blocks
interrupt chan struct{}
}

type block struct {
Expand Down Expand Up @@ -1046,7 +1080,7 @@ func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// NewTSMKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) {
func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
var iter []*BlockIterator
for _, r := range readers {
iter = append(iter, r.BlockIterator())
Expand All @@ -1060,6 +1094,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
iterators: iter,
fast: fast,
buf: make([]blocks, len(iter)),
interrupt: interrupt,
}, nil
}

Expand Down Expand Up @@ -1203,6 +1238,13 @@ func (k *tsmKeyIterator) merge() {
}

func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) {
// See if compactions were disabled while we were running.
select {
case <-k.interrupt:
return "", 0, 0, nil, errCompactionAborted
default:
}

if len(k.merged) == 0 {
return "", 0, 0, nil, k.err
}
Expand All @@ -1228,9 +1270,10 @@ type cacheKeyIterator struct {
size int
order []string

i int
blocks [][]cacheBlock
ready []chan struct{}
i int
blocks [][]cacheBlock
ready []chan struct{}
interrupt chan struct{}
}

type cacheBlock struct {
Expand All @@ -1241,7 +1284,7 @@ type cacheBlock struct {
}

// NewCacheKeyIterator returns a new KeyIterator from a Cache.
func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator {
keys := cache.Keys()

chans := make([]chan struct{}, len(keys))
Expand All @@ -1250,12 +1293,13 @@ func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
}

cki := &cacheKeyIterator{
i: -1,
size: size,
cache: cache,
order: keys,
ready: chans,
blocks: make([][]cacheBlock, len(keys)),
i: -1,
size: size,
cache: cache,
order: keys,
ready: chans,
blocks: make([][]cacheBlock, len(keys)),
interrupt: interrupt,
}
go cki.encode()
return cki
Expand Down Expand Up @@ -1334,6 +1378,13 @@ func (c *cacheKeyIterator) Next() bool {
}

func (c *cacheKeyIterator) Read() (string, int64, int64, []byte, error) {
// See if snapshot compactions were disabled while we were running.
select {
case <-c.interrupt:
return "", 0, 0, nil, errCompactionAborted
default:
}

blk := c.blocks[c.i][0]
return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
}
Expand Down
82 changes: 77 additions & 5 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ func TestTSMKeyIterator_Single(t *testing.T) {

r := MustTSMReader(dir, 1, writes)

iter, err := tsm1.NewTSMKeyIterator(1, false, r)
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
Expand Down Expand Up @@ -890,7 +890,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) {

r2 := MustTSMReader(dir, 2, writes2)

iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
Expand Down Expand Up @@ -951,7 +951,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
r2 := MustTSMReader(dir, 2, points2)
r2.Delete([]string{"cpu,host=A#!~#count"})

iter, err := tsm1.NewTSMKeyIterator(1, false, r1, r2)
iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}
Expand Down Expand Up @@ -993,6 +993,41 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) {
}
}

// Tests that the TSMKeyIterator will abort if the interrupt channel is closed
func TestTSMKeyIterator_Abort(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)

v1 := tsm1.NewValue(1, 1.1)
writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v1},
}

r := MustTSMReader(dir, 1, writes)

intC := make(chan struct{})
iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r)
if err != nil {
t.Fatalf("unexpected error creating WALKeyIterator: %v", err)
}

var aborted bool
for iter.Next() {
// Abort
close(intC)

_, _, _, _, err := iter.Read()
if err == nil {
t.Fatalf("unexpected error read: %v", err)
}
aborted = err != nil
}

if !aborted {
t.Fatalf("iteration not aborted")
}
}

func TestCacheKeyIterator_Single(t *testing.T) {
v0 := tsm1.NewValue(1, 1.0)

Expand All @@ -1008,7 +1043,7 @@ func TestCacheKeyIterator_Single(t *testing.T) {
}
}

iter := tsm1.NewCacheKeyIterator(c, 1)
iter := tsm1.NewCacheKeyIterator(c, 1, nil)
var readValues bool
for iter.Next() {
key, _, _, block, err := iter.Read()
Expand Down Expand Up @@ -1056,7 +1091,7 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
}
}

iter := tsm1.NewCacheKeyIterator(c, 1)
iter := tsm1.NewCacheKeyIterator(c, 1, nil)
var readValues bool
var chunk int
for iter.Next() {
Expand Down Expand Up @@ -1090,6 +1125,43 @@ func TestCacheKeyIterator_Chunked(t *testing.T) {
}
}

// Tests that the CacheKeyIterator will abort if the interrupt channel is closed
func TestCacheKeyIterator_Abort(t *testing.T) {
v0 := tsm1.NewValue(1, 1.0)

writes := map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{v0},
}

c := tsm1.NewCache(0, "")

for k, v := range writes {
if err := c.Write(k, v); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}
}

intC := make(chan struct{})

iter := tsm1.NewCacheKeyIterator(c, 1, intC)

var aborted bool
for iter.Next() {
//Abort
close(intC)

_, _, _, _, err := iter.Read()
if err == nil {
t.Fatalf("unexpected error read: %v", err)
}
aborted = err != nil
}

if !aborted {
t.Fatalf("iteration not aborted")
}
}

func TestDefaultPlanner_Plan_Min(t *testing.T) {
cp := tsm1.NewDefaultPlanner(
&fakeFileStore{
Expand Down
2 changes: 2 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func (e *Engine) enableSnapshotCompactions() {
return
}

e.Compactor.EnableSnapshots()
quit := make(chan struct{})
e.snapDone = quit
e.snapWG.Add(1)
Expand All @@ -304,6 +305,7 @@ func (e *Engine) disableSnapshotCompactions() {
if e.snapDone != nil {
close(e.snapDone)
e.snapDone = nil
e.Compactor.DisableSnapshots()
}

e.mu.Unlock()
Expand Down

0 comments on commit 6e6cc99

Please sign in to comment.