Skip to content

Commit

Permalink
Optimize SHOW MEASUREMENTS so it consults the database index directly
Browse files Browse the repository at this point in the history
SHOW MEASUREMENTS doesn't need to visit every shard in the open source
version since all of them contain the same database index.
  • Loading branch information
jsternberg committed Jul 18, 2016
1 parent 27650da commit 4121590
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 101 deletions.
226 changes: 136 additions & 90 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,100 +419,11 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
}
}

// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{
InterruptCh: ctx.InterruptCh,
NodeID: ctx.ExecutionOptions.NodeID,
}

// Replace instances of "now()" with the current time, and check the resultant times.
nowValuer := influxql.NowValuer{Now: now}
stmt.Condition = influxql.Reduce(stmt.Condition, &nowValuer)
// Replace instances of "now()" with the current time in the dimensions.
for _, d := range stmt.Dimensions {
d.Expr = influxql.Reduce(d.Expr, &nowValuer)
}

var err error
opt.MinTime, opt.MaxTime, err = influxql.TimeRange(stmt.Condition)
itrs, stmt, err := e.createIterators(stmt, ctx)
if err != nil {
return err
}

if opt.MaxTime.IsZero() {
// In the case that we're executing a meta query where the user cannot
// specify a time condition, then we expand the default max time
// to the maximum possible value, to ensure that data where all points
// are in the future are returned.
if influxql.Sources(stmt.Sources).HasSystemSource() {
opt.MaxTime = time.Unix(0, influxql.MaxTime).UTC()
} else {
opt.MaxTime = now
}
}
if opt.MinTime.IsZero() {
opt.MinTime = time.Unix(0, 0)
}

// Convert DISTINCT into a call.
stmt.RewriteDistinct()

// Remove "time" from fields list.
stmt.RewriteTimeFields()

// Create an iterator creator based on the shards in the cluster.
ic, err := e.iteratorCreator(stmt, &opt)
if err != nil {
return err
}

// Expand regex sources to their actual source names.
if stmt.Sources.HasRegex() {
sources, err := ic.ExpandSources(stmt.Sources)
if err != nil {
return err
}
stmt.Sources = sources
}

// Rewrite wildcards, if any exist.
tmp, err := stmt.RewriteFields(ic)
if err != nil {
return err
}
stmt = tmp

if e.MaxSelectBucketsN > 0 && !stmt.IsRawQuery {
interval, err := stmt.GroupByInterval()
if err != nil {
return err
}

if interval > 0 {
// Determine the start and end time matched to the interval (may not match the actual times).
min := opt.MinTime.Truncate(interval)
max := opt.MaxTime.Truncate(interval).Add(interval)

// Determine the number of buckets by finding the time span and dividing by the interval.
buckets := int64(max.Sub(min)) / int64(interval)
if int(buckets) > e.MaxSelectBucketsN {
return fmt.Errorf("max select bucket count exceeded: %d buckets", buckets)
}
}
}

// Create a set of iterators from a selection.
itrs, err := influxql.Select(stmt, ic, &opt)
if err != nil {
return err
}

if e.MaxSelectPointN > 0 {
monitor := influxql.PointLimitMonitor(itrs, influxql.DefaultStatsInterval, e.MaxSelectPointN)
ctx.Query.Monitor(monitor)
}

// Generate a row emitter from the iterator set.
em := influxql.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
em.Columns = stmt.ColumnNames()
Expand Down Expand Up @@ -606,6 +517,141 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
return nil
}

// createMeasurementsIterator builds the iterator set for a statement that reads
// from the "_measurements" system source, using the database index directly.
//
// The statement's condition, limit and offset are copied into the iterator
// options. The returned slice always holds exactly one iterator: the "name"
// string column pulled from an auxiliary iterator wrapped around the
// measurement iterator.
func (e *StatementExecutor) createMeasurementsIterator(stmt *influxql.SelectStatement, index *tsdb.DatabaseIndex) ([]influxql.Iterator, error) {
	nameRef := influxql.VarRef{Val: "name", Type: influxql.String}
	iterOpt := influxql.IteratorOptions{
		Condition: stmt.Condition,
		Aux:       []influxql.VarRef{nameRef},
		Limit:     stmt.Limit,
		Offset:    stmt.Offset,
	}

	measurements, err := tsdb.NewMeasurementIterator(index, iterOpt)
	if err != nil {
		return nil, err
	}

	// Only wrap the source in a limit iterator when a limit or offset was requested.
	var source influxql.Iterator = measurements
	if iterOpt.Limit > 0 || iterOpt.Offset > 0 {
		source = influxql.NewLimitIterator(source, iterOpt)
	}

	// The "name" iterator must be requested from the aux iterator before
	// Background is called on it.
	aux := influxql.NewAuxIterator(source, iterOpt)
	names := aux.Iterator(nameRef.Val, nameRef.Type)
	aux.Background()

	return []influxql.Iterator{names}, nil
}

