Skip to content

Commit

Permalink
add missing working_cardinality stats on state* nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed May 10, 2017
1 parent 60e2a07 commit ba90de8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

### Bugfixes

- [#1370](https://github.com/influxdata/kapacitor/issues/1370): Fix missing working_cardinality stats on stateDuration and stateCount nodes.

## v1.3.0-rc1 [2017-05-08]

### Release Notes
Expand Down
47 changes: 36 additions & 11 deletions state_tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package kapacitor
import (
"fmt"
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
Expand All @@ -28,30 +30,53 @@ type StateTrackingNode struct {
as string

newTracker func() stateTracker
groups map[models.GroupID]*stateTrackingGroup

groupsMu sync.RWMutex
groups map[models.GroupID]*stateTrackingGroup
}

func (stn *StateTrackingNode) group(g models.GroupID) (*stateTrackingGroup, error) {
stn.groupsMu.RLock()
stg := stn.groups[g]
if stg == nil {
stg = &stateTrackingGroup{}
stn.groupsMu.RUnlock()

var err error
stg.Expression, err = stateful.NewExpression(stn.lambda.Expression)
if err != nil {
return nil, fmt.Errorf("Failed to compile expression: %v", err)
}
if stg == nil {
// Grab the write lock
stn.groupsMu.Lock()
defer stn.groupsMu.Unlock()

// Check again now that we have the write lock
stg = stn.groups[g]
if stg == nil {
// Create a new tracking group
stg = &stateTrackingGroup{}

var err error
stg.Expression, err = stateful.NewExpression(stn.lambda.Expression)
if err != nil {
return nil, fmt.Errorf("Failed to compile expression: %v", err)
}

stg.ScopePool = stateful.NewScopePool(ast.FindReferenceVariables(stn.lambda.Expression))
stg.ScopePool = stateful.NewScopePool(ast.FindReferenceVariables(stn.lambda.Expression))

stg.tracker = stn.newTracker()
stg.tracker = stn.newTracker()

stn.groups[g] = stg
stn.groups[g] = stg
}
}
return stg, nil
}

func (stn *StateTrackingNode) runStateTracking(_ []byte) error {
// Setup working_cardinality gauage.
valueF := func() int64 {
stn.groupsMu.RLock()
l := len(stn.groups)
stn.groupsMu.RUnlock()
return int64(l)
}
stn.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch stn.Provides() {
case pipeline.StreamEdge:
for p, ok := stn.ins[0].NextPoint(); ok; p, ok = stn.ins[0].NextPoint() {
Expand Down

0 comments on commit ba90de8

Please sign in to comment.