Skip to content

Commit

Permalink
fix the distinct and derivative tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid committed Nov 5, 2013
1 parent 6a89bda commit fcb3be6
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 34 deletions.
89 changes: 89 additions & 0 deletions src/engine/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var registeredAggregators = make(map[string]AggregatorInitializer)

func init() {
registeredAggregators["count"] = NewCountAggregator
registeredAggregators["derivative"] = NewDerivativeAggregator
registeredAggregators["max"] = NewMaxAggregator
registeredAggregators["min"] = NewMinAggregator
registeredAggregators["sum"] = NewSumAggregator
Expand Down Expand Up @@ -70,6 +71,94 @@ func NewCompositeAggregator(left, right Aggregator) (Aggregator, error) {
return &CompositeAggregator{left, right}, nil
}

//
// Derivative Aggregator
//

type DerivativeAggregator struct {
fieldIndex int
fieldName string
lastValues map[string]map[interface{}]*protocol.Point
points map[string]map[interface{}][]*protocol.FieldValue
}

func (self *DerivativeAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error {
lastValues := self.lastValues[series]
if lastValues == nil {
lastValues = make(map[interface{}]*protocol.Point)
self.lastValues[series] = lastValues
}

var value float64
if ptr := p.Values[self.fieldIndex].Int64Value; ptr != nil {
value = float64(*ptr)
} else if ptr := p.Values[self.fieldIndex].DoubleValue; ptr != nil {
value = *ptr
} else {
// else ignore this point
return nil
}

newValue := &protocol.Point{
Timestamp: p.Timestamp,
SequenceNumber: p.SequenceNumber,
Values: []*protocol.FieldValue{&protocol.FieldValue{DoubleValue: &value}},
}

var oldValue *protocol.Point
oldValue, lastValues[group] = lastValues[group], newValue
if oldValue == nil {
return nil
}

// if an old value exist, then compute the derivative and insert it in the points slice
deltaT := float64(*newValue.Timestamp-*oldValue.Timestamp) / float64(time.Second/time.Microsecond)
deltaV := *newValue.Values[self.fieldIndex].DoubleValue - *oldValue.Values[self.fieldIndex].DoubleValue
derivative := deltaV / deltaT
points := self.points[series]
if points == nil {
points = make(map[interface{}][]*protocol.FieldValue)
self.points[series] = points
}
points[group] = append(points[group], &protocol.FieldValue{DoubleValue: &derivative})
return nil
}

func (self *DerivativeAggregator) ColumnName() string {
return "derivative"
}

func (self *DerivativeAggregator) GetValue(series string, group interface{}) []*protocol.FieldValue {
return self.points[series][group]
}

func (self *DerivativeAggregator) InitializeFieldsMetadata(series *protocol.Series) error {
for idx, field := range series.Fields {
if field == self.fieldName {
self.fieldIndex = idx
return nil
}
}

return common.NewQueryError(common.InvalidArgument, fmt.Sprintf("Unknown column name %s", self.fieldName))
}

func NewDerivativeAggregator(q *parser.Query, v *parser.Value) (Aggregator, error) {
if len(v.Elems) != 1 {
return nil, common.NewQueryError(common.WrongNumberOfArguments, "function derivative() requires exactly one argument")
}

if v.Elems[0].Type == parser.ValueWildcard {
return nil, common.NewQueryError(common.InvalidArgument, "function derivative() doesn't work with wildcards")
}

return &DerivativeAggregator{
fieldName: v.Elems[0].Name,
lastValues: make(map[string]map[interface{}]*protocol.Point),
points: make(map[string]map[interface{}][]*protocol.FieldValue),
}, nil
}

//
// Count Aggregator
//
Expand Down
81 changes: 47 additions & 34 deletions src/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,21 @@ func createValuesToInterface(groupBy parser.GroupByClause, fields []string) (Map
}
}

func crossProduct(values [][]*protocol.FieldValue) [][]*protocol.FieldValue {
if len(values) == 0 {
return [][]*protocol.FieldValue{[]*protocol.FieldValue{}}
}

_returnedValues := crossProduct(values[:len(values)-1])
returnValues := [][]*protocol.FieldValue{}
for _, v := range values[len(values)-1] {
for _, values := range _returnedValues {
returnValues = append(returnValues, append(values, v))
}
}
return returnValues
}

func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database string, query *parser.Query,
yield func(*protocol.Series) error) error {
duration, err := query.GetGroupByClause().GetGroupByTime()
Expand Down Expand Up @@ -279,48 +294,46 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(user common.User, database
points := []*protocol.Point{}
for groupId, _ := range tableGroups {
timestamp := *timestampAggregator.GetValue(table, groupId)[0].Int64Value
/* groupPoints := []*protocol.Point{} */
point := &protocol.Point{
SequenceNumber: &sequenceNumber,
Values: []*protocol.FieldValue{},
}
point.SetTimestampInMicroseconds(timestamp)
values := [][]*protocol.FieldValue{}

for _, aggregator := range aggregators {
// point.Values = append(point.Values, aggregator.GetValue(table, groupId)[0])
returnValues := aggregator.GetValue(table, groupId)
returnDepth := len(returnValues)
for _, value := range returnValues {
if returnDepth > 1 {
// do some crazy shit
} else {
point.Values = append(point.Values, value)
}
}
values = append(values, aggregator.GetValue(table, groupId))
}

// FIXME: this should be looking at the fields slice not the group by clause
// FIXME: we should check whether the selected columns are in the group by clause
for idx, _ := range groupBy {
if duration != nil && idx == 0 {
continue
// do cross product of all the values
values = crossProduct(values)

for _, v := range values {
/* groupPoints := []*protocol.Point{} */
point := &protocol.Point{
SequenceNumber: &sequenceNumber,
Values: v,
}
point.SetTimestampInMicroseconds(timestamp)

// FIXME: this should be looking at the fields slice not the group by clause
// FIXME: we should check whether the selected columns are in the group by clause
for idx, _ := range groupBy {
if duration != nil && idx == 0 {
continue
}

value := inverse(groupId, idx)

switch x := value.(type) {
case string:
point.Values = append(point.Values, &protocol.FieldValue{StringValue: &x})
case bool:
point.Values = append(point.Values, &protocol.FieldValue{BoolValue: &x})
case float64:
point.Values = append(point.Values, &protocol.FieldValue{DoubleValue: &x})
case int64:
point.Values = append(point.Values, &protocol.FieldValue{Int64Value: &x})
value := inverse(groupId, idx)

switch x := value.(type) {
case string:
point.Values = append(point.Values, &protocol.FieldValue{StringValue: &x})
case bool:
point.Values = append(point.Values, &protocol.FieldValue{BoolValue: &x})
case float64:
point.Values = append(point.Values, &protocol.FieldValue{DoubleValue: &x})
case int64:
point.Values = append(point.Values, &protocol.FieldValue{Int64Value: &x})
}
}
}

points = append(points, point)
points = append(points, point)
}
}
expectedData := &protocol.Series{
Name: &tempTable,
Expand Down

0 comments on commit fcb3be6

Please sign in to comment.