Skip to content

Commit

Permalink
Merge pull request FeatureBaseDB#778 from benbjohnson/621-aggregate-f…
Browse files Browse the repository at this point in the history
…ield-queries

Sum() and Average() field queries.
  • Loading branch information
benbjohnson authored Aug 22, 2017
2 parents 2fea07f + 562fba9 commit 5bea524
Show file tree
Hide file tree
Showing 9 changed files with 674 additions and 101 deletions.
142 changes: 141 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (e *Executor) executeCall(ctx context.Context, index string, c *pql.Call, s
indexTag := fmt.Sprintf("index:%s", index)
// Special handling for mutation and top-n calls.
switch c.Name {
case "Average":
e.Holder.Stats.CountWithCustomTags(c.Name, 1, 1.0, []string{indexTag})
return e.executeAverage(ctx, index, c, slices, opt)
case "ClearBit":
return e.executeClearBit(ctx, index, c, opt)
case "Count":
Expand All @@ -174,6 +177,9 @@ func (e *Executor) executeCall(ctx context.Context, index string, c *pql.Call, s
return nil, e.executeSetRowAttrs(ctx, index, c, opt)
case "SetColumnAttrs":
return nil, e.executeSetColumnAttrs(ctx, index, c, opt)
case "Sum":
e.Holder.Stats.CountWithCustomTags(c.Name, 1, 1.0, []string{indexTag})
return e.executeSum(ctx, index, c, slices, opt)
case "TopN":
e.Holder.Stats.CountWithCustomTags(c.Name, 1, 1.0, []string{indexTag})
return e.executeTopN(ctx, index, c, slices, opt)
Expand Down Expand Up @@ -202,6 +208,41 @@ func (e *Executor) validateCallArgs(c *pql.Call) error {
return nil
}

// executeAverage executes an average() call.
func (e *Executor) executeAverage(ctx context.Context, index string, c *pql.Call, slices []uint64, opt *ExecOptions) (int64, error) {
if frame, _ := c.Args["frame"]; frame == "" {
return 0, errors.New("Average(): frame required")
} else if field, _ := c.Args["field"]; field == "" {
return 0, errors.New("Average(): field required")
}

if len(c.Children) > 1 {
return 0, errors.New("Average() only accepts a single bitmap input")
}

// Execute calls in bulk on each remote node and merge.
mapFn := func(slice uint64) (interface{}, error) {
return e.executeSumCountSlice(ctx, index, c, slice)
}

// Merge returned results at coordinating node.
reduceFn := func(prev, v interface{}) interface{} {
other, _ := prev.(SumCount)
return other.Add(v.(SumCount))
}

result, err := e.mapReduce(ctx, index, slices, c, opt, mapFn, reduceFn)
if err != nil {
return 0, err
}
other, _ := result.(SumCount)

if other.Count == 0 {
return 0, nil
}
return other.Sum / other.Count, nil
}

// executeBitmapCall executes a call that returns a bitmap.
func (e *Executor) executeBitmapCall(ctx context.Context, index string, c *pql.Call, slices []uint64, opt *ExecOptions) (*Bitmap, error) {
// Execute calls in bulk on each remote node and merge.
Expand Down Expand Up @@ -280,6 +321,77 @@ func (e *Executor) executeBitmapCallSlice(ctx context.Context, index string, c *
}
}

// executeSum executes a sum() call.
func (e *Executor) executeSum(ctx context.Context, index string, c *pql.Call, slices []uint64, opt *ExecOptions) (int64, error) {
if frame, _ := c.Args["frame"]; frame == "" {
return 0, errors.New("Sum(): frame required")
} else if field, _ := c.Args["field"]; field == "" {
return 0, errors.New("Sum(): field required")
}

if len(c.Children) > 1 {
return 0, errors.New("Sum() only accepts a single bitmap input")
}

// Execute calls in bulk on each remote node and merge.
mapFn := func(slice uint64) (interface{}, error) {
return e.executeSumCountSlice(ctx, index, c, slice)
}

// Merge returned results at coordinating node.
reduceFn := func(prev, v interface{}) interface{} {
other, _ := prev.(SumCount)
return other.Add(v.(SumCount))
}

result, err := e.mapReduce(ctx, index, slices, c, opt, mapFn, reduceFn)
if err != nil {
return 0, err
}
other, _ := result.(SumCount)

return other.Sum, nil
}

// executeSumCountSlice executes calculates the sum & count for fields on a slice.
func (e *Executor) executeSumCountSlice(ctx context.Context, index string, c *pql.Call, slice uint64) (SumCount, error) {
var filter *Bitmap
if len(c.Children) == 1 {
bm, err := e.executeBitmapCallSlice(ctx, index, c.Children[0], slice)
if err != nil {
return SumCount{}, err
}
filter = bm
}

frameName, _ := c.Args["frame"].(string)
fieldName, _ := c.Args["field"].(string)

frame := e.Holder.Frame(index, frameName)
if frame == nil {
return SumCount{}, nil
}

field := frame.Field(fieldName)
if field == nil {
return SumCount{}, nil
}

view := e.Holder.Fragment(index, frameName, ViewFieldPrefix+fieldName, slice)
if view == nil {
return SumCount{}, nil
}

vsum, vcount, err := view.FieldSum(filter, field.BitDepth())
if err != nil {
return SumCount{}, err
}
return SumCount{
Sum: int64(vsum) + (int64(vcount) * field.Min),
Count: int64(vcount),
}, nil
}

// executeTopN executes a TopN() call.
// This first performs the TopN() to determine the top results and then
// requeries to retrieve the full counts for each of the top results.
Expand Down Expand Up @@ -1179,6 +1291,8 @@ func (e *Executor) exec(ctx context.Context, node *Node, index string, q *pql.Qu
var err error

switch call.Name {
case "Average", "Sum":
v, err = decodeSumCount(pb.Results[i].GetSumCount()), nil
case "TopN":
v, err = decodePairs(pb.Results[i].GetPairs()), nil
case "Count":
Expand Down Expand Up @@ -1297,7 +1411,6 @@ func (e *Executor) mapper(ctx context.Context, ch chan mapResponse, nodes []*Nod
if n.Host == e.Host {
resp.result, resp.err = e.mapperLocal(ctx, nodeSlices, mapFn, reduceFn)
} else if !opt.Remote {

results, err := e.exec(ctx, n, index, &pql.Query{Calls: []*pql.Call{c}}, nodeSlices, opt)
if len(results) > 0 {
resp.result = results[0]
Expand Down Expand Up @@ -1414,3 +1527,30 @@ func needsSlices(calls []*pql.Call) bool {
}
return false
}

// SumCount represents a grouping of sum & count for Sum() and Average() calls.
type SumCount struct {
Sum int64 `json:"sum"`
Count int64 `json:"count"`
}

func (sc *SumCount) Add(other SumCount) SumCount {
return SumCount{
Sum: sc.Sum + other.Sum,
Count: sc.Count + other.Count,
}
}

func encodeSumCount(sc SumCount) *internal.SumCount {
return &internal.SumCount{
Sum: sc.Sum,
Count: sc.Count,
}
}

func decodeSumCount(pb *internal.SumCount) SumCount {
return SumCount{
Sum: pb.Sum,
Count: pb.Count,
}
}
122 changes: 122 additions & 0 deletions executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func TestExecutor_Execute_TopN(t *testing.T) {
}
})
}

func TestExecutor_Execute_TopN_fill(t *testing.T) {
hldr := test.MustOpenHolder()
defer hldr.Close()
Expand Down Expand Up @@ -559,7 +560,128 @@ func TestExecutor_Execute_TopN_Attr_Src(t *testing.T) {
}}) {
t.Fatalf("unexpected result: %s", spew.Sdump(result))
}
}

// Ensure a Sum() query can be executed.
func TestExecutor_Execute_Sum(t *testing.T) {
hldr := test.MustOpenHolder()
defer hldr.Close()
e := test.NewExecutor(hldr.Holder, test.NewCluster(1))

idx, err := hldr.CreateIndex("i", pilosa.IndexOptions{})
if err != nil {
t.Fatal(err)
}

if _, err := idx.CreateFrame("f", pilosa.FrameOptions{
RangeEnabled: true,
Fields: []*pilosa.Field{
{Name: "foo", Type: pilosa.FieldTypeInt, Min: 10, Max: 100},
{Name: "bar", Type: pilosa.FieldTypeInt, Min: 0, Max: 100000},
},
}); err != nil {
t.Fatal(err)
}

if _, err := idx.CreateFrame("other", pilosa.FrameOptions{
RangeEnabled: true,
Fields: []*pilosa.Field{
{Name: "foo", Type: pilosa.FieldTypeInt, Min: 0, Max: 1000},
},
}); err != nil {
t.Fatal(err)
}

if _, err := e.Execute(context.Background(), "i", test.MustParse(`
SetBit(frame=f, rowID=0, columnID=0)
SetBit(frame=f, rowID=0, columnID=`+strconv.Itoa(SliceWidth+1)+`)
SetFieldValue(frame=f, foo=20, bar=2000, columnID=0)
SetFieldValue(frame=f, foo=30, columnID=`+strconv.Itoa(SliceWidth)+`)
SetFieldValue(frame=f, foo=40, columnID=`+strconv.Itoa(SliceWidth+2)+`)
SetFieldValue(frame=f, foo=50, columnID=`+strconv.Itoa((5*SliceWidth)+100)+`)
SetFieldValue(frame=f, foo=60, columnID=`+strconv.Itoa(SliceWidth+1)+`)
SetFieldValue(frame=other, foo=1000, columnID=0)
`), nil, nil); err != nil {
t.Fatal(err)
}

t.Run("NoFilter", func(t *testing.T) {
if result, err := e.Execute(context.Background(), "i", test.MustParse(`Sum(frame=f, field=foo)`), nil, nil); err != nil {
t.Fatal(err)
} else if result[0] != int64(200) {
t.Fatalf("unexpected result: %s", spew.Sdump(result))
}
})

t.Run("WithFilter", func(t *testing.T) {
if result, err := e.Execute(context.Background(), "i", test.MustParse(`Sum(Bitmap(frame=f, rowID=0), frame=f, field=foo)`), nil, nil); err != nil {
t.Fatal(err)
} else if result[0] != int64(80) {
t.Fatalf("unexpected result: %s", spew.Sdump(result))
}
})
}

// Ensure a Average() query can be executed.
func TestExecutor_Execute_Average(t *testing.T) {
hldr := test.MustOpenHolder()
defer hldr.Close()
e := test.NewExecutor(hldr.Holder, test.NewCluster(1))

idx, err := hldr.CreateIndex("i", pilosa.IndexOptions{})
if err != nil {
t.Fatal(err)
}

if _, err := idx.CreateFrame("f", pilosa.FrameOptions{
RangeEnabled: true,
Fields: []*pilosa.Field{
{Name: "foo", Type: pilosa.FieldTypeInt, Min: 10, Max: 100},
{Name: "bar", Type: pilosa.FieldTypeInt, Min: 0, Max: 100000},
},
}); err != nil {
t.Fatal(err)
}

if _, err := idx.CreateFrame("other", pilosa.FrameOptions{
RangeEnabled: true,
Fields: []*pilosa.Field{
{Name: "foo", Type: pilosa.FieldTypeInt, Min: 0, Max: 1000},
},
}); err != nil {
t.Fatal(err)
}

if _, err := e.Execute(context.Background(), "i", test.MustParse(`
SetBit(frame=f, rowID=0, columnID=0)
SetBit(frame=f, rowID=0, columnID=`+strconv.Itoa(SliceWidth+2)+`)
SetFieldValue(frame=f, foo=20, bar=2000, columnID=0)
SetFieldValue(frame=f, foo=30, columnID=`+strconv.Itoa(SliceWidth)+`)
SetFieldValue(frame=f, foo=40, columnID=`+strconv.Itoa(SliceWidth+2)+`)
SetFieldValue(frame=f, foo=50, columnID=`+strconv.Itoa((5*SliceWidth)+100)+`)
SetFieldValue(frame=f, foo=60, columnID=`+strconv.Itoa(SliceWidth+1)+`)
SetFieldValue(frame=other, foo=1000, columnID=0)
`), nil, nil); err != nil {
t.Fatal(err)
}

t.Run("NoFilter", func(t *testing.T) {
if result, err := e.Execute(context.Background(), "i", test.MustParse(`Average(frame=f, field=foo)`), nil, nil); err != nil {
t.Fatal(err)
} else if result[0] != int64(40) {
t.Fatalf("unexpected result: %s", spew.Sdump(result))
}
})

t.Run("WithFilter", func(t *testing.T) {
if result, err := e.Execute(context.Background(), "i", test.MustParse(`Average(Bitmap(frame=f, rowID=0), frame=f, field=foo)`), nil, nil); err != nil {
t.Fatal(err)
} else if result[0] != int64(30) {
t.Fatalf("unexpected result: %s", spew.Sdump(result))
}
})
}

// Ensure a range query can be executed.
Expand Down
31 changes: 31 additions & 0 deletions fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,37 @@ func (f *Fragment) SetFieldValue(columnID uint64, bitDepth uint, value uint64) (
return changed, nil
}

// FieldSum returns the sum of a given field as well as the number of columns involved.
// A bitmap can be passed in to optionally filter the computed columns.
func (f *Fragment) FieldSum(filter *Bitmap, bitDepth uint) (sum, count uint64, err error) {
f.mu.Lock()
defer f.mu.Unlock()

// Compute count based on the existance bit.
row := f.row(uint64(bitDepth), true, true)
if filter != nil {
row = row.Intersect(filter)
}
count = row.Count()

// Compute the sum based on the bit count of each row multiplied by the
// place value of each row. For example, 10 bits in the 1's place plus
// 4 bits in the 2's place plus 3 bits in the 4's place equals a total
// sum of 30:
//
// 10*(2^0) + 4*(2^1) + 3*(2^2) = 30
//
for i := uint(0); i < bitDepth; i++ {
row := f.row(uint64(i), true, true)
if filter != nil {
row = row.Intersect(filter)
}
sum += (1 << i) * row.Count()
}

return sum, count, nil
}

func (f *Fragment) FieldRange(op string, bitDepth uint, predicate uint64) (*Bitmap, error) {
switch op {
case RangeOpEQ:
Expand Down
Loading

0 comments on commit 5bea524

Please sign in to comment.