Skip to content

Commit

Permalink
Merge pull request influxdata#1242 from influxdata/revert-1240-revert…
Browse files Browse the repository at this point in the history
…-1221-md-issue#1173

Revert "Revert "WIP: Add cardinality stat to each node type.""
  • Loading branch information
desa authored Mar 6, 2017
2 parents 62d64e8 + c902ade commit 10f4f90
Show file tree
Hide file tree
Showing 19 changed files with 1,236 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Renamed `eval_errors` to `errors` in eval node.
- [#922](https://github.com/influxdata/kapacitor/issues/922): Expose server specific information in alert templates.
- [#1162](https://github.com/influxdata/kapacitor/pulls/1162): Add Pushover integration.
- [#1221](https://github.com/influxdata/kapacitor/pull/1221): Add `working_cardinality` stat to each node type that tracks the number of groups per node.

### Bugfixes

Expand Down
25 changes: 22 additions & 3 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type AlertNode struct {
messageTmpl *text.Template
detailsTmpl *html.Template

statesMu sync.RWMutex

alertsTriggered *expvar.Int
oksTriggered *expvar.Int
infosTriggered *expvar.Int
Expand Down Expand Up @@ -448,6 +450,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
valueF := func() int64 {
a.statesMu.RLock()
l := len(a.states)
a.statesMu.RUnlock()
return int64(l)
}
a.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

// Register delete hook
if a.hasAnonTopic() {
a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic))
Expand Down Expand Up @@ -488,7 +498,7 @@ func (a *AlertNode) runAlert([]byte) error {
return err
}
var currentLevel alert.Level
if state, ok := a.states[p.Group]; ok {
if state, ok := a.getAlertState(p.Group); ok {
currentLevel = state.currentLevel()
} else {
// Check for previous state
Expand Down Expand Up @@ -580,7 +590,7 @@ func (a *AlertNode) runAlert([]byte) error {
var highestPoint *models.BatchPoint

var currentLevel alert.Level
if state, ok := a.states[b.Group]; ok {
if state, ok := a.getAlertState(b.Group); ok {
currentLevel = state.currentLevel()
} else {
// Check for previous state
Expand Down Expand Up @@ -934,12 +944,14 @@ func (a *alertState) percentChange() float64 {
}

func (a *AlertNode) updateState(t time.Time, level alert.Level, group models.GroupID) *alertState {
state, ok := a.states[group]
state, ok := a.getAlertState(group)
if !ok {
state = &alertState{
history: make([]alert.Level, a.a.History),
}
a.statesMu.Lock()
a.states[group] = state
a.statesMu.Unlock()
}
state.addEvent(level)

Expand Down Expand Up @@ -1074,3 +1086,10 @@ func (a *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group
details := tmpBuffer.String()
return msg, details, nil
}

func (a *AlertNode) getAlertState(id models.GroupID) (state *alertState, ok bool) {
a.statesMu.RLock()
state, ok = a.states[id]
a.statesMu.RUnlock()
return state, ok
}
17 changes: 17 additions & 0 deletions combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"log"
"sort"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
Expand All @@ -19,6 +21,8 @@ type CombineNode struct {
expressionsByGroup map[models.GroupID][]stateful.Expression
scopePools []stateful.ScopePool

expressionsByGroupMu sync.RWMutex

combination combination
}

Expand All @@ -30,6 +34,7 @@ func newCombineNode(et *ExecutingTask, n *pipeline.CombineNode, l *log.Logger) (
expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
combination: combination{max: n.Max},
}

// Create stateful expressions
cn.expressions = make([]stateful.Expression, len(n.Lambdas))
cn.scopePools = make([]stateful.ScopePool, len(n.Lambdas))
Expand Down Expand Up @@ -60,6 +65,14 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) }
func (t timeList) Swap(i, j int) { t[i], t[j] = t[j], t[i] }

func (n *CombineNode) runCombine([]byte) error {
valueF := func() int64 {
n.expressionsByGroupMu.RLock()
l := len(n.expressionsByGroup)
n.expressionsByGroupMu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch n.Wants() {
case pipeline.StreamEdge:
buffers := make(map[models.GroupID]*buffer)
Expand Down Expand Up @@ -162,13 +175,17 @@ func (n *CombineNode) combineBuffer(buf *buffer) error {
return nil
}
l := len(n.expressions)
n.expressionsByGroupMu.RLock()
expressions, ok := n.expressionsByGroup[buf.Group]
n.expressionsByGroupMu.RUnlock()
if !ok {
expressions = make([]stateful.Expression, l)
for i, expr := range n.expressions {
expressions[i] = expr.CopyReset()
}
n.expressionsByGroupMu.Lock()
n.expressionsByGroup[buf.Group] = expressions
n.expressionsByGroupMu.Unlock()
}

// Compute matching result for all points
Expand Down
17 changes: 17 additions & 0 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package kapacitor

import (
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand All @@ -27,12 +29,25 @@ func newDerivativeNode(et *ExecutingTask, n *pipeline.DerivativeNode, l *log.Log
func (d *DerivativeNode) runDerivative([]byte) error {
switch d.Provides() {
case pipeline.StreamEdge:
var mu sync.RWMutex
previous := make(map[models.GroupID]models.Point)
valueF := func() int64 {
mu.RLock()
l := len(previous)
mu.RUnlock()
return int64(l)
}
d.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
mu.RLock()
pr, ok := previous[p.Group]
mu.RUnlock()
if !ok {
mu.Lock()
previous[p.Group] = p
mu.Unlock()
d.timer.Stop()
continue
}
Expand All @@ -51,7 +66,9 @@ func (d *DerivativeNode) runDerivative([]byte) error {
}
d.timer.Resume()
}
mu.Lock()
previous[p.Group] = p
mu.Unlock()
d.timer.Stop()
}
case pipeline.BatchEdge:
Expand Down
19 changes: 19 additions & 0 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"errors"
"fmt"
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
Expand All @@ -20,6 +22,10 @@ type EvalNode struct {
refVarList [][]string
scopePool stateful.ScopePool
tags map[string]bool

expressionsByGroupMu sync.RWMutex

evalErrors *expvar.Int
}

// Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
Expand All @@ -32,6 +38,7 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
e: n,
expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
}

// Create stateful expressions
en.expressions = make([]stateful.Expression, len(n.Lambdas))
en.refVarList = make([][]string, len(n.Lambdas))
Expand Down Expand Up @@ -62,6 +69,14 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
valueF := func() int64 {
e.expressionsByGroupMu.RLock()
l := len(e.expressionsByGroup)
e.expressionsByGroupMu.RUnlock()
return int64(l)
}
e.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch e.Provides() {
case pipeline.StreamEdge:
var err error
Expand Down Expand Up @@ -118,13 +133,17 @@ func (e *EvalNode) runEval(snapshot []byte) error {
func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Fields, tags models.Tags) (models.Fields, models.Tags, error) {
vars := e.scopePool.Get()
defer e.scopePool.Put(vars)
e.expressionsByGroupMu.RLock()
expressions, ok := e.expressionsByGroup[group]
e.expressionsByGroupMu.RUnlock()
if !ok {
expressions = make([]stateful.Expression, len(e.expressions))
for i, exp := range e.expressions {
expressions[i] = exp.CopyReset()
}
e.expressionsByGroupMu.Lock()
e.expressionsByGroup[group] = expressions
e.expressionsByGroupMu.Unlock()
}
for i, expr := range expressions {
err := fillScope(vars, e.refVarList[i], now, fields, tags)
Expand Down
23 changes: 23 additions & 0 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,29 @@ func (v *Int) IntValue() int64 {
return atomic.LoadInt64(&v.i)
}

// IntFuncGauge is a 64-bit integer variable that satisfies the expvar.Var interface.
type IntFuncGauge struct {
ValueF func() int64
}

func (v *IntFuncGauge) String() string {
return strconv.FormatInt(v.IntValue(), 10)
}

func (v *IntFuncGauge) Add(delta int64) {}
func (v *IntFuncGauge) Set(value int64) {}

func (v *IntFuncGauge) IntValue() int64 {
if v == nil || v.ValueF == nil {
return 0
}
return v.ValueF()
}

func NewIntFuncGauge(fn func() int64) *IntFuncGauge {
return &IntFuncGauge{fn}
}

// IntSum is a 64-bit integer variable that consists of multiple different parts
// and satisfies the expvar.Var interface.
// The value of the var is the sum of all its parts.
Expand Down
26 changes: 26 additions & 0 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -49,13 +50,24 @@ type flattenBatchBuffer struct {
}

func (n *FlattenNode) runFlatten([]byte) error {
var mu sync.RWMutex
switch n.Wants() {
case pipeline.StreamEdge:
flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer)
valueF := func() int64 {
mu.RLock()
l := len(flattenBuffers)
mu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
n.timer.Start()
t := p.Time.Round(n.f.Tolerance)
mu.RLock()
currentBuf, ok := flattenBuffers[p.Group]
mu.RUnlock()
if !ok {
currentBuf = &flattenStreamBuffer{
Time: t,
Expand All @@ -64,7 +76,9 @@ func (n *FlattenNode) runFlatten([]byte) error {
Dimensions: p.Dimensions,
Tags: p.PointTags(),
}
mu.Lock()
flattenBuffers[p.Group] = currentBuf
mu.Unlock()
}
rp := models.RawPoint{
Time: t,
Expand Down Expand Up @@ -104,10 +118,20 @@ func (n *FlattenNode) runFlatten([]byte) error {
}
case pipeline.BatchEdge:
allBuffers := make(map[models.GroupID]*flattenBatchBuffer)
valueF := func() int64 {
mu.RLock()
l := len(allBuffers)
mu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
t := b.TMax.Round(n.f.Tolerance)
mu.RLock()
currentBuf, ok := allBuffers[b.Group]
mu.RUnlock()
if !ok {
currentBuf = &flattenBatchBuffer{
Time: t,
Expand All @@ -116,7 +140,9 @@ func (n *FlattenNode) runFlatten([]byte) error {
Tags: b.Tags,
Points: make(map[time.Time][]models.RawPoint),
}
mu.Lock()
allBuffers[b.Group] = currentBuf
mu.Unlock()
}
if !t.Equal(currentBuf.Time) {
// Flatten/Emit old buffer
Expand Down
Loading

0 comments on commit 10f4f90

Please sign in to comment.