Skip to content

Commit

Permalink
refactor: last cursor optimization function name (influxdata#21353)
Browse files Browse the repository at this point in the history
  • Loading branch information
lesam authored May 13, 2021
1 parent 4becb6d commit d89b9cb
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
13 changes: 7 additions & 6 deletions storage/reads/aggregate_resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ type windowAggregateResultSet struct {
err error
}

// IsAscendingWindowAggregate checks two things: If the request passed in
// IsLastDescendingAggregateOptimization checks two things: If the request passed in
// is using the `last` aggregate type, and if it doesn't have a window. If both
// conditions are met, it returns false, otherwise, it returns true.
func IsAscendingWindowAggregate(req *datatypes.ReadWindowAggregateRequest) bool {
func IsLastDescendingAggregateOptimization(req *datatypes.ReadWindowAggregateRequest) bool {
if len(req.Aggregate) != 1 {
// Descending optimization for last only applies when it is the only aggregate.
return false
}

Expand All @@ -38,13 +39,13 @@ func IsAscendingWindowAggregate(req *datatypes.ReadWindowAggregateRequest) bool
if req.Aggregate[0].Type == datatypes.AggregateTypeLast {
if req.Window == nil {
if req.WindowEvery == 0 || req.WindowEvery == math.MaxInt64 {
return false
return true
}
} else if (req.Window.Every.Nsecs == 0 && req.Window.Every.Months == 0) || req.Window.Every.Nsecs == math.MaxInt64 {
return false
return true
}
}
return true
return false
}

func NewWindowAggregateResultSet(ctx context.Context, req *datatypes.ReadWindowAggregateRequest, cursor SeriesCursor) (ResultSet, error) {
Expand All @@ -60,7 +61,7 @@ func NewWindowAggregateResultSet(ctx context.Context, req *datatypes.ReadWindowA
return nil, errors.Errorf(errors.InternalError, "attempt to create a windowAggregateResultSet with %v aggregate functions", nAggs)
}

ascending := IsAscendingWindowAggregate(req)
ascending := !IsLastDescendingAggregateOptimization(req)
results := &windowAggregateResultSet{
ctx: ctx,
req: req,
Expand Down
8 changes: 4 additions & 4 deletions storage/reads/group_resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func GroupOptionNilSortLo() GroupOption {
}
}

// IsAscendingGroupAggregate checks if this request is using the `last` aggregate type.
// IsLastDescendingGroupOptimization checks if this request is using the `last` aggregate type.
// It returns true if an ascending cursor should be used (all other conditions)
// or a descending cursor (when `last` is used).
func IsAscendingGroupAggregate(req *datatypes.ReadGroupRequest) bool {
return req.Aggregate == nil || req.Aggregate.Type != datatypes.AggregateTypeLast
func IsLastDescendingGroupOptimization(req *datatypes.ReadGroupRequest) bool {
return req.Aggregate != nil && req.Aggregate.Type == datatypes.AggregateTypeLast
}

func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet {
Expand All @@ -61,7 +61,7 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new
o(g)
}

ascending := IsAscendingGroupAggregate(req)
ascending := !IsLastDescendingGroupOptimization(req)
g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, ascending)

for i, k := range req.GroupKeys {
Expand Down
4 changes: 2 additions & 2 deletions v1/services/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAg
// Due to some optimizations around how flux's `last()` function is implemented with the
// storage engine, we need to detect if the read request requires a descending
// cursor or not.
descending := !reads.IsAscendingWindowAggregate(req)
descending := reads.IsLastDescendingAggregateOptimization(req)
shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
if err != nil {
return nil, err
Expand Down Expand Up @@ -212,7 +212,7 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest)
// Due to some optimizations around how flux's `last()` function is implemented with the
// storage engine, we need to detect if the read request requires a descending
// cursor or not.
descending := !reads.IsAscendingGroupAggregate(req)
descending := reads.IsLastDescendingGroupOptimization(req)
shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
if err != nil {
return nil, err
Expand Down

0 comments on commit d89b9cb

Please sign in to comment.