Skip to content

Commit

Permalink
Fix bug that reassigned the nodeCardinality
Browse files Browse the repository at this point in the history
  • Loading branch information
desa committed Mar 2, 2017
1 parent bb3c2e5 commit a4bbdb9
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 61 deletions.
12 changes: 6 additions & 6 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,12 +438,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
n.History = 2
}
an.states = make(map[models.GroupID]*alertState)
an.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
an.cardinalityMu.RLock()
l := len(an.states)
an.cardinalityMu.RUnlock()
return int64(l)
})

// Configure flapping
if n.UseFlapping {
Expand All @@ -456,6 +450,12 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
a.nodeCardinality.ValueF = func() int64 {
a.cardinalityMu.RLock()
l := len(a.states)
a.cardinalityMu.RUnlock()
return int64(l)
}
// Register delete hook
if a.hasAnonTopic() {
a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic))
Expand Down
15 changes: 7 additions & 8 deletions combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
Expand Down Expand Up @@ -35,13 +34,6 @@ func newCombineNode(et *ExecutingTask, n *pipeline.CombineNode, l *log.Logger) (
combination: combination{max: n.Max},
}

cn.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
cn.cardinalityMu.RLock()
l := len(cn.expressionsByGroup)
cn.cardinalityMu.RUnlock()
return int64(l)
})

// Create stateful expressions
cn.expressions = make([]stateful.Expression, len(n.Lambdas))
cn.scopePools = make([]stateful.ScopePool, len(n.Lambdas))
Expand Down Expand Up @@ -72,6 +64,13 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) }
func (t timeList) Swap(i, j int) { t[i], t[j] = t[j], t[i] }

func (n *CombineNode) runCombine([]byte) error {
n.nodeCardinality.ValueF = func() int64 {
n.cardinalityMu.RLock()
l := len(n.expressionsByGroup)
n.cardinalityMu.RUnlock()
return int64(l)
}

switch n.Wants() {
case pipeline.StreamEdge:
buffers := make(map[models.GroupID]*buffer)
Expand Down
5 changes: 2 additions & 3 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand All @@ -31,12 +30,12 @@ func (d *DerivativeNode) runDerivative([]byte) error {
case pipeline.StreamEdge:
var mu sync.RWMutex
previous := make(map[models.GroupID]models.Point)
d.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
d.nodeCardinality.ValueF = func() int64 {
mu.RLock()
l := len(previous)
mu.RUnlock()
return int64(l)
})
}
for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
mu.RLock()
Expand Down
12 changes: 6 additions & 6 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
e: n,
expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
}
en.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
en.cardinalityMu.RLock()
l := len(en.expressionsByGroup)
en.cardinalityMu.RUnlock()
return int64(l)
})

// Create stateful expressions
en.expressions = make([]stateful.Expression, len(n.Lambdas))
Expand Down Expand Up @@ -75,6 +69,12 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
e.nodeCardinality.ValueF = func() int64 {
e.cardinalityMu.RLock()
l := len(e.expressionsByGroup)
e.cardinalityMu.RUnlock()
return int64(l)
}
switch e.Provides() {
case pipeline.StreamEdge:
var err error
Expand Down
3 changes: 3 additions & 0 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (v *IntFuncGauge) Add(delta int64) {}
func (v *IntFuncGauge) Set(value int64) {}

func (v *IntFuncGauge) IntValue() int64 {
if v == nil || v.ValueF == nil {
return 0
}
return v.ValueF()
}

Expand Down
9 changes: 4 additions & 5 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -54,12 +53,12 @@ func (n *FlattenNode) runFlatten([]byte) error {
switch n.Wants() {
case pipeline.StreamEdge:
flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer)
n.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
n.nodeCardinality.ValueF = func() int64 {
mu.RLock()
l := len(flattenBuffers)
mu.RUnlock()
return int64(l)
})
}

for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
n.timer.Start()
Expand Down Expand Up @@ -117,12 +116,12 @@ func (n *FlattenNode) runFlatten([]byte) error {
}
case pipeline.BatchEdge:
allBuffers := make(map[models.GroupID]*flattenBatchBuffer)
n.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
n.nodeCardinality.ValueF = func() int64 {
mu.RLock()
l := len(allBuffers)
mu.RUnlock()
return int64(l)
})
}
for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
t := b.TMax.Round(n.f.Tolerance)
Expand Down
14 changes: 7 additions & 7 deletions http_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"path"
"sync"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httpd"
Expand All @@ -31,12 +30,6 @@ func newHTTPOutNode(et *ExecutingTask, n *pipeline.HTTPOutNode, l *log.Logger) (
groupSeriesIdx: make(map[models.GroupID]int),
result: new(models.Result),
}
hn.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
hn.mu.RLock()
l := len(hn.groupSeriesIdx)
hn.mu.RUnlock()
return int64(l)
})
et.registerOutput(hn.c.Endpoint, hn)
hn.node.runF = hn.runOut
hn.node.stopF = hn.stopOut
Expand All @@ -48,6 +41,13 @@ func (h *HTTPOutNode) Endpoint() string {
}

