Commit e9ca036

Merge pull request influxdata#8485 from benbjohnson/tsi-1.3
TSI Fixes (1.3)
benbjohnson authored Jun 13, 2017
2 parents 9032b82 + 400147c commit e9ca036
Showing 5 changed files with 46 additions and 174 deletions.
CHANGELOG.md (2 additions, 1 deletion)

@@ -78,7 +78,8 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
 - [#8417](https://github.com/influxdata/influxdb/issues/8417): Fix large field keys preventing snapshot compactions
 - [#7957](https://github.com/influxdata/influxdb/issues/7957): URL query parameter credentials take priority over Authentication header.
 - [#8443](https://github.com/influxdata/influxdb/issues/8443): TSI branch has duplicate tag values.
-
+- [#8470](https://github.com/influxdata/influxdb/issues/8470): index file fd leak in tsi branch
+- [#8468](https://github.com/influxdata/influxdb/pull/8468): Fix TSI non-contiguous compaction panic.
 
 ## v1.2.4 [2017-05-08]

tsdb/index/tsi1/file_set.go (28 additions, 6 deletions)

@@ -79,7 +79,7 @@ func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet {
     // Ensure all old files are contiguous.
     for j := range oldFiles {
         if fs.files[i+j] != oldFiles[j] {
-            panic("cannot replace non-contiguous files")
+            panic(fmt.Sprintf("cannot replace non-contiguous files: subset=%+v, fileset=%+v", Files(oldFiles).IDs(), Files(fs.files).IDs()))
         }
     }

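Note: MustReplace splices one compacted output file into the slots occupied by its inputs, so the inputs must form a single adjacent run in the fileset; the panic message now reports both ID lists so a mismatch can actually be diagnosed. A minimal sketch of the invariant being checked, using plain int IDs in place of tsi1 File values (hypothetical helper, not part of the package):

package main

import "fmt"

// isContiguousRun reports whether sub occurs as one adjacent run inside set.
func isContiguousRun(set, sub []int) bool {
    for i := 0; i+len(sub) <= len(set); i++ {
        match := true
        for j := range sub {
            if set[i+j] != sub[j] {
                match = false
                break
            }
        }
        if match {
            return true
        }
    }
    return false
}

func main() {
    set := []int{1, 2, 3, 4, 5}
    fmt.Println(isContiguousRun(set, []int{2, 3})) // true: adjacent run
    fmt.Println(isContiguousRun(set, []int{2, 4})) // false: 3 sits between them
}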
@@ -134,13 +134,25 @@ func (fs *FileSet) IndexFiles() []*IndexFile {
     return a
 }
 
-// IndexFilesByLevel returns all index files for a given level.
-func (fs *FileSet) IndexFilesByLevel(level int) []*IndexFile {
+// LastContiguousIndexFilesByLevel returns the last contiguous files by level.
+// These can be used by the compaction scheduler.
+func (fs *FileSet) LastContiguousIndexFilesByLevel(level int) []*IndexFile {
+    if level == 0 {
+        return nil
+    }
+
     var a []*IndexFile
-    for _, f := range fs.files {
-        if f, ok := f.(*IndexFile); ok && f.Level() == level {
-            a = append(a, f)
-        }
+    for i := len(fs.files) - 1; i >= 0; i-- {
+        f := fs.files[i]
+
+        // Ignore files above level, stop on files below level.
+        if level < f.Level() {
+            continue
+        } else if level > f.Level() {
+            break
+        }
+
+        a = append([]*IndexFile{f.(*IndexFile)}, a...)
     }
     return a
 }
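Note: the new selector walks the fileset from the end, skips files above the target level, and stops at the first file below it, so it returns only the trailing contiguous run at that level. That run is exactly what the scheduler can hand to MustReplace without tripping the contiguity panic above. A sketch of the selection rule with bare level numbers standing in for *IndexFile values (not the tsi1 API):

package main

import "fmt"

// lastContiguousByLevel returns the indexes of the trailing run of files
// at the given level, mirroring LastContiguousIndexFilesByLevel.
func lastContiguousByLevel(levels []int, level int) []int {
    if level == 0 {
        return nil
    }
    var a []int
    for i := len(levels) - 1; i >= 0; i-- {
        if level < levels[i] {
            continue // ignore files above the target level
        } else if level > levels[i] {
            break // stop at the first file below the target level
        }
        a = append([]int{i}, a...) // prepend to keep fileset order
    }
    return a
}

func main() {
    // Indexes 3 and 4 form the trailing run of level-2 files; index 1 is
    // also level 2 but separated by a level-1 file, so it is excluded.
    fmt.Println(lastContiguousByLevel([]int{1, 2, 1, 2, 2, 3}, 2)) // [3 4]
}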
@@ -930,3 +942,13 @@ type File interface {
     Retain()
     Release()
 }
+
+type Files []File
+
+func (a Files) IDs() []int {
+    ids := make([]int, len(a))
+    for i := range a {
+        ids[i] = a[i].ID()
+    }
+    return ids
+}
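Note: the Files slice type exists to feed the improved panic in MustReplace; IDs flattens a fileset into a printable list. Roughly the message shape, with made-up IDs:

package main

import "fmt"

func main() {
    subset, fileset := []int{3, 5}, []int{1, 2, 3, 4, 5} // hypothetical IDs
    msg := fmt.Sprintf("cannot replace non-contiguous files: subset=%+v, fileset=%+v", subset, fileset)
    fmt.Println(msg) // cannot replace non-contiguous files: subset=[3 5], fileset=[1 2 3 4 5]
}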
tsdb/index/tsi1/file_set_test.go (96 deletions)

@@ -265,102 +265,6 @@ func TestFileSet_TagKeyIterator(t *testing.T) {
     })
 }
 
-/*
-func TestFileSet_FilterNamesTags(t *testing.T) {
-    var mf internal.File
-    fs := tsi1.NewFileSet(nil, []tsi1.File{&mf})
-
-    var (
-        names [][]byte
-        tags  []models.Tags
-    )
-
-    reset := func() {
-        names = [][]byte{[]byte("m1"), []byte("m2"), []byte("m3"), []byte("m4")}
-        tags = []models.Tags{
-            models.NewTags(map[string]string{"host": "server-1"}),
-            models.NewTags(map[string]string{"host": "server-2"}),
-            models.NewTags(map[string]string{"host": "server-3"}),
-            models.NewTags(map[string]string{"host": "server-3"}),
-        }
-    }
-
-    // Filter out first name/tags in arguments.
-    reset()
-    mf.FilterNameTagsf = func(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) {
-        newNames := names[:0]
-        newTags := tagsSlice[:0]
-        for i := range names {
-            name := names[i]
-            tags := tagsSlice[i]
-            if string(name) == "m1" && tags[0].String() == "{host server-1}" {
-                continue
-            }
-            newNames = append(newNames, names[i])
-            newTags = append(newTags, tagsSlice[i])
-        }
-        return newNames, newTags
-    }
-
-    gotNames, gotTags := fs.FilterNamesTags(names, tags)
-    reset()
-    if got, exp := gotNames, names[1:]; !reflect.DeepEqual(got, exp) {
-        t.Fatalf("got %v, expected %v", got, exp)
-    } else if got, exp := gotTags, tags[1:]; !reflect.DeepEqual(got, exp) {
-        t.Fatalf("got %v, expected %v", got, exp)
-    }
-
-    // Filter out middle name/tags in arguments.
-    reset()
-    mf.FilterNameTagsf = func(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) {
-        newNames := names[:0]
-        newTags := tagsSlice[:0]
-        for i := range names {
-            name := names[i]
-            tags := tagsSlice[i]
-            if string(name) == "m3" && tags[0].String() == "{host server-3}" {
-                continue
-            }
-            newNames = append(newNames, names[i])
-            newTags = append(newTags, tagsSlice[i])
-        }
-        return newNames, newTags
-    }
-
-    gotNames, gotTags = fs.FilterNamesTags(names, tags)
-    reset()
-    if got, exp := gotNames, append(names[:2], names[3]); !reflect.DeepEqual(got, exp) {
-        t.Fatalf("got %v, expected %v", got, exp)
-    } else if got, exp := gotTags, append(tags[:2], tags[3]); !reflect.DeepEqual(got, exp) {
-        t.Fatalf("got %v, expected %v", got, exp)
-    }
-
-    // Filter out last name/tags in arguments.
-    reset()
-    mf.FilterNameTagsf = func(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) {
-        newNames := names[:0]
-        newTags := tagsSlice[:0]
-        for i := range names {
-            name := names[i]
-            tags := tagsSlice[i]
-            if string(name) == "m4" && tags[0].String() == "{host server-3}" {
-                continue
-            }
-            newNames = append(newNames, names[i])
-            newTags = append(newTags, tagsSlice[i])
-        }
-        return newNames, newTags
-    }
-
-    gotNames, gotTags = fs.FilterNamesTags(names, tags)
-    reset()
-    if got, exp := gotNames, names[:3]; !reflect.DeepEqual(got, exp) {
-        t.Fatalf("got %v, expected %v", got, exp)
-    } else if got, exp := gotTags, tags[:3]; !reflect.DeepEqual(got, exp) {
-        t.Fatalf("got %v, expected %v", got, exp)
-    }
-}
-*/

 var (
     byteSliceResult [][]byte
     tagsSliceResult []models.Tags
tsdb/index/tsi1/index.go (14 additions, 69 deletions)

@@ -804,20 +804,11 @@ func (i *Index) compact() {
             continue
         }
 
-        // Collect files for the level.
-        files := fs.IndexFilesByLevel(level)
-
-        // Calculate total size. Skip level if it doesn't meet min size of next level.
-        var size int64
-        for _, f := range files {
-            size += f.Size()
-        }
-        if size < i.levels[level+1].MinSize {
+        // Collect contiguous files from the end of the level.
+        files := fs.LastContiguousIndexFilesByLevel(level)
+        if len(files) < 2 {
             continue
-        }
-
-        // Limit the number of files that can be merged at once.
-        if len(files) > MaxIndexMergeCount {
+        } else if len(files) > MaxIndexMergeCount {
             files = files[len(files)-MaxIndexMergeCount:]
         }

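Note: this changes when a level compacts. Previously a level was compacted only once the combined size of its files crossed the next level's MinSize; now any trailing contiguous run of at least two same-level files qualifies, trimmed to the last MaxIndexMergeCount files of the run. A sketch of the new gate, with maxIndexMergeCount standing in for the package constant (its real value is defined further down in index.go):

package main

import "fmt"

const maxIndexMergeCount = 2 // assumed value for illustration only

// gate applies the new scheduling rule to a contiguous run of files.
func gate(files []string) []string {
    if len(files) < 2 {
        return nil // a single file has nothing to merge with
    } else if len(files) > maxIndexMergeCount {
        files = files[len(files)-maxIndexMergeCount:] // keep the tail of the run
    }
    return files
}

func main() {
    fmt.Println(gate([]string{"a"}))           // []
    fmt.Println(gate([]string{"a", "b", "c"})) // [b c]
}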
@@ -1253,71 +1244,25 @@ func joinIntSlice(a []int, sep string) string {
     return strings.Join(other, sep)
 }
 
-// CompactionLevel represents a grouping of index files based on size and
-// bloom filter settings. By having the same bloom filter settings, the filters
+// CompactionLevel represents a grouping of index files based on bloom filter
+// settings. By having the same bloom filter settings, the filters
 // can be merged and evaluated at a higher level.
 type CompactionLevel struct {
-    // Minimum expected index size
-    MinSize int64 `json:"minSize,omitempty"`
-
     // Bloom filter bit size & hash count
     M uint64 `json:"m,omitempty"`
     K uint64 `json:"k,omitempty"`
 }
 
 // DefaultCompactionLevels is the default settings used by the index.
 var DefaultCompactionLevels = []CompactionLevel{
-    // Log files, no filter.
-    {M: 0, K: 0},
-
-    // Initial compaction, 4MB filter
-    {
-        MinSize: 0,
-        M:       1 << 25,
-        K:       6,
-    },
-
-    // 24MB min file, 4MB filter
-    {
-        MinSize: 24 * (1 << 20),
-        M:       1 << 25,
-        K:       6,
-    },
-
-    // 48MB min file, 8MB filter
-    {
-        MinSize: 48 * (1 << 20),
-        M:       1 << 26,
-        K:       6,
-    },
-
-    // 96MB min file, 8MB filter
-    {
-        MinSize: 96 * (1 << 20),
-        M:       1 << 27,
-        K:       6,
-    },
-
-    // 192MB min file, 33MB filter
-    {
-        MinSize: 192 * (1 << 20),
-        M:       1 << 28,
-        K:       6,
-    },
-
-    // 768MB min file, 66MB filter
-    {
-        MinSize: 768 * (1 << 20),
-        M:       1 << 29,
-        K:       6,
-    },
-
-    // 2GB min file, 134MB filter
-    {
-        MinSize: 2 * (1 << 30),
-        M:       1 << 30,
-        K:       6,
-    },
+    {M: 0, K: 0},       // L0: Log files, no filter.
+    {M: 1 << 25, K: 6}, // L1: Initial compaction
+    {M: 1 << 25, K: 6}, // L2
+    {M: 1 << 26, K: 6}, // L3
+    {M: 1 << 27, K: 6}, // L4
+    {M: 1 << 28, K: 6}, // L5
+    {M: 1 << 29, K: 6}, // L6
+    {M: 1 << 30, K: 6}, // L7
 }
 
 // MaxIndexMergeCount is the maximum number of files that can be merged together at once.
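Note: with size-based scheduling gone, MinSize and the per-level size table are dead weight, which is why the struct and defaults shrink to just the bloom filter parameters. M is the filter size in bits and K the number of hash functions; filters that share M and K can be merged by OR-ing their bit arrays, which is what lets a higher-level file absorb its inputs' filters. A quick conversion reproduces the sizes the deleted comments quoted (sketch, values taken from the table above):

package main

import "fmt"

func main() {
    // M is the bloom filter size in bits; bytes = M / 8.
    for i, m := range []uint64{1 << 25, 1 << 25, 1 << 26, 1 << 27, 1 << 28, 1 << 29, 1 << 30} {
        fmt.Printf("L%d: M=%d bits ~ %.0f MB\n", i+1, m, float64(m)/8/1e6)
    }
    // L1: M=33554432 bits ~ 4 MB ... L7: M=1073741824 bits ~ 134 MB
}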
tsdb/index/tsi1/series_block.go (2 additions, 2 deletions)

@@ -976,11 +976,11 @@ func mapIndexFileSeriesBlockFile(f *os.File) (*SeriesBlock, []byte, error) {
     if err != nil {
         return nil, nil, err
     }
-    data = data[len(FileSignature):] // Skip file signature.
+    sblk_data := data[len(FileSignature):] // Skip file signature.
 
     // Unmarshal block on top of mmap.
     var sblk SeriesBlock
-    if err := sblk.UnmarshalBinary(data); err != nil {
+    if err := sblk.UnmarshalBinary(sblk_data); err != nil {
         mmap.Unmap(data)
         return nil, nil, err
     }
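Note: the rename is the whole fix. mmap.Unmap must be handed the exact slice the mapping call returned; the old code overwrote data with a view past the file signature, so the error-path Unmap(data) described a shifted region and the mapping (and the file handle behind it) could leak. Keeping data intact and unmarshalling from a second variable avoids that. The shape of the bug and the fix, with a stand-in for the real mmap helper:

package main

import "fmt"

// unmap stands in for mmap.Unmap, which must receive the original
// mapped slice, not a re-sliced view of it.
func unmap(b []byte) error {
    fmt.Printf("unmapping %d bytes\n", len(b))
    return nil
}

func main() {
    data := make([]byte, 16) // stand-in for the mmap'd file contents
    const sigLen = 4         // stand-in for len(FileSignature)

    // Buggy shape: data = data[sigLen:] overwrites the original slice,
    // so a later unmap(data) no longer describes the full mapping.

    // Fixed shape: slice into a second variable and keep data intact.
    sblkData := data[sigLen:]
    _ = sblkData // unmarshal the series block on top of sblkData ...

    unmap(data) // cleanup always sees the original mapping
}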
