Skip to content

Commit

Permalink
elastic: implemented histogram aggregation method
Browse files Browse the repository at this point in the history
  • Loading branch information
adamstruck committed May 2, 2018
1 parent 284a49c commit d334b2c
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 48 deletions.
46 changes: 38 additions & 8 deletions aql/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package aql

import (
"io"
//"log"
//"fmt"
// "log"
"context"
"fmt"
"sort"

"github.com/bmeg/arachne/protoutil"
Expand Down Expand Up @@ -211,25 +211,33 @@ func (aggRes *AggregationResult) AsMap() map[string]interface{} {
}
}

// SortedInsert inserts an AggregationResult into a slice of AggregationResults
// SortedInsert inserts an AggregationResult into the Buckets field
// and returns the index of the insertion
func (namedAggRes *NamedAggregationResult) SortedInsert(el *AggregationResult) int {
func (namedAggRes *NamedAggregationResult) SortedInsert(el *AggregationResult) (int, error) {
if !namedAggRes.IsValueSorted() {
return 0, fmt.Errorf("buckets are not value sorted")
}

if len(namedAggRes.Buckets) == 0 {
namedAggRes.Buckets = []*AggregationResult{el}
return 0
return 0, nil
}

index := sort.Search(len(namedAggRes.Buckets), func(i int) bool {
if namedAggRes.Buckets[i] == nil {
return true
}
return namedAggRes.Buckets[i].Value < el.Value
return el.Value > namedAggRes.Buckets[i].Value
})

namedAggRes.Buckets = append(namedAggRes.Buckets, &AggregationResult{})
copy(namedAggRes.Buckets[index+1:], namedAggRes.Buckets[index:])
namedAggRes.Buckets[index] = el
return index

return index, nil
}

// SortOnValue sorts a slice of AggregationResults by Value
// SortOnValue sorts Buckets by Value in descending order
func (namedAggRes *NamedAggregationResult) SortOnValue() {
sort.Slice(namedAggRes.Buckets, func(i, j int) bool {
if namedAggRes.Buckets[i] == nil && namedAggRes.Buckets[j] != nil {
Expand All @@ -244,3 +252,25 @@ func (namedAggRes *NamedAggregationResult) SortOnValue() {
return namedAggRes.Buckets[i].Value > namedAggRes.Buckets[j].Value
})
}

// IsValueSorted reports whether Buckets is ordered by Value from largest to
// smallest, with any nil entries grouped at the end of the slice.
//
// Every adjacent pair is inspected, including the final one. Empty and
// single-element slices are considered sorted, and equal neighbouring values
// are accepted.
func (namedAggRes *NamedAggregationResult) IsValueSorted() bool {
	buckets := namedAggRes.Buckets
	for i := 1; i < len(buckets); i++ {
		prev, cur := buckets[i-1], buckets[i]
		switch {
		case cur == nil:
			// A nil entry may follow anything; keep scanning, because a
			// populated bucket further along would still break the order.
		case prev == nil:
			// A populated bucket after a nil one breaks the nils-last layout.
			return false
		case prev.Value < cur.Value:
			// Values must never increase from one bucket to the next.
			return false
		}
	}
	return true
}
36 changes: 24 additions & 12 deletions aql/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,31 @@ func TestNamedAggregationResultInsert(t *testing.T) {
size := 5
aggRes := NamedAggregationResult{
Name: "test",
Buckets: make([]*AggregationResult, size),
Buckets: []*AggregationResult{},
}

for i := 0; i < 5; i++ {
aggRes.SortedInsert(&AggregationResult{Value: float64((i + 1) * 2)})
for i := 0; i < size; i++ {
aggRes.Buckets = append(aggRes.Buckets, &AggregationResult{Value: float64((i + 1) * 2)})
}

for i := range aggRes.Buckets {
if i < len(aggRes.Buckets)-2 {
if aggRes.Buckets[i].Value < aggRes.Buckets[i+1].Value {
t.Errorf("unexpected bucket order %+v", aggRes.Buckets)
}
}
t.Logf("initial list: %v", aggRes.Buckets)

index, err := aggRes.SortedInsert(&AggregationResult{Value: float64(5)})
if err == nil {
t.Error("expected error for SortedInsert")
}

aggRes.SortOnValue()
t.Logf("sorted initial list: %v", aggRes.Buckets)

index, err = aggRes.SortedInsert(&AggregationResult{Value: float64(5)})
if err != nil {
t.Error("unexpected error for SortedInsert", err)
}
t.Logf("list after insert: %v", aggRes.Buckets)

index := aggRes.SortedInsert(&AggregationResult{Value: float64(5)})
if len(aggRes.Buckets) != size {
t.Errorf("unexpected list size %d != %d", size, len(aggRes.Buckets))
if len(aggRes.Buckets) != size+1 {
t.Errorf("unexpected list size %d != %d", size+1, len(aggRes.Buckets))
}
if index != 3 {
t.Errorf("incorrect index returned %d != %d", 3, index)
Expand All @@ -49,6 +56,11 @@ func TestNamedAggregationResultSort(t *testing.T) {
t.Logf("initial list: %+v", aggRes.Buckets)
aggRes.SortOnValue()
t.Logf("sorted list: %+v", aggRes.Buckets)

if !aggRes.IsValueSorted() {
t.Errorf("unexpected bucket order %+v", aggRes.Buckets)
}

for i := range aggRes.Buckets {
if i < len(aggRes.Buckets)-2 {
if aggRes.Buckets[i].Value < aggRes.Buckets[i+1].Value {
Expand Down
18 changes: 16 additions & 2 deletions conformance/tests/ot_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,12 @@ def test_histogram_aggregation(O):

count = 0
for row in O.aggregate(aql.histogram("test-agg", "Person", "age", 5)):
print(row)
count += 1
if len(row["buckets"]) != 5:
if len(row["buckets"]) != 6:
errors.append(
"Unexpected number of terms: %d != %d" %
(len(row["buckets"]), 5)
(len(row["buckets"]), 6)
)

if row['name'] != 'test-agg':
Expand Down Expand Up @@ -179,4 +180,17 @@ def test_histogram_aggregation(O):

# def test_percentile_aggregation(O):
# errors = []
# setupGraph(O)

# count = 0
# percents = [1, 5, 25, 50, 75, 95, 99]
# for row in O.aggregate(aql.percentile("test-agg", "Person", "age", percents)):
# print(row)
# count += 1
# if len(row["buckets"]) != len(percents):
# errors.append(
# "Unexpected number of terms: %d != %d" %
# (len(row["buckets"]), len(percents))
# )

# return errors
35 changes: 31 additions & 4 deletions elastic/es_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ func (es *Graph) GetVertexIndexList() chan aql.IndexID {

// GetVertexTermAggregation returns the count of every term across vertices
func (es *Graph) GetVertexTermAggregation(ctx context.Context, name string, label string, field string, size uint32) (*aql.NamedAggregationResult, error) {
log.Printf("Running GetVertexTermAggregation: { label: %s, field: %s }", label, field)

log.Printf("Running GetVertexTermAggregation: { label: %s, field: %s size: %v}", label, field, size)
out := &aql.NamedAggregationResult{
Name: name,
Buckets: []*aql.AggregationResult{},
Expand All @@ -104,7 +103,12 @@ func (es *Graph) GetVertexTermAggregation(ctx context.Context, name string, labe
if agg, found := res.Aggregations.Terms(aggName); found {
for _, bucket := range agg.Buckets {
term := protoutil.WrapValue(bucket.Key.(string))
out.Buckets = append(out.Buckets, &aql.AggregationResult{Key: term, Value: float64(bucket.DocCount)})
out.SortedInsert(&aql.AggregationResult{Key: term, Value: float64(bucket.DocCount)})
if size > 0 {
if len(out.Buckets) > int(size) {
out.Buckets = out.Buckets[:size]
}
}
}
}

Expand All @@ -113,11 +117,34 @@ func (es *Graph) GetVertexTermAggregation(ctx context.Context, name string, labe

// GetVertexHistogramAggregation gets binned counts of a term across vertices.
//
// It searches es.vertexIndex for documents of type "vertex" whose "label"
// term equals label, and runs a histogram aggregation over "data."+field with
// a bin width of interval, ordered by ascending key. Each returned bin becomes
// one AggregationResult whose Key is the bin key and whose Value is the bin's
// document count. Buckets are appended in the order the response lists them.
//
// A failed search is returned as an error. If the response carries no
// aggregation under the generated name, the result has an empty Buckets slice.
func (es *Graph) GetVertexHistogramAggregation(ctx context.Context, name string, label string, field string, interval uint32) (*aql.NamedAggregationResult, error) {
	log.Printf("Running GetVertexHistogramAggregation: { label: %s, field: %s interval: %v }", label, field, interval)
	out := &aql.NamedAggregationResult{
		Name:    name,
		Buckets: []*aql.AggregationResult{},
	}

	q := es.client.Search().Index(es.vertexIndex).Type("vertex")
	q = q.Query(elastic.NewBoolQuery().Filter(elastic.NewTermQuery("label", label)))
	aggName := fmt.Sprintf("histogram.aggregation.%s.%s", label, field)
	q = q.Aggregation(aggName,
		elastic.NewHistogramAggregation().Field("data."+field).Interval(float64(interval)).OrderByKeyAsc())
	res, err := q.Do(ctx)
	if err != nil {
		return nil, fmt.Errorf("histogram aggregation failed: %s", err)
	}

	// Read the response with the histogram accessor so the bucket key arrives
	// already typed; going through Terms required an unchecked type assertion
	// on an interface{} key, which panics if the key is not a float64.
	if agg, found := res.Aggregations.Histogram(aggName); found {
		for _, bucket := range agg.Buckets {
			// The explicit conversion keeps the wrapped key a float64, as before.
			term := protoutil.WrapValue(float64(bucket.Key))
			out.Buckets = append(out.Buckets, &aql.AggregationResult{Key: term, Value: float64(bucket.DocCount)})
		}
	}

	return out, nil
}

// GetVertexPercentileAggregation is the entry point for computing percentiles
// of a field across vertices with the given label. It currently only logs the
// request and then returns a nil result together with a "not implemented"
// error.
func (es *Graph) GetVertexPercentileAggregation(ctx context.Context, name string, label string, field string, percents []uint32) (*aql.NamedAggregationResult, error) {
	log.Printf("Running GetVertexPercentileAggregation: { label: %s, field: %s percents: %v }", label, field, percents)

	var result *aql.NamedAggregationResult
	err := fmt.Errorf("not implemented")
	return result, err
}

Expand Down
7 changes: 6 additions & 1 deletion engine/core/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ func (agg *aggregate) Process(ctx context.Context, man gdbi.Manager, in gdbi.InP

aggOut := &aql.NamedAggregationResult{
Name: a.Name,
Buckets: make([]*aql.AggregationResult, tagg.Size),
Buckets: []*aql.AggregationResult{},
}

prefix := kvindex.TermPrefix(tagg.Field)
Expand All @@ -810,6 +810,11 @@ func (agg *aggregate) Process(ctx context.Context, man gdbi.Manager, in gdbi.InP
countBytes, _ := it.Value()
count, _ := binary.Uvarint(countBytes)
aggOut.SortedInsert(&aql.AggregationResult{Key: termVal, Value: float64(count)})
if tagg.Size > 0 {
if len(aggOut.Buckets) > int(tagg.Size) {
aggOut.Buckets = aggOut.Buckets[:tagg.Size]
}
}
}
return nil
})
Expand Down
7 changes: 6 additions & 1 deletion graphserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,12 @@ func (server *ArachneServer) Aggregate(req *aql.AggregationsRequest, stream aql.
stream.Send(res)

case *aql.Aggregate_Percentile:
return fmt.Errorf("percentile aggregation not implemented")
pagg := agg.GetPercentile()
res, err := graph.GetVertexPercentileAggregation(stream.Context(), agg.Name, pagg.Label, pagg.Field, pagg.Percents)
if err != nil {
return fmt.Errorf("percentile aggregation failed: %s", err)
}
stream.Send(res)

case *aql.Aggregate_Histogram:
histagg := agg.GetHistogram()
Expand Down
15 changes: 8 additions & 7 deletions kvgraph/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (kgdb *KVInterfaceGDB) GetVertexTermAggregation(ctx context.Context, name s
log.Printf("Running GetVertexTermAggregation: { label: %s, field: %s size: %v}", label, field, size)
out := &aql.NamedAggregationResult{
Name: name,
Buckets: make([]*aql.AggregationResult, size),
Buckets: []*aql.AggregationResult{},
}

parts := strings.Split(field, ".")
Expand All @@ -103,6 +103,11 @@ func (kgdb *KVInterfaceGDB) GetVertexTermAggregation(ctx context.Context, name s
s := tcount.String // BUG: This is ignoring number terms
t := protoutil.WrapValue(s)
out.SortedInsert(&aql.AggregationResult{Key: t, Value: float64(tcount.Count)})
if size > 0 {
if len(out.Buckets) > int(size) {
out.Buckets = out.Buckets[:size]
}
}
}

return out, nil
Expand All @@ -115,22 +120,18 @@ func (kgdb *KVInterfaceGDB) GetVertexHistogramAggregation(ctx context.Context, n
Name: name,
Buckets: []*aql.AggregationResult{},
}
buckets := []*aql.AggregationResult{}

min := kgdb.kvg.idx.FieldTermNumberMin(fmt.Sprintf("%s.v.%s.%s", kgdb.graph, label, field))
max := kgdb.kvg.idx.FieldTermNumberMax(fmt.Sprintf("%s.v.%s.%s", kgdb.graph, label, field))

i := float64(interval)
for bucket := math.Floor(min/i) * i; bucket < max; bucket += i {
for bucket := math.Floor(min/i) * i; bucket <= max; bucket += i {
var count uint64
for tcount := range kgdb.kvg.idx.FieldTermNumberRange(fmt.Sprintf("%s.v.%s.%s", kgdb.graph, label, field), bucket, bucket+i) {
count += tcount.Count
}

buckets = append(buckets, &aql.AggregationResult{Key: protoutil.WrapValue(bucket), Value: float64(count)})
out.Buckets = append(out.Buckets, &aql.AggregationResult{Key: protoutil.WrapValue(bucket), Value: float64(count)})
}

out.Buckets = buckets
return out, nil
}

Expand Down
Loading

0 comments on commit d334b2c

Please sign in to comment.