Skip to content

Commit

Permalink
Batch series being created
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard committed Jul 9, 2018
1 parent 32ed165 commit 4a90d14
Showing 1 changed file with 57 additions and 7 deletions.
64 changes: 57 additions & 7 deletions cmd/influx_inspect/buildtsi/buildtsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.uber.org/zap"
)

const defaultBatchSize = 10000

// Command represents the program execution for "influx_inspect buildtsi".
type Command struct {
Stderr io.Writer
Expand All @@ -34,14 +36,16 @@ type Command struct {
retentionFilter string
shardFilter string
maxLogFileSize int64
batchSize int
}

// NewCommand returns a new instance of Command.
func NewCommand() *Command {
return &Command{
Stderr: os.Stderr,
Stdout: os.Stdout,
Logger: zap.NewNop(),
Stderr: os.Stderr,
Stdout: os.Stdout,
Logger: zap.NewNop(),
batchSize: defaultBatchSize,
}
}

Expand All @@ -54,6 +58,7 @@ func (cmd *Command) Run(args ...string) error {
fs.StringVar(&cmd.retentionFilter, "retention", "", "optional: retention policy")
fs.StringVar(&cmd.shardFilter, "shard", "", "optional: shard id")
fs.Int64Var(&cmd.maxLogFileSize, "max-log-file-size", tsdb.DefaultMaxIndexLogFileSize, "optional: maximum log file size")
fs.IntVar(&cmd.batchSize, "batch-size", defaultBatchSize, "optional: set the size of the batches we write to the index. Setting this can have adverse affects on performance and heap requirements")
fs.BoolVar(&cmd.Verbose, "v", false, "verbose")
fs.SetOutput(cmd.Stdout)
if err := fs.Parse(args); err != nil {
Expand Down Expand Up @@ -264,6 +269,10 @@ func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string,
}

cmd.Logger.Info("Iterating over cache")
keysBatch := make([][]byte, 0, cmd.batchSize)
namesBatch := make([][]byte, 0, cmd.batchSize)
tagsBatch := make([]models.Tags, 0, cmd.batchSize)

for _, key := range cache.Keys() {
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
name, tags := models.ParseKey(seriesKey)
Expand All @@ -272,9 +281,29 @@ func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string,
cmd.Logger.Info("Series", zap.String("name", string(name)), zap.String("tags", tags.String()))
}

if err := tsiIndex.CreateSeriesIfNotExists(seriesKey, []byte(name), tags); err != nil {
return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err)
keysBatch = append(keysBatch, seriesKey)
namesBatch = append(namesBatch, []byte(name))
tagsBatch = append(tagsBatch, tags)

// Flush batch?
if len(keysBatch) == cmd.batchSize {
if err := tsiIndex.CreateSeriesListIfNotExists(keysBatch, namesBatch, tagsBatch); err != nil {
return fmt.Errorf("problem creating series: (%s)", err)
}
keysBatch = keysBatch[:0]
namesBatch = namesBatch[:0]
tagsBatch = tagsBatch[:0]
}
}

// Flush any remaining series in the batches
if len(keysBatch) > 0 {
if err := tsiIndex.CreateSeriesListIfNotExists(keysBatch, namesBatch, tagsBatch); err != nil {
return fmt.Errorf("problem creating series: (%s)", err)
}
keysBatch = nil
namesBatch = nil
tagsBatch = nil
}

// Attempt to compact the index & wait for all compactions to complete.
Expand Down Expand Up @@ -307,6 +336,10 @@ func (cmd *Command) processTSMFile(index *tsi1.Index, path string) error {
}
defer r.Close()

keysBatch := make([][]byte, 0, cmd.batchSize)
namesBatch := make([][]byte, 0, cmd.batchSize)
tagsBatch := make([]models.Tags, 0, cmd.batchSize)

for i := 0; i < r.KeyCount(); i++ {
key, _ := r.KeyAt(i)
seriesKey, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
Expand All @@ -316,8 +349,25 @@ func (cmd *Command) processTSMFile(index *tsi1.Index, path string) error {
cmd.Logger.Info("Series", zap.String("name", string(name)), zap.String("tags", tags.String()))
}

if err := index.CreateSeriesIfNotExists(seriesKey, []byte(name), tags); err != nil {
return fmt.Errorf("cannot create series: %s %s (%s)", name, tags.String(), err)
keysBatch = append(keysBatch, seriesKey)
namesBatch = append(namesBatch, []byte(name))
tagsBatch = append(tagsBatch, tags)

// Flush batch?
if len(keysBatch) == cmd.batchSize {
if err := index.CreateSeriesListIfNotExists(keysBatch, namesBatch, tagsBatch); err != nil {
return fmt.Errorf("problem creating series: (%s)", err)
}
keysBatch = keysBatch[:0]
namesBatch = namesBatch[:0]
tagsBatch = tagsBatch[:0]
}
}

// Flush any remaining series in the batches
if len(keysBatch) > 0 {
if err := index.CreateSeriesListIfNotExists(keysBatch, namesBatch, tagsBatch); err != nil {
return fmt.Errorf("problem creating series: (%s)", err)
}
}
return nil
Expand Down

0 comments on commit 4a90d14

Please sign in to comment.