Skip to content

Commit

Permalink
Revert "WIP: Add cardinality stat to each node type."
Browse files Browse the repository at this point in the history
  • Loading branch information
desa authored Mar 6, 2017
1 parent 20627b3 commit 7e29e8f
Show file tree
Hide file tree
Showing 19 changed files with 10 additions and 1,236 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
Renamed `eval_errors` to `errors` in eval node.
- [#922](https://github.com/influxdata/kapacitor/issues/922): Expose server specific information in alert templates.
- [#1162](https://github.com/influxdata/kapacitor/pull/1162): Add Pushover integration.
- [#1221](https://github.com/influxdata/kapacitor/pull/1221): Add `working_cardinality` stat to each node type that tracks the number of groups per node.

### Bugfixes

Expand Down
25 changes: 3 additions & 22 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ type AlertNode struct {
messageTmpl *text.Template
detailsTmpl *html.Template

statesMu sync.RWMutex

alertsTriggered *expvar.Int
oksTriggered *expvar.Int
infosTriggered *expvar.Int
Expand Down Expand Up @@ -450,14 +448,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
valueF := func() int64 {
a.statesMu.RLock()
l := len(a.states)
a.statesMu.RUnlock()
return int64(l)
}
a.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

// Register delete hook
if a.hasAnonTopic() {
a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic))
Expand Down Expand Up @@ -498,7 +488,7 @@ func (a *AlertNode) runAlert([]byte) error {
return err
}
var currentLevel alert.Level
if state, ok := a.getAlertState(p.Group); ok {
if state, ok := a.states[p.Group]; ok {
currentLevel = state.currentLevel()
} else {
// Check for previous state
Expand Down Expand Up @@ -590,7 +580,7 @@ func (a *AlertNode) runAlert([]byte) error {
var highestPoint *models.BatchPoint

var currentLevel alert.Level
if state, ok := a.getAlertState(b.Group); ok {
if state, ok := a.states[b.Group]; ok {
currentLevel = state.currentLevel()
} else {
// Check for previous state
Expand Down Expand Up @@ -944,14 +934,12 @@ func (a *alertState) percentChange() float64 {
}

func (a *AlertNode) updateState(t time.Time, level alert.Level, group models.GroupID) *alertState {
state, ok := a.getAlertState(group)
state, ok := a.states[group]
if !ok {
state = &alertState{
history: make([]alert.Level, a.a.History),
}
a.statesMu.Lock()
a.states[group] = state
a.statesMu.Unlock()
}
state.addEvent(level)

Expand Down Expand Up @@ -1086,10 +1074,3 @@ func (a *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group
details := tmpBuffer.String()
return msg, details, nil
}

func (a *AlertNode) getAlertState(id models.GroupID) (state *alertState, ok bool) {
a.statesMu.RLock()
state, ok = a.states[id]
a.statesMu.RUnlock()
return state, ok
}
17 changes: 0 additions & 17 deletions combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"fmt"
"log"
"sort"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
Expand All @@ -21,8 +19,6 @@ type CombineNode struct {
expressionsByGroup map[models.GroupID][]stateful.Expression
scopePools []stateful.ScopePool

expressionsByGroupMu sync.RWMutex

combination combination
}

Expand All @@ -34,7 +30,6 @@ func newCombineNode(et *ExecutingTask, n *pipeline.CombineNode, l *log.Logger) (
expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
combination: combination{max: n.Max},
}

// Create stateful expressions
cn.expressions = make([]stateful.Expression, len(n.Lambdas))
cn.scopePools = make([]stateful.ScopePool, len(n.Lambdas))
Expand Down Expand Up @@ -65,14 +60,6 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) }
func (t timeList) Swap(i, j int) { t[i], t[j] = t[j], t[i] }

func (n *CombineNode) runCombine([]byte) error {
valueF := func() int64 {
n.expressionsByGroupMu.RLock()
l := len(n.expressionsByGroup)
n.expressionsByGroupMu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch n.Wants() {
case pipeline.StreamEdge:
buffers := make(map[models.GroupID]*buffer)
Expand Down Expand Up @@ -175,17 +162,13 @@ func (n *CombineNode) combineBuffer(buf *buffer) error {
return nil
}
l := len(n.expressions)
n.expressionsByGroupMu.RLock()
expressions, ok := n.expressionsByGroup[buf.Group]
n.expressionsByGroupMu.RUnlock()
if !ok {
expressions = make([]stateful.Expression, l)
for i, expr := range n.expressions {
expressions[i] = expr.CopyReset()
}
n.expressionsByGroupMu.Lock()
n.expressionsByGroup[buf.Group] = expressions
n.expressionsByGroupMu.Unlock()
}

// Compute matching result for all points
Expand Down
17 changes: 0 additions & 17 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package kapacitor

import (
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand All @@ -29,25 +27,12 @@ func newDerivativeNode(et *ExecutingTask, n *pipeline.DerivativeNode, l *log.Log
func (d *DerivativeNode) runDerivative([]byte) error {
switch d.Provides() {
case pipeline.StreamEdge:
var mu sync.RWMutex
previous := make(map[models.GroupID]models.Point)
valueF := func() int64 {
mu.RLock()
l := len(previous)
mu.RUnlock()
return int64(l)
}
d.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
mu.RLock()
pr, ok := previous[p.Group]
mu.RUnlock()
if !ok {
mu.Lock()
previous[p.Group] = p
mu.Unlock()
d.timer.Stop()
continue
}
Expand All @@ -66,9 +51,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
}
d.timer.Resume()
}
mu.Lock()
previous[p.Group] = p
mu.Unlock()
d.timer.Stop()
}
case pipeline.BatchEdge:
Expand Down
19 changes: 0 additions & 19 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"errors"
"fmt"
"log"
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
Expand All @@ -22,10 +20,6 @@ type EvalNode struct {
refVarList [][]string
scopePool stateful.ScopePool
tags map[string]bool

expressionsByGroupMu sync.RWMutex

evalErrors *expvar.Int
}

// Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
Expand All @@ -38,7 +32,6 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
e: n,
expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
}

// Create stateful expressions
en.expressions = make([]stateful.Expression, len(n.Lambdas))
en.refVarList = make([][]string, len(n.Lambdas))
Expand Down Expand Up @@ -69,14 +62,6 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
valueF := func() int64 {
e.expressionsByGroupMu.RLock()
l := len(e.expressionsByGroup)
e.expressionsByGroupMu.RUnlock()
return int64(l)
}
e.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch e.Provides() {
case pipeline.StreamEdge:
var err error
Expand Down Expand Up @@ -133,17 +118,13 @@ func (e *EvalNode) runEval(snapshot []byte) error {
func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Fields, tags models.Tags) (models.Fields, models.Tags, error) {
vars := e.scopePool.Get()
defer e.scopePool.Put(vars)
e.expressionsByGroupMu.RLock()
expressions, ok := e.expressionsByGroup[group]
e.expressionsByGroupMu.RUnlock()
if !ok {
expressions = make([]stateful.Expression, len(e.expressions))
for i, exp := range e.expressions {
expressions[i] = exp.CopyReset()
}
e.expressionsByGroupMu.Lock()
e.expressionsByGroup[group] = expressions
e.expressionsByGroupMu.Unlock()
}
for i, expr := range expressions {
err := fillScope(vars, e.refVarList[i], now, fields, tags)
Expand Down
23 changes: 0 additions & 23 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,6 @@ func (v *Int) IntValue() int64 {
return atomic.LoadInt64(&v.i)
}

// IntFuncGauge is a 64-bit integer variable that satisfies the expvar.Var interface.
type IntFuncGauge struct {
ValueF func() int64
}

func (v *IntFuncGauge) String() string {
return strconv.FormatInt(v.IntValue(), 10)
}

func (v *IntFuncGauge) Add(delta int64) {}
func (v *IntFuncGauge) Set(value int64) {}

func (v *IntFuncGauge) IntValue() int64 {
if v == nil || v.ValueF == nil {
return 0
}
return v.ValueF()
}

func NewIntFuncGauge(fn func() int64) *IntFuncGauge {
return &IntFuncGauge{fn}
}

// IntSum is a 64-bit integer variable that consists of multiple different parts
// and satisfies the expvar.Var interface.
// The value of the var is the sum of all its parts.
Expand Down
26 changes: 0 additions & 26 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -50,24 +49,13 @@ type flattenBatchBuffer struct {
}

func (n *FlattenNode) runFlatten([]byte) error {
var mu sync.RWMutex
switch n.Wants() {
case pipeline.StreamEdge:
flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer)
valueF := func() int64 {
mu.RLock()
l := len(flattenBuffers)
mu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
n.timer.Start()
t := p.Time.Round(n.f.Tolerance)
mu.RLock()
currentBuf, ok := flattenBuffers[p.Group]
mu.RUnlock()
if !ok {
currentBuf = &flattenStreamBuffer{
Time: t,
Expand All @@ -76,9 +64,7 @@ func (n *FlattenNode) runFlatten([]byte) error {
Dimensions: p.Dimensions,
Tags: p.PointTags(),
}
mu.Lock()
flattenBuffers[p.Group] = currentBuf
mu.Unlock()
}
rp := models.RawPoint{
Time: t,
Expand Down Expand Up @@ -118,20 +104,10 @@ func (n *FlattenNode) runFlatten([]byte) error {
}
case pipeline.BatchEdge:
allBuffers := make(map[models.GroupID]*flattenBatchBuffer)
valueF := func() int64 {
mu.RLock()
l := len(allBuffers)
mu.RUnlock()
return int64(l)
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
t := b.TMax.Round(n.f.Tolerance)
mu.RLock()
currentBuf, ok := allBuffers[b.Group]
mu.RUnlock()
if !ok {
currentBuf = &flattenBatchBuffer{
Time: t,
Expand All @@ -140,9 +116,7 @@ func (n *FlattenNode) runFlatten([]byte) error {
Tags: b.Tags,
Points: make(map[time.Time][]models.RawPoint),
}
mu.Lock()
allBuffers[b.Group] = currentBuf
mu.Unlock()
}
if !t.Equal(currentBuf.Time) {
// Flatten/Emit old buffer
Expand Down
Loading

0 comments on commit 7e29e8f

Please sign in to comment.