Skip to content

Commit

Permalink
Joins now have which indicates the type of join to perform.
Browse files Browse the repository at this point in the history
Can join multiple parents
  • Loading branch information
nathanielc committed Nov 16, 2015
1 parent 72f148e commit b0f5175
Show file tree
Hide file tree
Showing 19 changed files with 1,293 additions and 178 deletions.
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error)
return nil, err
}
// Set fill
switch fill := n.FillOption.(type) {
switch fill := n.Fill.(type) {
case string:
switch fill {
case "null":
Expand Down
7 changes: 7 additions & 0 deletions edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ func (e *Edge) Close() {
}
}

func (e *Edge) Next() (p models.PointInterface, ok bool) {
if e.stream != nil {
return e.NextPoint()
}
return e.NextBatch()
}

func (e *Edge) NextPoint() (p models.Point, ok bool) {
if wlog.LogLevel == wlog.DEBUG {
// Explicitly check log level since this is expensive and frequent
Expand Down
1 change: 1 addition & 0 deletions examples/error_percent/error_percent.tick
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ errors.join(views)
.influxDBOut()
.database('pages')
.measurement('error_percent')

5 changes: 4 additions & 1 deletion functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ func mr(field, newField string, et pipeline.EdgeType, m func(*tsdb.MapInput) int

// wrap tsdb.reduceFunc for ReduceFunc
func reduce(f func([]interface{}) interface{}, field string, et pipeline.EdgeType) ReduceFunc {
return ReduceFunc(func(in []interface{}, tmax time.Time, usePointTimes bool) interface{} {
return ReduceFunc(func(in []interface{}, tmax time.Time, usePointTimes bool, as string) interface{} {
v := f(in)
if as != "" {
field = as
}
switch et {
case pipeline.StreamEdge:
return reduceResultToPoint(field, v, tmax, usePointTimes)
Expand Down
162 changes: 162 additions & 0 deletions integrations/data/TestStream_JoinFill.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
dbname
rpname
errors,service=cartA,dc=A value=7 0000000001
dbname
rpname
views,service=cartA,dc=A value=700 0000000001
dbname
rpname
errors,service=login,dc=B value=9 0000000001
dbname
rpname
views,service=login,dc=B value=900 0000000001
dbname
rpname
disk,service=sda,dc=B value=39 0000000001
dbname
rpname
errors,service=front,dc=A value=2 0000000001
dbname
rpname
views,service=front,dc=A value=200 0000000002
dbname
rpname
errors,service=cartA,dc=B value=9 0000000002
dbname
rpname
views,service=cartA,dc=B value=900 0000000002
dbname
rpname
errors,service=login,dc=A value=5 0000000003
dbname
rpname
views,service=login,dc=A value=500 0000000003
dbname
rpname
errors,service=front,dc=B value=9 0000000003
dbname
rpname
views,service=front,dc=B value=900 0000000003
dbname
rpname
errors,service=cartA,dc=A value=3 0000000004
dbname
rpname
views,service=cartA,dc=A value=300 0000000004
dbname
rpname
errors,service=login,dc=B value=9 0000000004
dbname
rpname
views,service=login,dc=B value=900 0000000004
dbname
rpname
errors,service=front,dc=A value=2 0000000005
dbname
rpname
views,service=front,dc=A value=200 0000000005
dbname
rpname
errors,service=login,dc=B value=2 0000000005
dbname
rpname
views,service=login,dc=B value=200 0000000005
dbname
rpname
errors,service=front,dc=A value=5 0000000006
dbname
rpname
views,service=front,dc=A value=500 0000000006
dbname
rpname
errors,service=cartA,dc=B value=9 0000000006
dbname
rpname
views,service=cartA,dc=B value=900 0000000006
dbname
rpname
errors,service=login,dc=C value=7 0000000006
dbname
rpname
views,service=login,dc=C value=700 0000000006
dbname
rpname
errors,service=front,dc=A value=4 0000000007
dbname
rpname
views,service=front,dc=A value=400 0000000007
dbname
rpname
errors,service=cartA,dc=B value=8 0000000007
dbname
rpname
views,service=cartA,dc=B value=800 0000000007
dbname
rpname
errors,service=front,dc=A value=6 0000000008
dbname
rpname
views,service=front,dc=A value=600 0000000008
dbname
rpname
errors,service=cartA,dc=B value=6 0000000008
dbname
rpname
views,service=cartA,dc=B value=600 0000000008
dbname
rpname
errors,service=login,dc=A value=10 0000000009
dbname
rpname
views,service=login,dc=A value=1000 0000000009
dbname
rpname
errors,service=front,dc=B value=4 0000000009
dbname
rpname
views,service=front,dc=B value=400 0000000009
dbname
rpname
disk,service=sda,dc=B value=423 0000000009
dbname
rpname
errors,service=cartA,dc=A value=5 0000000010
dbname
rpname
views,service=cartA,dc=A value=500 0000000010
dbname
rpname
errors,service=login,dc=B value=3 0000000010
dbname
rpname
views,service=login,dc=B value=300 0000000010
dbname
rpname
errors,service=cartA,dc=A value=5 0000000011
dbname
rpname
views,service=cartA,dc=A value=500 0000000011
dbname
rpname
errors,service=login,dc=B value=6 0000000011
dbname
rpname
views,service=login,dc=B value=600 0000000011
dbname
rpname
errors,service=cartA,dc=A value=8 0000000012
dbname
rpname
views,service=cartA,dc=A value=800 0000000012
dbname
rpname
errors,service=front,dc=A value=9 0000000012
dbname
rpname
views,service=front,dc=A value=900 0000000012
dbname
rpname
errors,service=login,dc=B value=5 0000000012
dbname
rpname
views,service=login,dc=B value=500 0000000012
171 changes: 171 additions & 0 deletions integrations/data/TestStream_JoinN.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
dbname
rpname
cpu,cpu=0,host=serverA value=98 0000000001
dbname
rpname
cpu,cpu=1,host=serverB value=97 0000000001
dbname
rpname
memory,type=free,host=serverA value=1000 0000000001
dbname
rpname
disk,device=sda,host=serverB value=39 0000000001
dbname
rpname
cpu,cpu=total,host=serverA value=92 0000000002
dbname
rpname
disk,device=sdb,host=serverB value=39 0000000002
dbname
rpname
memory,type=free,host=serverA value=1000 0000000002
dbname
rpname
cpu,cpu=total,host=serverB value=92 0000000002
dbname
rpname
cpu,cpu=total,host=serverA value=93 0000000004
dbname
rpname
memory,type=cached,host=serverA value=1000 0000000004
dbname
rpname
cpu,cpu=0,host=serverB value=93 0000000004
dbname
rpname
disk,device=sdb,host=serverB value=39 0000000004
dbname
rpname
memory,type=free,host=serverA value=1000 0000000004
dbname
rpname
cpu,cpu=1,host=serverA value=92 0000000005
dbname
rpname
memory,type=free,host=serverA value=1000 0000000005
dbname
rpname
cpu,cpu=total,host=serverB value=92 0000000005
dbname
rpname
disk,device=sda,host=serverB value=39 0000000005
dbname
rpname
memory,type=used,host=serverA value=1000 0000000005
dbname
rpname
cpu,cpu=0,host=serverA value=95 0000000006
dbname
rpname
cpu,cpu=1,host=serverB value=95 0000000006
dbname
rpname
memory,type=free,host=serverA value=1000 0000000006
dbname
rpname
cpu,cpu=total,host=serverC value=95 0000000006
dbname
rpname
disk,device=sdb,host=serverB value=39 0000000006
dbname
rpname
cpu,cpu=0,host=serverA value=92 0000000007
dbname
rpname
cpu,cpu=1,host=serverB value=92 0000000007
dbname
rpname
memory,type=cached,host=serverA value=1000 0000000007
dbname
rpname
disk,device=sda,host=serverB value=39 0000000007
dbname
rpname
cpu,cpu=total,host=serverA value=96 0000000008
dbname
rpname
disk,device=sdb,host=serverB value=39 0000000008
dbname
rpname
cpu,cpu=0,host=serverB value=96 0000000008
dbname
rpname
memory,type=free,host=serverA value=1000 0000000008
dbname
rpname
disk,device=sda,host=serverB value=39 0000000008
dbname
rpname
cpu,cpu=1,host=serverA value=93 0000000009
dbname
rpname
memory,type=used,host=serverA value=1000 0000000009
dbname
rpname
disk,device=sdb,host=serverB value=39 0000000008
dbname
rpname
memory,type=cache,host=serverA value=1000 0000000009
dbname
rpname
cpu,cpu=total,host=serverB value=93 0000000009
dbname
rpname
memory,type=free,host=serverA value=1000 0000000009
dbname
rpname
disk,device=sda,host=serverB value=39 0000000009
dbname
rpname
disk,device=sdb,host=serverB value=42 0000000009
dbname
rpname
cpu,cpu=0,host=serverA value=95 0000000010
dbname
rpname
memory,type=free,host=serverA value=1000 0000000010
dbname
rpname
disk,device=sda,host=serverB value=39 0000000010
dbname
rpname
cpu,cpu=1,host=serverB value=95 0000000010
dbname
rpname
disk,device=sdb,host=serverB value=39 0000000010
dbname
rpname
memory,type=used,host=serverA value=1000 0000000010
dbname
rpname
cpu,cpu=total,host=serverA value=96 0000000011
dbname
rpname
disk,device=sda,host=serverB value=39 0000000011
dbname
rpname
cpu,cpu=0,host=serverB value=96 0000000011
dbname
rpname
memory,type=cache,host=serverA value=1000 0000000011
dbname
rpname
disk,device=sdb,host=serverB value=39 0000000011
dbname
rpname
cpu,cpu=1,host=serverA value=95 0000000012
dbname
rpname
memory,type=free,host=serverA value=1000 0000000012
dbname
rpname
disk,device=sda,host=serverB value=39 0000000012
dbname
rpname
cpu,cpu=total,host=serverB value=95 0000000012
dbname
rpname
memory,type=free,host=serverA value=1000 0000000012
dbname
rpname
memory,type=used,host=serverA value=1000 0000000012
Loading

0 comments on commit b0f5175

Please sign in to comment.