func (h *HTTPOutNode) runOut([]byte) error {
h.nodeCardinality.ValueF = func() int64 {
h.mu.RLock()
l := len(h.groupSeriesIdx)
h.mu.RUnlock()
return int64(l)
}

hndl := func(w http.ResponseWriter, req *http.Request) {
h.mu.RLock()
defer h.mu.RUnlock()
Expand Down
5 changes: 2 additions & 3 deletions influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/pkg/errors"
Expand Down Expand Up @@ -74,12 +73,12 @@ func (c *baseReduceContext) Time() time.Time {
func (n *InfluxQLNode) runStreamInfluxQL() error {
var mu sync.RWMutex
contexts := make(map[models.GroupID]reduceContext)
n.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
n.nodeCardinality.ValueF = func() int64 {
mu.RLock()
l := len(contexts)
mu.RUnlock()
return int64(l)
})
}

for p, ok := n.ins[0].NextPoint(); ok; {
n.timer.Start()
Expand Down
4 changes: 2 additions & 2 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode, l *log.Logger) (*JoinN

func (j *JoinNode) runJoin([]byte) error {
j.groups = make(map[models.GroupID]*group)
j.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
j.nodeCardinality.ValueF = func() int64 {
j.cardinalityMu.RLock()
l := len(j.groups)
j.cardinalityMu.RUnlock()
return int64(l)
})
}

groupErrs := make(chan error, 1)
done := make(chan struct{}, len(j.ins))
Expand Down
13 changes: 7 additions & 6 deletions k8s_autoscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,19 @@ func newK8sAutoscaleNode(et *ExecutingTask, n *pipeline.K8sAutoscaleNode, l *log
// Initialize the replicas lambda expression scope pool
if n.Replicas != nil {
kn.replicasExprs = make(map[models.GroupID]stateful.Expression)
kn.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
kn.cardinalityMu.RLock()
l := len(kn.replicasExprs)
kn.cardinalityMu.RUnlock()
return int64(l)
})
kn.replicasScopePool = stateful.NewScopePool(stateful.FindReferenceVariables(n.Replicas.Expression))
}
return kn, nil
}

func (k *K8sAutoscaleNode) runAutoscale([]byte) error {
k.nodeCardinality.ValueF = func() int64 {
k.cardinalityMu.RLock()
l := len(k.replicasExprs)
k.cardinalityMu.RUnlock()
return int64(l)
}

k.increaseCount = &expvar.Int{}
k.decreaseCount = &expvar.Int{}
k.cooldownDropsCount = &expvar.Int{}
Expand Down
5 changes: 3 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
)

const (
statErrorCount = "errors"
statErrorCount = "errors"
// TODO: Change to working_cardinality
statsCardinalityGauge = "cardinality"
statAverageExecTime = "avg_exec_time_ns"
)
Expand Down Expand Up @@ -111,7 +112,7 @@ func (n *node) init() {
n.statMap.Set(statAverageExecTime, avgExecVar)
n.nodeErrors = &kexpvar.Int{}
n.statMap.Set(statErrorCount, n.nodeErrors)
n.nodeCardinality = kexpvar.NewIntFuncGauge(func() int64 { return 0 })
n.nodeCardinality = kexpvar.NewIntFuncGauge(nil)
n.statMap.Set(statsCardinalityGauge, n.nodeCardinality)
n.timer = n.et.tm.TimingService.NewTimer(avgExecVar)
n.errCh = make(chan error, 1)
Expand Down
14 changes: 7 additions & 7 deletions sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand All @@ -29,12 +28,6 @@ func newSampleNode(et *ExecutingTask, n *pipeline.SampleNode, l *log.Logger) (*S
counts: make(map[models.GroupID]int64),
duration: n.Duration,
}
sn.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
sn.cardinalityMu.RLock()
l := len(sn.counts)
sn.cardinalityMu.RUnlock()
return int64(l)
})
sn.node.runF = sn.runSample
if n.Duration == 0 && n.N == 0 {
return nil, errors.New("invalid sample rate: must be positive integer or duration")
Expand All @@ -43,6 +36,13 @@ func newSampleNode(et *ExecutingTask, n *pipeline.SampleNode, l *log.Logger) (*S
}

func (s *SampleNode) runSample([]byte) error {
s.nodeCardinality.ValueF = func() int64 {
s.cardinalityMu.RLock()
l := len(s.counts)
s.cardinalityMu.RUnlock()
return int64(l)
}

switch s.Wants() {
case pipeline.StreamEdge:
for p, ok := s.ins[0].NextPoint(); ok; p, ok = s.ins[0].NextPoint() {
Expand Down
5 changes: 2 additions & 3 deletions where.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"sync"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
Expand Down Expand Up @@ -38,12 +37,12 @@ func newWhereNode(et *ExecutingTask, n *pipeline.WhereNode, l *log.Logger) (wn *

func (w *WhereNode) runWhere(snapshot []byte) error {
var mu sync.RWMutex
w.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
w.nodeCardinality.ValueF = func() int64 {
mu.RLock()
l := len(w.expressions)
mu.RUnlock()
return int64(l)
})
}

switch w.Wants() {
case pipeline.StreamEdge:
Expand Down
6 changes: 3 additions & 3 deletions window.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

kexpvar "github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand All @@ -34,12 +33,13 @@ type window interface {
func (w *WindowNode) runWindow([]byte) error {
var mu sync.RWMutex
windows := make(map[models.GroupID]window)
w.nodeCardinality = kexpvar.NewIntFuncGauge(func() int64 {
w.nodeCardinality.ValueF = func() int64 {
mu.RLock()
l := len(windows)
mu.RUnlock()
return int64(l)
})
}

// Loops through points windowing by group
for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {
w.timer.Start()
Expand Down

0 comments on commit a4bbdb9

Please sign in to comment.