Skip to content

Commit

Permalink
Remove nodeCardinality and mutex from node struct
Browse files Browse the repository at this point in the history
  • Loading branch information
desa committed Mar 2, 2017
1 parent 3b815bc commit ed190fa
Show file tree
Hide file tree
Showing 15 changed files with 48 additions and 56 deletions.
6 changes: 3 additions & 3 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,14 +450,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
a.statMu.Lock()
a.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
a.cardinalityMu.RLock()
l := len(a.states)
a.cardinalityMu.RUnlock()
return int64(l)
}
a.statMu.Unlock()
a.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

// Register delete hook
if a.hasAnonTopic() {
a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic))
Expand Down
6 changes: 3 additions & 3 deletions combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
Expand Down Expand Up @@ -64,14 +65,13 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) }
func (t timeList) Swap(i, j int) { t[i], t[j] = t[j], t[i] }

func (n *CombineNode) runCombine([]byte) error {
n.statMu.Lock()
n.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
n.cardinalityMu.RLock()
l := len(n.expressionsByGroup)
n.cardinalityMu.RUnlock()
return int64(l)
}
n.statMu.Unlock()
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch n.Wants() {
case pipeline.StreamEdge:
Expand Down
7 changes: 4 additions & 3 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand All @@ -30,14 +31,14 @@ func (d *DerivativeNode) runDerivative([]byte) error {
case pipeline.StreamEdge:
var mu sync.RWMutex
previous := make(map[models.GroupID]models.Point)
d.statMu.Lock()
d.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
mu.RLock()
l := len(previous)
mu.RUnlock()
return int64(l)
}
d.statMu.Unlock()
d.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
mu.RLock()
Expand Down
6 changes: 3 additions & 3 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
e.statMu.Lock()
e.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
e.cardinalityMu.RLock()
l := len(e.expressionsByGroup)
e.cardinalityMu.RUnlock()
return int64(l)
}
e.statMu.Unlock()
e.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch e.Provides() {
case pipeline.StreamEdge:
var err error
Expand Down
10 changes: 2 additions & 8 deletions expvar/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (v *Int) IntValue() int64 {
// IntFuncGauge is a 64-bit integer variable that satisfies the expvar.Var interface.
type IntFuncGauge struct {
ValueF func() int64
mu *sync.Mutex
}

func (v *IntFuncGauge) String() string {
Expand All @@ -60,19 +59,14 @@ func (v *IntFuncGauge) Add(delta int64) {}
func (v *IntFuncGauge) Set(value int64) {}

func (v *IntFuncGauge) IntValue() int64 {
v.mu.Lock()
defer v.mu.Unlock()
if v == nil || v.ValueF == nil {
return 0
}
return v.ValueF()
}

func NewIntFuncGauge(fn func() int64, mu *sync.Mutex) *IntFuncGauge {
return &IntFuncGauge{
ValueF: fn,
mu: mu,
}
func NewIntFuncGauge(fn func() int64) *IntFuncGauge {
return &IntFuncGauge{fn}
}

// IntSum is a 64-bit integer variable that consists of multiple different parts
Expand Down
12 changes: 6 additions & 6 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -53,14 +54,13 @@ func (n *FlattenNode) runFlatten([]byte) error {
switch n.Wants() {
case pipeline.StreamEdge:
flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer)
n.statMu.Lock()
n.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
mu.RLock()
l := len(flattenBuffers)
mu.RUnlock()
return int64(l)
}
n.statMu.Unlock()
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
n.timer.Start()
Expand Down Expand Up @@ -118,14 +118,14 @@ func (n *FlattenNode) runFlatten([]byte) error {
}
case pipeline.BatchEdge:
allBuffers := make(map[models.GroupID]*flattenBatchBuffer)
n.statMu.Lock()
n.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
mu.RLock()
l := len(allBuffers)
mu.RUnlock()
return int64(l)
}
n.statMu.Unlock()
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
t := b.TMax.Round(n.f.Tolerance)
Expand Down
7 changes: 4 additions & 3 deletions group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
Expand Down Expand Up @@ -54,14 +55,14 @@ func (g *GroupByNode) runGroupBy([]byte) error {
var mu sync.RWMutex
var lastTime time.Time
groups := make(map[models.GroupID]*models.Batch)
g.statMu.Lock()
g.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
mu.RLock()
l := len(groups)
mu.RUnlock()
return int64(l)
}
g.statMu.Unlock()
g.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for b, ok := g.ins[0].NextBatch(); ok; b, ok = g.ins[0].NextBatch() {
g.timer.Start()
if !b.TMax.Equal(lastTime) {
Expand Down
6 changes: 3 additions & 3 deletions http_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path"
"sync"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httpd"
Expand Down Expand Up @@ -41,14 +42,13 @@ func (h *HTTPOutNode) Endpoint() string {
}

func (h *HTTPOutNode) runOut([]byte) error {
h.statMu.Lock()
h.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
h.mu.RLock()
l := len(h.groupSeriesIdx)
h.mu.RUnlock()
return int64(l)
}
h.statMu.Unlock()
h.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

hndl := func(w http.ResponseWriter, req *http.Request) {
h.mu.RLock()
Expand Down
6 changes: 3 additions & 3 deletions influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/pkg/errors"
Expand Down Expand Up @@ -73,14 +74,13 @@ func (c *baseReduceContext) Time() time.Time {
func (n *InfluxQLNode) runStreamInfluxQL() error {
var mu sync.RWMutex
contexts := make(map[models.GroupID]reduceContext)
n.statMu.Lock()
n.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
mu.RLock()
l := len(contexts)
mu.RUnlock()
return int64(l)
}
n.statMu.Unlock()
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

for p, ok := n.ins[0].NextPoint(); ok; {
n.timer.Start()
Expand Down
5 changes: 2 additions & 3 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,13 @@ func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode, l *log.Logger) (*JoinN

func (j *JoinNode) runJoin([]byte) error {
j.groups = make(map[models.GroupID]*group)
j.statMu.Lock()
j.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
j.cardinalityMu.RLock()
l := len(j.groups)
j.cardinalityMu.RUnlock()
return int64(l)
}
j.statMu.Unlock()
j.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

groupErrs := make(chan error, 1)
done := make(chan struct{}, len(j.ins))
Expand Down
5 changes: 2 additions & 3 deletions k8s_autoscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,13 @@ func newK8sAutoscaleNode(et *ExecutingTask, n *pipeline.K8sAutoscaleNode, l *log
}

func (k *K8sAutoscaleNode) runAutoscale([]byte) error {
k.statMu.Lock()
k.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
k.cardinalityMu.RLock()
l := len(k.replicasExprs)
k.cardinalityMu.RUnlock()
return int64(l)
}
k.statMu.Unlock()
k.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

k.increaseCount = &expvar.Int{}
k.decreaseCount = &expvar.Int{}
Expand Down
10 changes: 4 additions & 6 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
)

const (
statErrorCount = "errors"
statsCardinalityGauge = "working_cardinality"
statAverageExecTime = "avg_exec_time_ns"
statErrorCount = "errors"
statCardinalityGauge = "working_cardinality"
statAverageExecTime = "avg_exec_time_ns"
)

// A node that can be in an executor.
Expand Down Expand Up @@ -84,7 +84,6 @@ type node struct {
timer timer.Timer
statsKey string
statMap *kexpvar.Map
statMu sync.Mutex

nodeErrors *kexpvar.Int
nodeCardinality *kexpvar.IntFuncGauge
Expand Down Expand Up @@ -112,8 +111,7 @@ func (n *node) init() {
n.statMap.Set(statAverageExecTime, avgExecVar)
n.nodeErrors = &kexpvar.Int{}
n.statMap.Set(statErrorCount, n.nodeErrors)
n.nodeCardinality = kexpvar.NewIntFuncGauge(nil, &n.statMu)
n.statMap.Set(statsCardinalityGauge, n.nodeCardinality)
n.statMap.Set(statCardinalityGauge, kexpvar.NewIntFuncGauge(nil))
n.timer = n.et.tm.TimingService.NewTimer(avgExecVar)
n.errCh = make(chan error, 1)
}
Expand Down
6 changes: 3 additions & 3 deletions sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand Down Expand Up @@ -36,14 +37,13 @@ func newSampleNode(et *ExecutingTask, n *pipeline.SampleNode, l *log.Logger) (*S
}

func (s *SampleNode) runSample([]byte) error {
s.statMu.Lock()
s.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
s.cardinalityMu.RLock()
l := len(s.counts)
s.cardinalityMu.RUnlock()
return int64(l)
}
s.statMu.Unlock()
s.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch s.Wants() {
case pipeline.StreamEdge:
Expand Down
6 changes: 3 additions & 3 deletions where.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"sync"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/stateful"
Expand Down Expand Up @@ -37,14 +38,13 @@ func newWhereNode(et *ExecutingTask, n *pipeline.WhereNode, l *log.Logger) (wn *

func (w *WhereNode) runWhere(snapshot []byte) error {
var mu sync.RWMutex
w.statMu.Lock()
w.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
mu.RLock()
l := len(w.expressions)
mu.RUnlock()
return int64(l)
}
w.statMu.Unlock()
w.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

switch w.Wants() {
case pipeline.StreamEdge:
Expand Down
6 changes: 3 additions & 3 deletions window.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)
Expand All @@ -33,14 +34,13 @@ type window interface {
func (w *WindowNode) runWindow([]byte) error {
var mu sync.RWMutex
windows := make(map[models.GroupID]window)
w.statMu.Lock()
w.nodeCardinality.ValueF = func() int64 {
valueF := func() int64 {
mu.RLock()
l := len(windows)
mu.RUnlock()
return int64(l)
}
w.statMu.Unlock()
w.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

// Loops through points windowing by group
for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {
Expand Down

0 comments on commit ed190fa

Please sign in to comment.