Skip to content

Commit

Permalink
Merge pull request influxdata#8645 from influxdata/jw-cursors
Browse files Browse the repository at this point in the history
Reduce memory usage when running queries
  • Loading branch information
jwilder authored Jul 28, 2017
2 parents fcc5184 + 3d12c62 commit de666db
Show file tree
Hide file tree
Showing 28 changed files with 608 additions and 594 deletions.
4 changes: 2 additions & 2 deletions cmd/influx_inspect/dumptsm/dumptsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (cmd *Command) dump() error {
var pos int
for i := 0; i < keyCount; i++ {
key, _ := r.KeyAt(i)
for _, e := range r.Entries(string(key)) {
for _, e := range r.Entries(key) {
pos++
split := strings.Split(string(key), "#!~#")

Expand Down Expand Up @@ -146,7 +146,7 @@ func (cmd *Command) dump() error {
// Start at the beginning and read every block
for j := 0; j < keyCount; j++ {
key, _ := r.KeyAt(j)
for _, e := range r.Entries(string(key)) {
for _, e := range r.Entries(key) {

f.Seek(int64(e.Offset), 0)
f.Read(b[:4])
Expand Down
2 changes: 1 addition & 1 deletion cmd/influx_inspect/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (cmd *Command) exportTSMFile(tsmFilePath string, w io.Writer) error {

for i := 0; i < r.KeyCount(); i++ {
key, _ := r.KeyAt(i)
values, err := r.ReadAll(string(key))
values, err := r.ReadAll(key)
if err != nil {
fmt.Fprintf(cmd.Stderr, "unable to read key %q in %s, skipping: %s\n", string(key), tsmFilePath, err.Error())
continue
Expand Down
2 changes: 1 addition & 1 deletion cmd/influx_inspect/export/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func writeCorpusToTSMFile(c corpus) *os.File {
}
sort.Strings(keys)
for _, k := range keys {
if err := w.Write(k, c[k]); err != nil {
if err := w.Write([]byte(k), c[k]); err != nil {
panic(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/influx_tsm/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *Converter) Process(iter KeyIterator) error {
}
keyCount = map[string]int{}
}
if err := w.Write(k, v); err != nil {
if err := w.Write([]byte(k), v); err != nil {
return err
}
keyCount[k]++
Expand Down
10 changes: 5 additions & 5 deletions influxql/point.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ type FloatPoint struct {
Tags Tags

Time int64
Nil bool
Value float64
Aux []interface{}

// Total number of points that were combined into this point from an aggregate.
// If this is zero, the point is not the result of an aggregate function.
Aggregated uint32
Nil bool
}

func (v *FloatPoint) name() string { return v.Name }
Expand Down Expand Up @@ -233,13 +233,13 @@ type IntegerPoint struct {
Tags Tags

Time int64
Nil bool
Value int64
Aux []interface{}

// Total number of points that were combined into this point from an aggregate.
// If this is zero, the point is not the result of an aggregate function.
Aggregated uint32
Nil bool
}

func (v *IntegerPoint) name() string { return v.Name }
Expand Down Expand Up @@ -444,13 +444,13 @@ type UnsignedPoint struct {
Tags Tags

Time int64
Nil bool
Value uint64
Aux []interface{}

// Total number of points that were combined into this point from an aggregate.
// If this is zero, the point is not the result of an aggregate function.
Aggregated uint32
Nil bool
}

func (v *UnsignedPoint) name() string { return v.Name }
Expand Down Expand Up @@ -653,13 +653,13 @@ type StringPoint struct {
Tags Tags

Time int64
Nil bool
Value string
Aux []interface{}

// Total number of points that were combined into this point from an aggregate.
// If this is zero, the point is not the result of an aggregate function.
Aggregated uint32
Nil bool
}

func (v *StringPoint) name() string { return v.Name }
Expand Down Expand Up @@ -864,13 +864,13 @@ type BooleanPoint struct {
Tags Tags

Time int64
Nil bool
Value bool
Aux []interface{}

// Total number of points that were combined into this point from an aggregate.
// If this is zero, the point is not the result of an aggregate function.
Aggregated uint32
Nil bool
}

func (v *BooleanPoint) name() string { return v.Name }
Expand Down
2 changes: 1 addition & 1 deletion influxql/point.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ type {{.Name}}Point struct {
Tags Tags

Time int64
Nil bool
Value {{.Type}}
Aux []interface{}

// Total number of points that were combined into this point from an aggregate.
// If this is zero, the point is not the result of an aggregate function.
Aggregated uint32
Nil bool
}

func (v *{{.Name}}Point) name() string { return v.Name }
Expand Down
34 changes: 17 additions & 17 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ const (

// storer is the interface that descibes a cache's store.
type storer interface {
entry(key string) (*entry, bool) // Get an entry by its key.
write(key string, values Values) error // Write an entry to the store.
add(key string, entry *entry) // Add a new entry to the store.
remove(key string) // Remove an entry from the store.
keys(sorted bool) []string // Return an optionally sorted slice of entry keys.
apply(f func(string, *entry) error) error // Apply f to all entries in the store in parallel.
applySerial(f func(string, *entry) error) error // Apply f to all entries in serial.
entry(key []byte) (*entry, bool) // Get an entry by its key.
write(key []byte, values Values) error // Write an entry to the store.
add(key []byte, entry *entry) // Add a new entry to the store.
remove(key []byte) // Remove an entry from the store.
keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys.
apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel.
applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
reset() // Reset the store to an initial unused state.
}

Expand Down Expand Up @@ -255,7 +255,7 @@ func (c *Cache) Statistics(tags map[string]string) []models.Statistic {

// Write writes the set of values for the key to the cache. This function is goroutine-safe.
// It returns an error if the cache will exceed its max size by adding the new values.
func (c *Cache) Write(key string, values []Value) error {
func (c *Cache) Write(key []byte, values []Value) error {
addedSize := uint64(Values(values).Size())

// Enough room in the cache?
Expand Down Expand Up @@ -307,7 +307,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
// We'll optimistially set size here, and then decrement it for write errors.
c.increaseSize(addedSize)
for k, v := range values {
if err := store.write(k, v); err != nil {
if err := store.write([]byte(k), v); err != nil {
// The write failed, hold onto the error and adjust the size delta.
werr = err
addedSize -= uint64(Values(v).Size())
Expand Down Expand Up @@ -388,7 +388,7 @@ func (c *Cache) Deduplicate() {

// Apply a function that simply calls deduplicate on each entry in the ring.
// apply cannot return an error in this invocation.
_ = store.apply(func(_ string, e *entry) error { e.deduplicate(); return nil })
_ = store.apply(func(_ []byte, e *entry) error { e.deduplicate(); return nil })
}

// ClearSnapshot removes the snapshot cache from the list of flushing caches and
Expand Down Expand Up @@ -436,7 +436,7 @@ func (c *Cache) MaxSize() uint64 {
}

// Keys returns a sorted slice of all keys under management by the cache.
func (c *Cache) Keys() []string {
func (c *Cache) Keys() [][]byte {
c.mu.RLock()
store := c.store
c.mu.RUnlock()
Expand All @@ -445,15 +445,15 @@ func (c *Cache) Keys() []string {

// unsortedKeys returns a slice of all keys under management by the cache. The
// keys are not sorted.
func (c *Cache) unsortedKeys() []string {
func (c *Cache) unsortedKeys() [][]byte {
c.mu.RLock()
store := c.store
c.mu.RUnlock()
return store.keys(false)
}

// Values returns a copy of all values, deduped and sorted, for the given key.
func (c *Cache) Values(key string) Values {
func (c *Cache) Values(key []byte) Values {
var snapshotEntries *entry

c.mu.RLock()
Expand Down Expand Up @@ -510,15 +510,15 @@ func (c *Cache) Values(key string) Values {
}

// Delete removes all values for the given keys from the cache.
func (c *Cache) Delete(keys []string) {
func (c *Cache) Delete(keys [][]byte) {
c.DeleteRange(keys, math.MinInt64, math.MaxInt64)
}

// DeleteRange removes the values for all keys containing points
// with timestamps between between min and max from the cache.
//
// TODO(edd): Lock usage could possibly be optimised if necessary.
func (c *Cache) DeleteRange(keys []string, min, max int64) {
func (c *Cache) DeleteRange(keys [][]byte, min, max int64) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -558,7 +558,7 @@ func (c *Cache) SetMaxSize(size uint64) {
// values returns the values for the key. It assumes the data is already sorted.
// It doesn't lock the cache but it does read-lock the entry if there is one for the key.
// values should only be used in compact.go in the CacheKeyIterator.
func (c *Cache) values(key string) Values {
func (c *Cache) values(key []byte) Values {
e, _ := c.store.entry(key)
if e == nil {
return nil
Expand All @@ -572,7 +572,7 @@ func (c *Cache) values(key string) Values {
// ApplyEntryFn applies the function f to each entry in the Cache.
// ApplyEntryFn calls f on each entry in turn, within the same goroutine.
// It is safe for use by multiple goroutines.
func (c *Cache) ApplyEntryFn(f func(key string, entry *entry) error) error {
func (c *Cache) ApplyEntryFn(f func(key []byte, entry *entry) error) error {
c.mu.RLock()
store := c.store
c.mu.RUnlock()
Expand Down
22 changes: 11 additions & 11 deletions tsdb/engine/tsm1/cache_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) {
values := make(tsm1.Values, 1000)
timestamps := make([]int64, len(values))
series := make([]string, 100)
series := make([][]byte, 100)
for i := range timestamps {
timestamps[i] = int64(rand.Int63n(int64(len(values))))
}
Expand All @@ -22,7 +22,7 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) {
}

for i := range series {
series[i] = fmt.Sprintf("series%d", i)
series[i] = []byte(fmt.Sprintf("series%d", i))
}

wg := sync.WaitGroup{}
Expand All @@ -34,17 +34,17 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) {
c.Write(s, tsm1.Values{v})
}
wg.Add(3)
go func(s string) {
go func(s []byte) {
defer wg.Done()
<-ch
c.Values(s)
}(s)
go func(s string) {
go func(s []byte) {
defer wg.Done()
<-ch
c.Values(s)
}(s)
go func(s string) {
go func(s []byte) {
defer wg.Done()
<-ch
c.Values(s)
Expand All @@ -57,7 +57,7 @@ func TestCacheCheckConcurrentReadsAreSafe(t *testing.T) {
func TestCacheRace(t *testing.T) {
values := make(tsm1.Values, 1000)
timestamps := make([]int64, len(values))
series := make([]string, 100)
series := make([][]byte, 100)
for i := range timestamps {
timestamps[i] = int64(rand.Int63n(int64(len(values))))
}
Expand All @@ -67,7 +67,7 @@ func TestCacheRace(t *testing.T) {
}

for i := range series {
series[i] = fmt.Sprintf("series%d", i)
series[i] = []byte(fmt.Sprintf("series%d", i))
}

wg := sync.WaitGroup{}
Expand All @@ -79,7 +79,7 @@ func TestCacheRace(t *testing.T) {
c.Write(s, tsm1.Values{v})
}
wg.Add(1)
go func(s string) {
go func(s []byte) {
defer wg.Done()
<-ch
c.Values(s)
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestCacheRace(t *testing.T) {
func TestCacheRace2Compacters(t *testing.T) {
values := make(tsm1.Values, 1000)
timestamps := make([]int64, len(values))
series := make([]string, 100)
series := make([][]byte, 100)
for i := range timestamps {
timestamps[i] = int64(rand.Int63n(int64(len(values))))
}
Expand All @@ -132,7 +132,7 @@ func TestCacheRace2Compacters(t *testing.T) {
}

for i := range series {
series[i] = fmt.Sprintf("series%d", i)
series[i] = []byte(fmt.Sprintf("series%d", i))
}

wg := sync.WaitGroup{}
Expand All @@ -144,7 +144,7 @@ func TestCacheRace2Compacters(t *testing.T) {
c.Write(s, tsm1.Values{v})
}
wg.Add(1)
go func(s string) {
go func(s []byte) {
defer wg.Done()
<-ch
c.Values(s)
Expand Down
Loading

0 comments on commit de666db

Please sign in to comment.