Skip to content

Commit

Permalink
Merge pull request influxdata#6794 from influxdata/jw-cache
Browse files Browse the repository at this point in the history
Cache safety fixes
  • Loading branch information
jwilder committed Jun 6, 2016
2 parents b8e22d9 + 838a29c commit 9c0c2f9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
16 changes: 12 additions & 4 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,11 @@ func (c *Cache) Snapshot() (*Cache, error) {
// Deduplicate sorts the snapshot before returning it. The compactor and any queries
// coming in while it writes will need the values sorted
func (c *Cache) Deduplicate() {
c.mu.RLock()
for _, e := range c.store {
e.deduplicate()
}
c.mu.RUnlock()
}

// ClearSnapshot will remove the snapshot cache from the list of flushing caches and
Expand Down Expand Up @@ -334,21 +336,27 @@ func (c *Cache) DeleteRange(keys []string, min, max int64) {
defer c.mu.Unlock()

for _, k := range keys {
origSize := c.store[k].size()
// Make sure key exist in the cache, skip if it does not
e, ok := c.store[k]
if !ok {
continue
}

origSize := e.size()
if min == math.MinInt64 && max == math.MaxInt64 {
c.size -= uint64(origSize)
delete(c.store, k)
continue
}

c.store[k].filter(min, max)
if c.store[k].count() == 0 {
e.filter(min, max)
if e.count() == 0 {
delete(c.store, k)
c.size -= uint64(origSize)
continue
}

c.size -= uint64(origSize - c.store[k].size())
c.size -= uint64(origSize - e.size())
}
}

Expand Down
44 changes: 44 additions & 0 deletions tsdb/engine/tsm1/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"reflect"
"sync"
"testing"

"github.com/golang/snappy"
Expand Down Expand Up @@ -146,6 +148,7 @@ func TestCache_DeleteRange_NoValues(t *testing.T) {
t.Fatalf("cache values mismatch: got %v, exp %v", got, exp)
}
}

func TestCache_Cache_Delete(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
Expand Down Expand Up @@ -185,6 +188,16 @@ func TestCache_Cache_Delete(t *testing.T) {
}
}

func TestCache_Cache_Delete_NonExistent(t *testing.T) {
c := NewCache(1024, "")

c.Delete([]string{"bar"})

if got, exp := c.Size(), uint64(0); exp != got {
t.Fatalf("cache size incorrect exp %d, got %d", exp, got)
}
}

// This tests writing two batches to the same series. The first batch
// is sorted. The second batch is also sorted but contains duplicates.
func TestCache_CacheWriteMulti_Duplicates(t *testing.T) {
Expand Down Expand Up @@ -389,6 +402,37 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
}
}

func TestCache_Deduplicate_Concurrent(t *testing.T) {
values := make(map[string][]Value)

for i := 0; i < 1000; i++ {
for j := 0; j < 100; j++ {
values[fmt.Sprintf("cpu%d", i)] = []Value{NewValue(int64(i+j)+int64(rand.Intn(10)), float64(i))}
}
}

wg := sync.WaitGroup{}
c := NewCache(1000000, "")

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
c.WriteMulti(values)
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
c.Deduplicate()
}
}()

wg.Wait()
}

// Ensure the CacheLoader can correctly load from a single segment, even if it's corrupted.
func TestCacheLoader_LoadSingle(t *testing.T) {
// Create a WAL segment.
Expand Down

0 comments on commit 9c0c2f9

Please sign in to comment.