Skip to content

Commit

Permalink
Add cardinality stat to FlattenNode
Browse files Browse the repository at this point in the history
  • Loading branch information
desa committed Mar 2, 2017
1 parent 09bf19d commit f3fb481
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -49,13 +50,24 @@ type flattenBatchBuffer struct {
}

func (n *FlattenNode) runFlatten([]byte) error {
var mu sync.RWMutex
switch n.Wants() {
case pipeline.StreamEdge:
flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer)
cardinalityGauge := expvar.NewIntFuncGauge(func() int {
mu.RLock()
l := len(flattenBuffers)
mu.RUnlock()
return l
})
n.statMap.Set(statsCardinalityGauge, cardinalityGauge)

for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
n.timer.Start()
t := p.Time.Round(n.f.Tolerance)
mu.RLock()
currentBuf, ok := flattenBuffers[p.Group]
mu.RUnlock()
if !ok {
currentBuf = &flattenStreamBuffer{
Time: t,
Expand All @@ -64,7 +76,9 @@ func (n *FlattenNode) runFlatten([]byte) error {
Dimensions: p.Dimensions,
Tags: p.PointTags(),
}
mu.Lock()
flattenBuffers[p.Group] = currentBuf
mu.Unlock()
}
rp := models.RawPoint{
Time: t,
Expand Down Expand Up @@ -104,10 +118,19 @@ func (n *FlattenNode) runFlatten([]byte) error {
}
case pipeline.BatchEdge:
allBuffers := make(map[models.GroupID]*flattenBatchBuffer)
cardinalityGauge := expvar.NewIntFuncGauge(func() int {
mu.RLock()
l := len(allBuffers)
mu.RUnlock()
return l
})
n.statMap.Set(statsCardinalityGauge, cardinalityGauge)
for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
t := b.TMax.Round(n.f.Tolerance)
mu.RLock()
currentBuf, ok := allBuffers[b.Group]
mu.RUnlock()
if !ok {
currentBuf = &flattenBatchBuffer{
Time: t,
Expand All @@ -116,7 +139,9 @@ func (n *FlattenNode) runFlatten([]byte) error {
Tags: b.Tags,
Points: make(map[time.Time][]models.RawPoint),
}
mu.Lock()
allBuffers[b.Group] = currentBuf
mu.Unlock()
}
if !t.Equal(currentBuf.Time) {
// Flatten/Emit old buffer
Expand Down

0 comments on commit f3fb481

Please sign in to comment.