Merge pull request influxdata#45 from influxdb/nc-issue#44
Add SampleNode and fix Grouping in Where node
Nathaniel Cook committed Nov 10, 2015
2 parents 42bdd5d + 2f9dfa9 commit 0393f37
Showing 12 changed files with 254 additions and 41 deletions.
27 changes: 12 additions & 15 deletions integrations/data/TestStream_TopSelector.srpl
@@ -486,21 +486,6 @@ rpname
top_scores_gap,game=g1 gap=142,topFirst=998,topLast=856 1447114070
dbname
rpname
top_scores,game=g1,player=p14 top=998 1447114070
dbname
rpname
top_scores,game=g1,player=p8 top=940 1447114070
dbname
rpname
top_scores,game=g1,player=p17 top=926 1447114070
dbname
rpname
top_scores,game=g1,player=p4 top=861 1447114070
dbname
rpname
top_scores,game=g1,player=p9 top=856 1447114070
dbname
rpname
scores,game=g0,player=p0 value=971 1447114141
dbname
rpname
@@ -619,3 +604,15 @@ scores,game=g1,player=p18 value=739 1447114141
dbname
rpname
scores,game=g1,player=p19 value=861 1447114141
dbname
rpname
scores,game=g0,player=p16 value=748 1447114142
dbname
rpname
scores,game=g1,player=p16 value=748 1447114142
dbname
rpname
scores,game=g0,player=p17 value=434 1447114143
dbname
rpname
scores,game=g1,player=p17 value=434 1447114143
72 changes: 59 additions & 13 deletions integrations/streamer_test.go
@@ -1216,51 +1216,80 @@ stream
func TestStream_TopSelector(t *testing.T) {

var script = `
stream
var topScores = stream
.from('scores')
// Get the most recent score for each player
.groupBy('game', 'player')
.window()
.period(5s)
.period(2s)
.every(2s)
.align()
.mapReduce(influxql.last('value'))
// Calculate the top 5 scores per game
.groupBy('game')
.mapReduce(influxql.top(5, 'last', 'player'))
topScores
.httpOut('top_scores')
topScores.sample(4s)
.mapReduce(influxql.count('top'))
.httpOut('top_scores_sampled')
`

tw := time.Date(1970, 1, 1, 0, 0, 2, 0, time.UTC)
tw := time.Date(1970, 1, 1, 0, 0, 4, 0, time.UTC)
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "scores",
Tags: map[string]string{"game": "g0"},
Columns: []string{"time", "player", "top"},
Values: [][]interface{}{
{tw, "p11", 931.0},
{tw, "p13", 894.0},
{tw, "p2", 872.0},
{tw, "p6", 843.0},
{tw, "p4", 840.0},
{tw, "p7", 978.0},
{tw, "p10", 957.0},
{tw, "p9", 878.0},
{tw, "p5", 877.0},
{tw, "p15", 791.0},
},
},
{
Name: "scores",
Tags: map[string]string{"game": "g1"},
Columns: []string{"time", "player", "top"},
Values: [][]interface{}{
{tw, "p0", 965.0},
{tw, "p8", 953.0},
{tw, "p12", 833.0},
{tw, "p18", 813.0},
{tw, "p13", 734.0},
{tw, "p19", 926.0},
{tw, "p12", 887.0},
{tw, "p0", 879.0},
{tw, "p15", 872.0},
{tw, "p16", 863.0},
},
},
},
}

sampleER := kapacitor.Result{
Series: imodels.Rows{
{
Name: "scores",
Tags: map[string]string{"game": "g0"},
Columns: []string{"time", "count"},
Values: [][]interface{}{{
time.Date(1970, 1, 1, 0, 0, 4, 0, time.UTC),
5.0,
}},
},
{
Name: "scores",
Tags: map[string]string{"game": "g1"},
Columns: []string{"time", "count"},
Values: [][]interface{}{{
time.Date(1970, 1, 1, 0, 0, 4, 0, time.UTC),
5.0,
}},
},
},
}

clock, et, errCh, tm := testStreamer(t, "TestStream_TopSelector", script)
defer tm.Close()

@@ -1291,6 +1320,23 @@ stream
if eq, msg := compareResults(er, result); !eq {
t.Error(msg)
}

// Get the result
output, err = et.GetOutput("top_scores_sampled")
if err != nil {
t.Fatal(err)
}

resp, err = http.Get(output.Endpoint())
if err != nil {
t.Fatal(err)
}

// Assert we got the expected result
result = kapacitor.ResultFromJSON(resp.Body)
if eq, msg := compareResults(sampleER, result); !eq {
t.Error(msg)
}
}

// Helper test function for streamer
6 changes: 5 additions & 1 deletion models/batch.go
@@ -10,7 +10,11 @@ import (
"github.com/influxdb/influxdb/models"
)

//Represents purely a set of fields and a time.
// A point in batch, similar to Point but most information is
// found on the containing Batch.
//
// Tags on a BatchPoint are a superset of the tags on the Batch
// All points in a batch should have the same tag keys.
type BatchPoint struct {
Fields Fields `json:"fields"`
Time time.Time `json:"time"`
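
For illustration, a minimal sketch of a single BatchPoint (hypothetical values; only the fields visible in this hunk, with models.Fields assumed to be the usual map of field name to value). The containing Batch supplies the shared group tags:

p := models.BatchPoint{
	Fields: models.Fields{"top": 998.0},
	Time:   time.Unix(1447114070, 0).UTC(),
}
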
9 changes: 9 additions & 0 deletions pipeline/node.go
@@ -307,3 +307,12 @@ func (n *chainnode) Window() *WindowNode {
n.linkChild(w)
return w
}

// Create a new node that samples the incoming points or batches.
//
// One point will be emitted every count or duration specified.
func (n *chainnode) Sample(rate interface{}) *SampleNode {
s := newSampleNode(n.Provides(), rate)
n.linkChild(s)
return s
}
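
A usage sketch of the new method (hypothetical helper, written as if inside package pipeline since chainnode is unexported; real tasks reach Sample through TICKscript as in the test above). An int64 rate keeps every Nth point or batch, while a time.Duration rate keeps only those that fall on the boundary:

func sampleUsage(parent *chainnode) {
	// Keep every 3rd point or batch per group.
	byCount := parent.Sample(int64(3))

	// Keep only points or batches whose time lands exactly on a 10s boundary.
	byTime := parent.Sample(10 * time.Second)

	_, _ = byCount, byTime
}
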
52 changes: 52 additions & 0 deletions pipeline/sample.go
@@ -0,0 +1,52 @@
package pipeline

import (
"time"
)

// Sample points or batches.
// One point will be emitted every count or duration specified.
//
// Example:
// stream.
// .sample(3)
//
// Keep every third data point or batch.
//
// Example:
// stream.
// .sample(10s)
//
// Keep only samples that land on the 10s boundary.
// See WindowNode.Align or BatchNode.GroupBy time
// for ensuring data is aligned with a boundary.
type SampleNode struct {
chainnode

// Keep every Count point or batch
// tick:ignore
Count int64

// Keep one point or batch every Duration
// tick:ignore
Duration time.Duration
}

func newSampleNode(wants EdgeType, rate interface{}) *SampleNode {
var c int64
var d time.Duration
switch r := rate.(type) {
case int64:
c = r
case time.Duration:
d = r
default:
panic("must pass int64 or duration to new sample node")
}

return &SampleNode{
chainnode: newBasicChainNode("sample", wants, wants),
Count: c,
Duration: d,
}
}
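
A minimal sketch of how the rate argument is resolved by the type switch above (hypothetical helper inside package pipeline; the literals are illustrative only):

func sampleRates() {
	// Count-based: keep every 5th batch (Count=5, Duration=0).
	byCount := newSampleNode(BatchEdge, int64(5))

	// Duration-based: keep one stream point per minute boundary (Count=0, Duration=1m).
	byTime := newSampleNode(StreamEdge, time.Minute)

	// An untyped literal such as newSampleNode(StreamEdge, 5) would reach the
	// default case and panic, because the value arrives as int rather than int64.
	_, _ = byCount, byTime
}
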
73 changes: 73 additions & 0 deletions sample.go
@@ -0,0 +1,73 @@
package kapacitor

import (
"errors"
"time"

"github.com/influxdb/kapacitor/models"
"github.com/influxdb/kapacitor/pipeline"
)

type SampleNode struct {
node
s *pipeline.SampleNode

counts map[models.GroupID]int64
duration time.Duration
}

// Create a new SampleNode which filters data from a source.
func newSampleNode(et *ExecutingTask, n *pipeline.SampleNode) (*SampleNode, error) {
sn := &SampleNode{
node: node{Node: n, et: et},
s: n,
counts: make(map[models.GroupID]int64),
duration: n.Duration,
}
sn.node.runF = sn.runSample
if n.Duration == 0 && n.Count == 0 {
return nil, errors.New("invalid sample rate: must be positive integer or duration")
}
return sn, nil
}

func (s *SampleNode) runSample() error {
switch s.Wants() {
case pipeline.StreamEdge:
for p, ok := s.ins[0].NextPoint(); ok; p, ok = s.ins[0].NextPoint() {
if s.shouldKeep(p.Group, p.Time) {
for _, child := range s.outs {
err := child.CollectPoint(p)
if err != nil {
return err
}
}
}
}
case pipeline.BatchEdge:
for b, ok := s.ins[0].NextBatch(); ok; b, ok = s.ins[0].NextBatch() {
if s.shouldKeep(b.Group, b.TMax) {
for _, child := range s.outs {
err := child.CollectBatch(b)
if err != nil {
return err
}
}
}
}
}
return nil
}

func (s *SampleNode) shouldKeep(group models.GroupID, t time.Time) bool {
if s.duration != 0 {
keepTime := t.Truncate(s.duration)
return t.Equal(keepTime)
} else {
count := s.counts[group]
keep := count%s.s.Count == 0
count++
s.counts[group] = count
return keep
}
}
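
The keep decision can be exercised on its own; a self-contained sketch of the same logic (names are hypothetical, not part of this commit):

package main

import (
	"fmt"
	"time"
)

// Duration branch: a point is kept only when its timestamp falls exactly on a boundary.
func keepByDuration(t time.Time, d time.Duration) bool {
	return t.Equal(t.Truncate(d))
}

// Count branch: the first point of a group is kept, then every nth point after it.
func keepByCount(seen, n int64) bool {
	return seen%n == 0
}

func main() {
	t0 := time.Date(1970, 1, 1, 0, 0, 10, 0, time.UTC)
	fmt.Println(keepByDuration(t0, 10*time.Second))                      // true: on the 10s boundary
	fmt.Println(keepByDuration(t0.Add(3*time.Second), 10*time.Second))   // false: off the boundary
	fmt.Println(keepByCount(0, 3), keepByCount(1, 3), keepByCount(3, 3)) // true false true
}
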
2 changes: 2 additions & 0 deletions task.go
@@ -318,6 +318,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node) (Node, error) {
return newApplyNode(et, t)
case *pipeline.WhereNode:
return newWhereNode(et, t)
case *pipeline.SampleNode:
return newSampleNode(et, t)
default:
return nil, fmt.Errorf("unknown pipeline node type %T", p)
}
15 changes: 15 additions & 0 deletions tick/functions.go
@@ -26,6 +26,7 @@ func NewFunctions() Funcs {
funcs["bool"] = &boolean{}
funcs["int"] = &integer{}
funcs["float"] = &float{}
funcs["count"] = &count{}

return funcs
}
@@ -100,6 +101,20 @@ func (*float) Call(args ...interface{}) (v interface{}, err error) {
return
}

type count struct {
n int64
}

func (c *count) Reset() {
c.n = 0
}

// Counts the number of values processed.
func (c *count) Call(args ...interface{}) (v interface{}, err error) {
c.n++
return c.n, nil
}

type sigma struct {
mean float64
variance float64
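
A hedged sketch (hypothetical helper inside package tick) of the new count function driven directly — each Call returns the running total as an int64 and Reset clears it:

func countExample() {
	c := &count{}
	one, _ := c.Call()   // 1
	two, _ := c.Call()   // 2: the counter keeps state across calls
	c.Reset()            // back to zero
	again, _ := c.Call() // 1 again
	_, _, _ = one, two, again
}
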
12 changes: 7 additions & 5 deletions tick/lex.go
@@ -43,6 +43,7 @@
tokenMinus
tokenMult
tokenDiv
tokenMod

//end mathematical operators
end_tok_operator_math
@@ -73,6 +74,7 @@ var operatorStr = [...]string{
tokenMinus: "-",
tokenMult: "*",
tokenDiv: "/",
tokenMod: "%",
tokenEqual: "==",
tokenNotEqual: "!=",
tokenLess: "<",
@@ -304,7 +306,7 @@ func lexToken(l *lexer) stateFn {
}
}

const operatorChars = "+-*/><!="
const operatorChars = "+-*/><!=%"

func isOperatorChar(r rune) bool {
return strings.IndexRune(operatorChars, r) != -1
@@ -313,6 +315,10 @@ func isOperatorChar(r rune) bool {
func lexOperator(l *lexer) stateFn {
for {
switch l.next() {
case '+', '-', '*', '%':
op := strToOperator[l.current()]
l.emit(op)
return lexToken
case '/':
if l.peek() == '/' {
l.backup()
@@ -321,10 +327,6 @@ func lexOperator(l *lexer) stateFn {
op := strToOperator[l.current()]
l.emit(op)
return lexToken
case '+', '-', '*':
op := strToOperator[l.current()]
l.emit(op)
return lexToken
case '!':
if l.peek() == '=' || l.peek() == '~' {
l.next()
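
A standalone sketch (not the lexer's real API) of what the consolidated case expresses: '+', '-', '*', and now '%' can be emitted as soon as they are read, whereas '/' must first peek ahead to rule out a '//' comment:

// isSingleCharOp reports whether r is an operator token that needs no lookahead.
// '/', '!', '<', '>', and '=' all require peeking at the next rune first.
func isSingleCharOp(r rune) bool {
	switch r {
	case '+', '-', '*', '%':
		return true
	default:
		return false
	}
}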