// createIterators prepares stmt for execution and builds the iterators that
// produce its results.
//
// It returns the iterators together with the statement that should be used for
// emitting results; the statement may have been replaced by RewriteFields and
// is also mutated in place (condition, dimensions, sources). On error the
// iterators are nil and the most recent statement is returned alongside the
// error.
//
// A statement whose first source is the "_measurements" measurement is served
// directly from the database index when e.TSDBStore implements LocalTSDBStore.
// That path returns before any of the rewriting, bucket-limit and point-limit
// handling below. If the database has no index, it returns nil iterators and a
// nil error.
func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) ([]influxql.Iterator, *influxql.SelectStatement, error) {
	// Handle SHOW MEASUREMENTS at the database level instead of delegating it to the shards.
	// The length check keeps a statement without sources from panicking on the
	// index expression; such a statement falls through to the general path.
	if len(stmt.Sources) > 0 {
		if source, ok := stmt.Sources[0].(*influxql.Measurement); ok && source.Name == "_measurements" {
			// Use the optimized version only if we have direct access to the database.
			if store, ok := e.TSDBStore.(LocalTSDBStore); ok {
				index := store.DatabaseIndex(source.Database)
				if index == nil {
					return nil, stmt, nil
				}
				itrs, err := e.createMeasurementsIterator(stmt, index)
				return itrs, stmt, err
			}
		}
	}

	// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
	now := time.Now().UTC()
	opt := influxql.SelectOptions{
		InterruptCh: ctx.InterruptCh,
		NodeID:      ctx.ExecutionOptions.NodeID,
	}

	// Replace instances of "now()" with the current time, and check the resultant times.
	nowValuer := influxql.NowValuer{Now: now}
	stmt.Condition = influxql.Reduce(stmt.Condition, &nowValuer)
	// Replace instances of "now()" with the current time in the dimensions.
	for _, d := range stmt.Dimensions {
		d.Expr = influxql.Reduce(d.Expr, &nowValuer)
	}

	var err error
	opt.MinTime, opt.MaxTime, err = influxql.TimeRange(stmt.Condition)
	if err != nil {
		return nil, stmt, err
	}

	if opt.MaxTime.IsZero() {
		// In the case that we're executing a meta query where the user cannot
		// specify a time condition, then we expand the default max time
		// to the maximum possible value, to ensure that data where all points
		// are in the future are returned.
		if influxql.Sources(stmt.Sources).HasSystemSource() {
			opt.MaxTime = time.Unix(0, influxql.MaxTime).UTC()
		} else {
			opt.MaxTime = now
		}
	}
	if opt.MinTime.IsZero() {
		opt.MinTime = time.Unix(0, 0)
	}

	// Convert DISTINCT into a call.
	stmt.RewriteDistinct()

	// Remove "time" from fields list.
	stmt.RewriteTimeFields()

	// Create an iterator creator based on the shards in the cluster.
	ic, err := e.iteratorCreator(stmt, &opt)
	if err != nil {
		return nil, stmt, err
	}

	// Expand regex sources to their actual source names.
	if stmt.Sources.HasRegex() {
		sources, err := ic.ExpandSources(stmt.Sources)
		if err != nil {
			return nil, stmt, err
		}
		stmt.Sources = sources
	}

	// Rewrite wildcards, if any exist.
	tmp, err := stmt.RewriteFields(ic)
	if err != nil {
		return nil, stmt, err
	}
	stmt = tmp

	if e.MaxSelectBucketsN > 0 && !stmt.IsRawQuery {
		interval, err := stmt.GroupByInterval()
		if err != nil {
			return nil, stmt, err
		}

		if interval > 0 {
			// Determine the start and end time matched to the interval (may not match the actual times).
			min := opt.MinTime.Truncate(interval)
			max := opt.MaxTime.Truncate(interval).Add(interval)

			// Determine the number of buckets by finding the time span and dividing by the interval.
			buckets := int64(max.Sub(min)) / int64(interval)
			// Compare in int64: narrowing buckets to int could truncate on
			// 32-bit platforms and let an oversized query through.
			if buckets > int64(e.MaxSelectBucketsN) {
				return nil, stmt, fmt.Errorf("max select bucket count exceeded: %d buckets", buckets)
			}
		}
	}

	// Create a set of iterators from a selection.
	itrs, err := influxql.Select(stmt, ic, &opt)
	if err != nil {
		return nil, stmt, err
	}

	// Attach a monitor that enforces the configured point limit, if any.
	if e.MaxSelectPointN > 0 {
		monitor := influxql.PointLimitMonitor(itrs, influxql.DefaultStatsInterval, e.MaxSelectPointN)
		ctx.Query.Monitor(monitor)
	}
	return itrs, stmt, nil
}

// iteratorCreator returns a new instance of IteratorCreator based on stmt.
func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
// Retrieve a list of shard IDs.
Expand Down
16 changes: 5 additions & 11 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
case "_fieldKeys":
return NewFieldKeysIterator(s, opt)
case "_measurements":
return NewMeasurementIterator(s, opt)
return NewMeasurementIterator(s.index, opt)
case "_series":
return NewSeriesIterator(s, opt)
case "_tagKeys":
Expand Down Expand Up @@ -900,24 +900,18 @@ func (itr *fieldKeysIterator) Next() (*influxql.FloatPoint, error) {

// MeasurementIterator represents a string iterator that emits all measurement names in a shard.
type MeasurementIterator struct {
mms Measurements
source *influxql.Measurement
mms Measurements
}

// NewMeasurementIterator returns a new instance of MeasurementIterator.
func NewMeasurementIterator(sh *Shard, opt influxql.IteratorOptions) (*MeasurementIterator, error) {
func NewMeasurementIterator(dbi *DatabaseIndex, opt influxql.IteratorOptions) (*MeasurementIterator, error) {
itr := &MeasurementIterator{}

// Extract source.
if len(opt.Sources) > 0 {
itr.source, _ = opt.Sources[0].(*influxql.Measurement)
}

// Retrieve measurements from shard. Filter if condition specified.
if opt.Condition == nil {
itr.mms = sh.index.Measurements()
itr.mms = dbi.Measurements()
} else {
mms, _, err := sh.index.measurementsByExpr(opt.Condition)
mms, _, err := dbi.measurementsByExpr(opt.Condition)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 4121590

Please sign in to comment.