From ed190fa3c73993bbac48c200683b97d2bda87c90 Mon Sep 17 00:00:00 2001 From: Michael Desa Date: Tue, 28 Feb 2017 14:07:48 -0500 Subject: [PATCH] Remove nodeCardinality and mutex from node struct --- alert.go | 6 +++--- combine.go | 6 +++--- derivative.go | 7 ++++--- eval.go | 6 +++--- expvar/expvar.go | 10 ++-------- flatten.go | 12 ++++++------ group_by.go | 7 ++++--- http_out.go | 6 +++--- influxql.go | 6 +++--- join.go | 5 ++--- k8s_autoscale.go | 5 ++--- node.go | 10 ++++------ sample.go | 6 +++--- where.go | 6 +++--- window.go | 6 +++--- 15 files changed, 48 insertions(+), 56 deletions(-) diff --git a/alert.go b/alert.go index 1438ec5cc..d047fb326 100644 --- a/alert.go +++ b/alert.go @@ -450,14 +450,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * } func (a *AlertNode) runAlert([]byte) error { - a.statMu.Lock() - a.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { a.cardinalityMu.RLock() l := len(a.states) a.cardinalityMu.RUnlock() return int64(l) } - a.statMu.Unlock() + a.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + // Register delete hook if a.hasAnonTopic() { a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic)) diff --git a/combine.go b/combine.go index 76763eab8..8deea0e0f 100644 --- a/combine.go +++ b/combine.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick/stateful" @@ -64,14 +65,13 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) } func (t timeList) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (n *CombineNode) runCombine([]byte) error { - n.statMu.Lock() - n.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { n.cardinalityMu.RLock() l := len(n.expressionsByGroup) n.cardinalityMu.RUnlock() return int64(l) } - n.statMu.Unlock() + n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) switch n.Wants() { case pipeline.StreamEdge: diff --git a/derivative.go b/derivative.go index 029978efb..ce415f9a4 100644 --- a/derivative.go +++ b/derivative.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -30,14 +31,14 @@ func (d *DerivativeNode) runDerivative([]byte) error { case pipeline.StreamEdge: var mu sync.RWMutex previous := make(map[models.GroupID]models.Point) - d.statMu.Lock() - d.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { mu.RLock() l := len(previous) mu.RUnlock() return int64(l) } - d.statMu.Unlock() + d.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() { d.timer.Start() mu.RLock() diff --git a/eval.go b/eval.go index 2a57c5aab..38bca3551 100644 --- a/eval.go +++ b/eval.go @@ -69,14 +69,14 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN } func (e *EvalNode) runEval(snapshot []byte) error { - e.statMu.Lock() - e.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { e.cardinalityMu.RLock() l := len(e.expressionsByGroup) e.cardinalityMu.RUnlock() return int64(l) } - e.statMu.Unlock() + e.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + switch e.Provides() { case pipeline.StreamEdge: var err error diff --git a/expvar/expvar.go b/expvar/expvar.go index 58e371905..a60451e66 100644 --- a/expvar/expvar.go +++ b/expvar/expvar.go @@ -49,7 +49,6 @@ func (v *Int) IntValue() int64 { // IntFuncGauge is a 64-bit integer variable that satisfies the expvar.Var interface. type IntFuncGauge struct { ValueF func() int64 - mu *sync.Mutex } func (v *IntFuncGauge) String() string { @@ -60,19 +59,14 @@ func (v *IntFuncGauge) Add(delta int64) {} func (v *IntFuncGauge) Set(value int64) {} func (v *IntFuncGauge) IntValue() int64 { - v.mu.Lock() - defer v.mu.Unlock() if v == nil || v.ValueF == nil { return 0 } return v.ValueF() } -func NewIntFuncGauge(fn func() int64, mu *sync.Mutex) *IntFuncGauge { - return &IntFuncGauge{ - ValueF: fn, - mu: mu, - } +func NewIntFuncGauge(fn func() int64) *IntFuncGauge { + return &IntFuncGauge{fn} } // IntSum is a 64-bit integer variable that consists of multiple different parts diff --git a/flatten.go b/flatten.go index 507f70d98..0beec49fe 100644 --- a/flatten.go +++ b/flatten.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -53,14 +54,13 @@ func (n *FlattenNode) runFlatten([]byte) error { switch n.Wants() { case pipeline.StreamEdge: flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer) - n.statMu.Lock() - n.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { mu.RLock() l := len(flattenBuffers) mu.RUnlock() return int64(l) } - n.statMu.Unlock() + n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() { n.timer.Start() @@ -118,14 +118,14 @@ func (n *FlattenNode) runFlatten([]byte) error { } case pipeline.BatchEdge: allBuffers := make(map[models.GroupID]*flattenBatchBuffer) - n.statMu.Lock() - n.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { mu.RLock() l := len(allBuffers) mu.RUnlock() return int64(l) } - n.statMu.Unlock() + n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() { n.timer.Start() t := b.TMax.Round(n.f.Tolerance) diff --git a/group_by.go b/group_by.go index 39ce8f5cb..f23972d5b 100644 --- a/group_by.go +++ b/group_by.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick/ast" @@ -54,14 +55,14 @@ func (g *GroupByNode) runGroupBy([]byte) error { var mu sync.RWMutex var lastTime time.Time groups := make(map[models.GroupID]*models.Batch) - g.statMu.Lock() - g.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { mu.RLock() l := len(groups) mu.RUnlock() return int64(l) } - g.statMu.Unlock() + g.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + for b, ok := g.ins[0].NextBatch(); ok; b, ok = g.ins[0].NextBatch() { g.timer.Start() if !b.TMax.Equal(lastTime) { diff --git a/http_out.go b/http_out.go index 089ba220b..d34ba1083 100644 --- a/http_out.go +++ b/http_out.go @@ -7,6 +7,7 @@ import ( "path" "sync" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/services/httpd" @@ -41,14 +42,13 @@ func (h *HTTPOutNode) Endpoint() string { } func (h *HTTPOutNode) runOut([]byte) error { - h.statMu.Lock() - h.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { h.mu.RLock() l := len(h.groupSeriesIdx) h.mu.RUnlock() return int64(l) } - h.statMu.Unlock() + h.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) hndl := func(w http.ResponseWriter, req *http.Request) { h.mu.RLock() diff --git a/influxql.go b/influxql.go index 15077d356..0d101f368 100644 --- a/influxql.go +++ b/influxql.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/pkg/errors" @@ -73,14 +74,13 @@ func (c *baseReduceContext) Time() time.Time { func (n *InfluxQLNode) runStreamInfluxQL() error { var mu sync.RWMutex contexts := make(map[models.GroupID]reduceContext) - n.statMu.Lock() - n.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { mu.RLock() l := len(contexts) mu.RUnlock() return int64(l) } - n.statMu.Unlock() + n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) for p, ok := n.ins[0].NextPoint(); ok; { n.timer.Start() diff --git a/join.go b/join.go index 84c2e357b..1f828d7a3 100644 --- a/join.go +++ b/join.go @@ -68,14 +68,13 @@ func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode, l *log.Logger) (*JoinN func (j *JoinNode) runJoin([]byte) error { j.groups = make(map[models.GroupID]*group) - j.statMu.Lock() - j.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { j.cardinalityMu.RLock() l := len(j.groups) j.cardinalityMu.RUnlock() return int64(l) } - j.statMu.Unlock() + j.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) groupErrs := make(chan error, 1) done := make(chan struct{}, len(j.ins)) diff --git a/k8s_autoscale.go b/k8s_autoscale.go index b952425e5..add3829e0 100644 --- a/k8s_autoscale.go +++ b/k8s_autoscale.go @@ -69,14 +69,13 @@ func newK8sAutoscaleNode(et *ExecutingTask, n *pipeline.K8sAutoscaleNode, l *log } func (k *K8sAutoscaleNode) runAutoscale([]byte) error { - k.statMu.Lock() - k.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { k.cardinalityMu.RLock() l := len(k.replicasExprs) k.cardinalityMu.RUnlock() return int64(l) } - k.statMu.Unlock() + k.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) k.increaseCount = &expvar.Int{} k.decreaseCount = &expvar.Int{} diff --git a/node.go b/node.go index 1fb79568e..3f37e2e70 100644 --- a/node.go +++ b/node.go @@ -19,9 +19,9 @@ import ( ) const ( - statErrorCount = "errors" - statsCardinalityGauge = "working_cardinality" - statAverageExecTime = "avg_exec_time_ns" + statErrorCount = "errors" + statCardinalityGauge = "working_cardinality" + statAverageExecTime = "avg_exec_time_ns" ) // A node that can be in an executor. @@ -84,7 +84,6 @@ type node struct { timer timer.Timer statsKey string statMap *kexpvar.Map - statMu sync.Mutex nodeErrors *kexpvar.Int nodeCardinality *kexpvar.IntFuncGauge @@ -112,8 +111,7 @@ func (n *node) init() { n.statMap.Set(statAverageExecTime, avgExecVar) n.nodeErrors = &kexpvar.Int{} n.statMap.Set(statErrorCount, n.nodeErrors) - n.nodeCardinality = kexpvar.NewIntFuncGauge(nil, &n.statMu) - n.statMap.Set(statsCardinalityGauge, n.nodeCardinality) + n.statMap.Set(statCardinalityGauge, kexpvar.NewIntFuncGauge(nil)) n.timer = n.et.tm.TimingService.NewTimer(avgExecVar) n.errCh = make(chan error, 1) } diff --git a/sample.go b/sample.go index 30c58ad21..cccd8fce4 100644 --- a/sample.go +++ b/sample.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -36,14 +37,13 @@ func newSampleNode(et *ExecutingTask, n *pipeline.SampleNode, l *log.Logger) (*S } func (s *SampleNode) runSample([]byte) error { - s.statMu.Lock() - s.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { s.cardinalityMu.RLock() l := len(s.counts) s.cardinalityMu.RUnlock() return int64(l) } - s.statMu.Unlock() + s.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) switch s.Wants() { case pipeline.StreamEdge: diff --git a/where.go b/where.go index 333933f54..1bf727209 100644 --- a/where.go +++ b/where.go @@ -6,6 +6,7 @@ import ( "log" "sync" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick/stateful" @@ -37,14 +38,13 @@ func newWhereNode(et *ExecutingTask, n *pipeline.WhereNode, l *log.Logger) (wn * func (w *WhereNode) runWhere(snapshot []byte) error { var mu sync.RWMutex - w.statMu.Lock() - w.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { mu.RLock() l := len(w.expressions) mu.RUnlock() return int64(l) } - w.statMu.Unlock() + w.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) switch w.Wants() { case pipeline.StreamEdge: diff --git a/window.go b/window.go index 9aa8c4bcf..e4f95d893 100644 --- a/window.go +++ b/window.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -33,14 +34,13 @@ type window interface { func (w *WindowNode) runWindow([]byte) error { var mu sync.RWMutex windows := make(map[models.GroupID]window) - w.statMu.Lock() - w.nodeCardinality.ValueF = func() int64 { + valueF := func() int64 { mu.RLock() l := len(windows) mu.RUnlock() return int64(l) } - w.statMu.Unlock() + w.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) // Loops through points windowing by group for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {