Skip to content

Commit

Permalink
Look for fully compacted block in addition to max size during compaction
Browse files Browse the repository at this point in the history
Some data shapes would cause files to grow larger than the max size more
quickly which resulted in them getting skipped by the full compaction planner
at times.  Some datasets that could make this happen are very large keys or
very large numbers of keys (10M).  When this happened, multiple max sized
files would accumulate but the blocks would not be full.  When the shard went
cold for writes, these files would get recompacted down to the optimal size, but
a lot of space would be wasted in the meantime.
  • Loading branch information
jwilder committed Jan 7, 2016
1 parent 0264d77 commit 756421e
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 7 deletions.
35 changes: 30 additions & 5 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type DefaultPlanner struct {
FileStore interface {
Stats() []FileStat
LastModified() time.Time
BlockCount(path string, idx int) int
}

// CompactFullWriteColdDuration specifies the length of time after
Expand Down Expand Up @@ -202,7 +203,28 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
// first check if we should be doing a full compaction because nothing has been written in a long time
if !c.lastPlanCompactedFull && c.CompactFullWriteColdDuration > 0 && time.Now().Sub(lastWrite) > c.CompactFullWriteColdDuration && len(generations) > 1 {
var tsmFiles []string
for _, group := range generations {
for i, group := range generations {
var skip bool

// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
if group.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !group.hasTombstones() {
skip = true
}

// We need to look at the level of the next file because it may need to be combined with this generation
// but won't get picked up on it's own if this generation is skipped. This allows the most recently
// created files to get picked up by the full compaction planner and avoids having a few less optimally
// compressed files.
if i < len(generations)-1 {
if generations[i+1].level() <= 3 {
skip = false
}
}

if skip {
continue
}

for _, f := range group.files {
tsmFiles = append(tsmFiles, f.Path)
}
Expand Down Expand Up @@ -254,12 +276,13 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
continue
}

if g.size() > uint64(maxTSMFileSize) {
// Skip the file if it's over the max size and contains a full block
if g.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {
start = i + 1
}

// This is an edge case that can happen after multiple compactions run. The files at the beginning
// can become larger faster than thes ofter them. We want to skip those really big ones and just
// can become larger faster than ones after them. We want to skip those really big ones and just
// compact the smaller ones until they are closer in size.
if i > 0 {
if g.size()*2 < generations[i-1].size() {
Expand Down Expand Up @@ -287,7 +310,8 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
startIndex := i

for j := i; j < i+step && j < len(generations); j++ {
lvl := generations[j].level()
gen := generations[j]
lvl := gen.level()

// Skip compacting this group if there happens to be any lower level files in the
// middle. These will get picked up by the level compactors.
Expand All @@ -296,7 +320,8 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
break
}

if generations[j].size() >= uint64(maxTSMFileSize) {
// Skip the file if it's over the max size and it contains a full block
if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
startIndex++
continue
}
Expand Down
75 changes: 73 additions & 2 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
PathsFn: func() []tsm1.FileStat {
return testSet
},
blockCount: 1000,
}

cp := &tsm1.DefaultPlanner{
Expand All @@ -917,11 +918,11 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
// skip planning if all files are over the limit
over := []tsm1.FileStat{
tsm1.FileStat{
Path: "01-01.tsm1",
Path: "01-05.tsm1",
Size: 2049 * 1024 * 1024,
},
tsm1.FileStat{
Path: "02-02.tsm1",
Path: "02-05.tsm1",
Size: 2049 * 1024 * 1024,
},
}
Expand All @@ -930,6 +931,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
PathsFn: func() []tsm1.FileStat {
return over
},
blockCount: 1000,
}

cp.FileStore = overFs
Expand All @@ -950,6 +952,70 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) {
}
}

// Ensure that the planner will still return files that are over the max
// file size when they do not contain full blocks, since such files can be
// compacted further.
func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) {
	testSet := []tsm1.FileStat{
		tsm1.FileStat{
			Path: "01-05.tsm1",
			Size: 256 * 1024 * 1024,
		},
		tsm1.FileStat{
			Path: "02-05.tsm1",
			Size: 256 * 1024 * 1024,
		},
		tsm1.FileStat{
			Path: "03-05.tsm1",
			Size: 256 * 1024 * 1024,
		},
		tsm1.FileStat{
			Path: "04-04.tsm1",
			Size: 256 * 1024 * 1024,
		},
	}

	fs := &fakeFileStore{
		PathsFn: func() []tsm1.FileStat {
			return testSet
		},
		// Well below the full-block threshold the planner checks
		// (tsdb.DefaultMaxPointsPerBlock), so no file is treated as
		// fully compacted.
		blockCount: 100,
	}

	cp := &tsm1.DefaultPlanner{
		FileStore: fs,
		CompactFullWriteColdDuration: time.Nanosecond,
	}

	// first verify that our test set would return files
	if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp {
		t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
	}

	// Both files are over the max size, but their blocks are not full, so
	// the planner should still produce a compaction group for them rather
	// than skipping planning (contrast with
	// TestDefaultPlanner_Plan_SkipPlanningAfterFull, where blockCount is
	// full and planning is skipped).
	over := []tsm1.FileStat{
		tsm1.FileStat{
			Path: "01-05.tsm1",
			Size: 2049 * 1024 * 1024,
		},
		tsm1.FileStat{
			Path: "02-05.tsm1",
			Size: 2049 * 1024 * 1024,
		},
	}

	overFs := &fakeFileStore{
		PathsFn: func() []tsm1.FileStat {
			return over
		},
		blockCount: 100,
	}

	cp.FileStore = overFs
	if exp, got := 1, len(cp.Plan(time.Now().Add(-time.Second))); got != exp {
		t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
	}
}

// Ensure that the planner will compact files that are past the smallest step
// size even if there is a single file in the smaller step size
func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) {
Expand Down Expand Up @@ -1106,6 +1172,7 @@ func (w *fakeWAL) ClosedSegments() ([]string, error) {
// fakeFileStore is a test double for the planner's FileStore dependency.
// Each field stubs the value returned by the corresponding method.
type fakeFileStore struct {
	PathsFn func() []tsm1.FileStat // supplies the stats returned by Stats()
	lastModified time.Time         // returned by LastModified()
	blockCount int                 // returned by BlockCount() for every path/idx
}

func (w *fakeFileStore) Stats() []tsm1.FileStat {
Expand All @@ -1119,3 +1186,7 @@ func (w *fakeFileStore) NextGeneration() int {
// LastModified returns the stubbed last-modification time.
func (w *fakeFileStore) LastModified() time.Time {
	return w.lastModified
}

// BlockCount returns the stubbed block count, ignoring path and idx.
func (w *fakeFileStore) BlockCount(path string, idx int) int {
	return w.blockCount
}
31 changes: 31 additions & 0 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type TSMFile interface {

// Stats returns summary information about the TSM file.
Stats() FileStat

// BlockIterator returns an iterator pointing to the first block in the file and
// allows sequential iteration to each every block.
BlockIterator() *BlockIterator
}

type FileStore struct {
Expand Down Expand Up @@ -409,6 +413,33 @@ func (f *FileStore) LastModified() time.Time {
return f.lastModified
}

// BlockCount returns the number of values stored in the block at location idx
// in the file at path. idx is 1-based: the callers in the compaction planner
// pass 1 to inspect the first block. If path does not match any file in the
// store, or idx is out of range for the number of blocks in the file, 0 is
// returned.
func (f *FileStore) BlockCount(path string, idx int) int {
	f.mu.RLock()
	defer f.mu.RUnlock()

	// idx < 1 is out of range: the iterator below must be advanced with
	// Next at least once before Read is valid, so idx == 0 (previously
	// allowed by an `idx < 0` guard) would read an unpositioned iterator.
	if idx < 1 {
		return 0
	}

	for _, fd := range f.files {
		if fd.Path() == path {
			iter := fd.BlockIterator()
			// Advance to the requested block; bail out if the file
			// has fewer than idx blocks.
			for i := 0; i < idx; i++ {
				if !iter.Next() {
					return 0
				}
			}
			_, _, _, block, err := iter.Read()
			if err != nil {
				// Treat an unreadable block as empty rather than
				// passing a possibly invalid block to BlockCount.
				return 0
			}
			return BlockCount(block)
		}
	}
	return 0
}

// locations returns the files and index blocks for a key and time. ascending indicates
// whether the key will be scan in ascending time order or descenging time order.
func (f *FileStore) locations(key string, t time.Time, ascending bool) []*location {
Expand Down
29 changes: 29 additions & 0 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,35 @@ func TestFileStore_Delete(t *testing.T) {
}
}

// Ensure Stats reports one entry per TSM file in the store.
func TestFileStore_Stats(t *testing.T) {
	dir := MustTempDir()
	defer os.RemoveAll(dir)

	// Write three TSM files: two for "cpu", one for "mem".
	sets := []keyValues{
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}},
		{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(1, 0), 2.0)}},
		{"mem", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}},
	}

	if _, err := newFileDir(dir, sets...); err != nil {
		fatal(t, "creating test files", err)
	}

	fs := tsm1.NewFileStore(dir)
	if err := fs.Open(); err != nil {
		fatal(t, "opening file store", err)
	}
	defer fs.Close()

	if got, exp := len(fs.Stats()), 3; got != exp {
		t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
	}
}

func newFileDir(dir string, values ...keyValues) ([]string, error) {
var files []string

Expand Down
40 changes: 40 additions & 0 deletions tsdb/engine/tsm1/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,46 @@ func TestIndirectIndex_UnmarshalBinary_BlockCountOverflow(t *testing.T) {
defer r.Close()
}

// Ensure a TSM file whose only block holds a single value reports that
// count (i.e. far less than a full block) through BlockCount.
func TestCompacted_NotFull(t *testing.T) {
	var b bytes.Buffer
	w, err := tsm1.NewTSMWriter(&b)
	if err != nil {
		t.Fatalf("unexpected error creating writer: %v", err)
	}

	// Write a single value so the file's only block is far from full.
	values := []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}
	if err := w.Write("cpu", values); err != nil {
		t.Fatalf("unexpected error writing: %v", err)
	}
	if err := w.WriteIndex(); err != nil {
		t.Fatalf("unexpected error writing index: %v", err)
	}

	if err := w.Close(); err != nil {
		t.Fatalf("unexpected error closing: %v", err)
	}

	r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
	if err != nil {
		// Fixed message: was "error created reader".
		t.Fatalf("unexpected error creating reader: %v", err)
	}

	// Position the iterator on the first (and only) block before reading.
	iter := r.BlockIterator()
	if !iter.Next() {
		t.Fatalf("expected next, got false")
	}

	_, _, _, block, err := iter.Read()
	if err != nil {
		t.Fatalf("unexpected error reading block: %v", err)
	}

	if got, exp := tsm1.BlockCount(block), 1; got != exp {
		t.Fatalf("block count mismatch: got %v, exp %v", got, exp)
	}
}

func BenchmarkIndirectIndex_UnmarshalBinary(b *testing.B) {
index := tsm1.NewDirectIndex()
for i := 0; i < 100000; i++ {
Expand Down

0 comments on commit 756421e

Please sign in to comment.