From 7e29e8f6d2f90505e2a48fe90ac6de86c5912da6 Mon Sep 17 00:00:00 2001
From: Michael Desa
Date: Mon, 6 Mar 2017 11:26:58 -0500
Subject: [PATCH] Revert "WIP: Add cardinality stat to each node type."

---
 CHANGELOG.md                                  |   1 -
 alert.go                                      |  25 +-
 combine.go                                    |  17 -
 derivative.go                                 |  17 -
 eval.go                                       |  19 -
 expvar/expvar.go                              |  23 -
 flatten.go                                    |  26 -
 group_by.go                                   |  21 -
 http_out.go                                   |   8 -
 influxql.go                                   |  17 -
 integrations/data/TestStream_Cardinality.srpl | 270 -------
 integrations/streamer_test.go                 | 708 ------------------
 join.go                                       |  19 +-
 k8s_autoscale.go                              |  13 -
 node.go                                       |   6 +-
 sample.go                                     |  14 -
 server/server_test.go                         |   8 +-
 where.go                                      |  19 -
 window.go                                     |  15 -
 19 files changed, 10 insertions(+), 1236 deletions(-)
 delete mode 100644 integrations/data/TestStream_Cardinality.srpl

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3a6052633..c18157f1b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,7 +12,6 @@ Renamed `eval_errors` to `errors` in eval node.
 
 - [#922](https://github.com/influxdata/kapacitor/issues/922): Expose server specific information in alert templates.
 - [#1162](https://github.com/influxdata/kapacitor/pulls/1162): Add Pushover integration.
-- [#1221](https://github.com/influxdata/kapacitor/pull/1221): Add `working_cardinality` stat to each node type that tracks the number of groups per node.
 
 ### Bugfixes
diff --git a/alert.go b/alert.go
index 3f2f847a3..8be0522be 100644
--- a/alert.go
+++ b/alert.go
@@ -58,8 +58,6 @@ type AlertNode struct {
     messageTmpl *text.Template
     detailsTmpl *html.Template
 
-    statesMu sync.RWMutex
-
     alertsTriggered *expvar.Int
     oksTriggered    *expvar.Int
     infosTriggered  *expvar.Int
@@ -450,14 +448,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
 }
 
 func (a *AlertNode) runAlert([]byte) error {
-    valueF := func() int64 {
-        a.statesMu.RLock()
-        l := len(a.states)
-        a.statesMu.RUnlock()
-        return int64(l)
-    }
-    a.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
     // Register delete hook
     if a.hasAnonTopic() {
         a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic))
     }
@@ -498,7 +488,7 @@ func (a *AlertNode) runAlert([]byte) error {
                 return err
             }
             var currentLevel alert.Level
-            if state, ok := a.getAlertState(p.Group); ok {
+            if state, ok := a.states[p.Group]; ok {
                 currentLevel = state.currentLevel()
             } else {
                 // Check for previous state
@@ -590,7 +580,7 @@ func (a *AlertNode) runAlert([]byte) error {
             var highestPoint *models.BatchPoint
 
             var currentLevel alert.Level
-            if state, ok := a.getAlertState(b.Group); ok {
+            if state, ok := a.states[b.Group]; ok {
                 currentLevel = state.currentLevel()
             } else {
                 // Check for previous state
@@ -944,14 +934,12 @@ func (a *alertState) percentChange() float64 {
 }
 
 func (a *AlertNode) updateState(t time.Time, level alert.Level, group models.GroupID) *alertState {
-    state, ok := a.getAlertState(group)
+    state, ok := a.states[group]
     if !ok {
         state = &alertState{
             history: make([]alert.Level, a.a.History),
         }
-        a.statesMu.Lock()
         a.states[group] = state
-        a.statesMu.Unlock()
     }
     state.addEvent(level)
@@ -1086,10 +1074,3 @@ func (a *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group
     details := tmpBuffer.String()
     return msg, details, nil
 }
-
-func (a *AlertNode) getAlertState(id models.GroupID) (state *alertState, ok bool) {
-    a.statesMu.RLock()
-    state, ok = a.states[id]
-    a.statesMu.RUnlock()
-    return state, ok
-}
diff --git a/combine.go b/combine.go
index a4cec6df0..76a0b80b3 100644
--- a/combine.go
+++ b/combine.go
@@ -4,10 +4,8 @@ import (
     "fmt"
     "log"
     "sort"
-    "sync"
     "time"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
     "github.com/influxdata/kapacitor/tick/stateful"
@@ -21,8 +19,6 @@ type CombineNode struct {
     expressionsByGroup map[models.GroupID][]stateful.Expression
     scopePools         []stateful.ScopePool
 
-    expressionsByGroupMu sync.RWMutex
-
     combination combination
 }
@@ -34,7 +30,6 @@ func newCombineNode(et *ExecutingTask, n *pipeline.CombineNode, l *log.Logger) (
         expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
         combination:        combination{max: n.Max},
     }
-
     // Create stateful expressions
     cn.expressions = make([]stateful.Expression, len(n.Lambdas))
     cn.scopePools = make([]stateful.ScopePool, len(n.Lambdas))
@@ -65,14 +60,6 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) }
 func (t timeList) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
 
 func (n *CombineNode) runCombine([]byte) error {
-    valueF := func() int64 {
-        n.expressionsByGroupMu.RLock()
-        l := len(n.expressionsByGroup)
-        n.expressionsByGroupMu.RUnlock()
-        return int64(l)
-    }
-    n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
     switch n.Wants() {
     case pipeline.StreamEdge:
         buffers := make(map[models.GroupID]*buffer)
@@ -175,17 +162,13 @@ func (n *CombineNode) combineBuffer(buf *buffer) error {
         return nil
     }
     l := len(n.expressions)
-    n.expressionsByGroupMu.RLock()
     expressions, ok := n.expressionsByGroup[buf.Group]
-    n.expressionsByGroupMu.RUnlock()
     if !ok {
         expressions = make([]stateful.Expression, l)
         for i, expr := range n.expressions {
             expressions[i] = expr.CopyReset()
         }
-        n.expressionsByGroupMu.Lock()
         n.expressionsByGroup[buf.Group] = expressions
-        n.expressionsByGroupMu.Unlock()
     }
 
     // Compute matching result for all points
diff --git a/derivative.go b/derivative.go
index ce415f9a4..1de31ac69 100644
--- a/derivative.go
+++ b/derivative.go
@@ -2,10 +2,8 @@ package kapacitor
 
 import (
     "log"
-    "sync"
     "time"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
 )
@@ -29,25 +27,12 @@ func newDerivativeNode(et *ExecutingTask, n *pipeline.DerivativeNode, l *log.Log
 
 func (d *DerivativeNode) runDerivative([]byte) error {
     switch d.Provides() {
     case pipeline.StreamEdge:
-        var mu sync.RWMutex
         previous := make(map[models.GroupID]models.Point)
-        valueF := func() int64 {
-            mu.RLock()
-            l := len(previous)
-            mu.RUnlock()
-            return int64(l)
-        }
-        d.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
         for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
             d.timer.Start()
-            mu.RLock()
             pr, ok := previous[p.Group]
-            mu.RUnlock()
             if !ok {
-                mu.Lock()
                 previous[p.Group] = p
-                mu.Unlock()
                 d.timer.Stop()
                 continue
             }
@@ -66,9 +51,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
                 }
                 d.timer.Resume()
             }
-            mu.Lock()
             previous[p.Group] = p
-            mu.Unlock()
             d.timer.Stop()
         }
     case pipeline.BatchEdge:
diff --git a/eval.go b/eval.go
index 965c79fce..6c828db0b 100644
--- a/eval.go
+++ b/eval.go
@@ -4,10 +4,8 @@ import (
     "errors"
     "fmt"
     "log"
-    "sync"
     "time"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
     "github.com/influxdata/kapacitor/tick/ast"
@@ -22,10 +20,6 @@ type EvalNode struct {
     refVarList [][]string
     scopePool  stateful.ScopePool
     tags       map[string]bool
-
-    expressionsByGroupMu sync.RWMutex
-
-    evalErrors *expvar.Int
 }
 
 // Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
@@ -38,7 +32,6 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
         e:                  n,
         expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
     }
-
     // Create stateful expressions
     en.expressions = make([]stateful.Expression, len(n.Lambdas))
     en.refVarList = make([][]string, len(n.Lambdas))
@@ -69,14 +62,6 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
 }
 
 func (e *EvalNode) runEval(snapshot []byte) error {
-    valueF := func() int64 {
-        e.expressionsByGroupMu.RLock()
-        l := len(e.expressionsByGroup)
-        e.expressionsByGroupMu.RUnlock()
-        return int64(l)
-    }
-    e.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
     switch e.Provides() {
     case pipeline.StreamEdge:
         var err error
@@ -133,17 +118,13 @@ func (e *EvalNode) runEval(snapshot []byte) error {
 func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Fields, tags models.Tags) (models.Fields, models.Tags, error) {
     vars := e.scopePool.Get()
     defer e.scopePool.Put(vars)
-    e.expressionsByGroupMu.RLock()
     expressions, ok := e.expressionsByGroup[group]
-    e.expressionsByGroupMu.RUnlock()
     if !ok {
         expressions = make([]stateful.Expression, len(e.expressions))
         for i, exp := range e.expressions {
             expressions[i] = exp.CopyReset()
         }
-        e.expressionsByGroupMu.Lock()
         e.expressionsByGroup[group] = expressions
-        e.expressionsByGroupMu.Unlock()
     }
     for i, expr := range expressions {
         err := fillScope(vars, e.refVarList[i], now, fields, tags)
diff --git a/expvar/expvar.go b/expvar/expvar.go
index a60451e66..8b0e49ff2 100644
--- a/expvar/expvar.go
+++ b/expvar/expvar.go
@@ -46,29 +46,6 @@ func (v *Int) IntValue() int64 {
     return atomic.LoadInt64(&v.i)
 }
 
-// IntFuncGauge is a 64-bit integer variable that satisfies the expvar.Var interface.
-type IntFuncGauge struct {
-    ValueF func() int64
-}
-
-func (v *IntFuncGauge) String() string {
-    return strconv.FormatInt(v.IntValue(), 10)
-}
-
-func (v *IntFuncGauge) Add(delta int64) {}
-func (v *IntFuncGauge) Set(value int64) {}
-
-func (v *IntFuncGauge) IntValue() int64 {
-    if v == nil || v.ValueF == nil {
-        return 0
-    }
-    return v.ValueF()
-}
-
-func NewIntFuncGauge(fn func() int64) *IntFuncGauge {
-    return &IntFuncGauge{fn}
-}
-
 // IntSum is a 64-bit integer variable that consists of multiple different parts
 // and satisfies the expvar.Var interface.
 // The value of the var is the sum of all its parts.
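[Note: the IntFuncGauge deleted just above is small enough to read in isolation. The following standalone sketch is not part of the patch; it restates the deleted type with a hypothetical per-group map to show its semantics: Add and Set are deliberate no-ops, every read recomputes the value through the callback, and a nil gauge or nil callback reads as zero.]

package main

import (
    "fmt"
    "strconv"
    "sync"
)

// IntFuncGauge mirrors the type deleted above: a read-only gauge whose
// value is recomputed by a callback on every read.
type IntFuncGauge struct {
    ValueF func() int64
}

func (v *IntFuncGauge) String() string {
    return strconv.FormatInt(v.IntValue(), 10)
}

// Add and Set are no-ops: the value is derived from live state,
// so nothing can be stored into the gauge.
func (v *IntFuncGauge) Add(delta int64) {}
func (v *IntFuncGauge) Set(value int64) {}

// IntValue is nil-safe, so an unset gauge simply reads as zero.
func (v *IntFuncGauge) IntValue() int64 {
    if v == nil || v.ValueF == nil {
        return 0
    }
    return v.ValueF()
}

func NewIntFuncGauge(fn func() int64) *IntFuncGauge {
    return &IntFuncGauge{fn}
}

func main() {
    var mu sync.RWMutex
    states := make(map[string]struct{}) // hypothetical per-group state

    g := NewIntFuncGauge(func() int64 {
        mu.RLock()
        defer mu.RUnlock()
        return int64(len(states))
    })

    fmt.Println(g) // 0
    mu.Lock()
    states["host=localhost,cpu=cpu0"] = struct{}{}
    mu.Unlock()
    fmt.Println(g) // 1: each read sees the live map size
}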
diff --git a/flatten.go b/flatten.go
index 0beec49fe..32dae5907 100644
--- a/flatten.go
+++ b/flatten.go
@@ -7,7 +7,6 @@ import (
     "sync"
     "time"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
 )
@@ -50,24 +49,13 @@ type flattenBatchBuffer struct {
 }
 
 func (n *FlattenNode) runFlatten([]byte) error {
-    var mu sync.RWMutex
     switch n.Wants() {
     case pipeline.StreamEdge:
         flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer)
-        valueF := func() int64 {
-            mu.RLock()
-            l := len(flattenBuffers)
-            mu.RUnlock()
-            return int64(l)
-        }
-        n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
         for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
             n.timer.Start()
             t := p.Time.Round(n.f.Tolerance)
-            mu.RLock()
             currentBuf, ok := flattenBuffers[p.Group]
-            mu.RUnlock()
             if !ok {
                 currentBuf = &flattenStreamBuffer{
                     Time:       t,
@@ -76,9 +64,7 @@ func (n *FlattenNode) runFlatten([]byte) error {
                     Dimensions: p.Dimensions,
                     Tags:       p.PointTags(),
                 }
-                mu.Lock()
                 flattenBuffers[p.Group] = currentBuf
-                mu.Unlock()
             }
             rp := models.RawPoint{
                 Time:   t,
@@ -118,20 +104,10 @@ func (n *FlattenNode) runFlatten([]byte) error {
         }
     case pipeline.BatchEdge:
         allBuffers := make(map[models.GroupID]*flattenBatchBuffer)
-        valueF := func() int64 {
-            mu.RLock()
-            l := len(allBuffers)
-            mu.RUnlock()
-            return int64(l)
-        }
-        n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
         for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
             n.timer.Start()
             t := b.TMax.Round(n.f.Tolerance)
-            mu.RLock()
             currentBuf, ok := allBuffers[b.Group]
-            mu.RUnlock()
             if !ok {
                 currentBuf = &flattenBatchBuffer{
                     Time:   t,
@@ -140,9 +116,7 @@ func (n *FlattenNode) runFlatten([]byte) error {
                     Tags:   b.Tags,
                     Points: make(map[time.Time][]models.RawPoint),
                 }
-                mu.Lock()
                 allBuffers[b.Group] = currentBuf
-                mu.Unlock()
             }
             if !t.Equal(currentBuf.Time) {
                 // Flatten/Emit old buffer
diff --git a/group_by.go b/group_by.go
index f23972d5b..eaf9fd399 100644
--- a/group_by.go
+++ b/group_by.go
@@ -3,10 +3,8 @@ package kapacitor
 
 import (
     "log"
     "sort"
-    "sync"
     "time"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
     "github.com/influxdata/kapacitor/tick/ast"
@@ -52,23 +50,13 @@ func (g *GroupByNode) runGroupBy([]byte) error {
             }
         }
     default:
-        var mu sync.RWMutex
         var lastTime time.Time
         groups := make(map[models.GroupID]*models.Batch)
-        valueF := func() int64 {
-            mu.RLock()
-            l := len(groups)
-            mu.RUnlock()
-            return int64(l)
-        }
-        g.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
         for b, ok := g.ins[0].NextBatch(); ok; b, ok = g.ins[0].NextBatch() {
             g.timer.Start()
             if !b.TMax.Equal(lastTime) {
                 lastTime = b.TMax
                 // Emit all groups
-                mu.RLock()
                 for id, group := range groups {
                     for _, child := range g.outs {
                         err := child.CollectBatch(*group)
                         if err != nil {
                             return err
                         }
                     }
-                    mu.RUnlock()
-                    mu.Lock()
                     // Remove from groups
                     delete(groups, id)
-                    mu.Unlock()
-                    mu.RLock()
                 }
-                mu.RUnlock()
             }
             for _, p := range b.Points {
                 if g.allDimensions {
@@ -92,9 +75,7 @@ func (g *GroupByNode) runGroupBy([]byte) error {
                     dims.TagNames = g.dimensions
                 }
                 groupID := models.ToGroupID(b.Name, p.Tags, dims)
-                mu.RLock()
                 group, ok := groups[groupID]
-                mu.RUnlock()
                 if !ok {
                     tags := make(map[string]string, len(dims.TagNames))
                     for _, dim := range dims.TagNames {
@@ -107,9 +88,7 @@ func (g *GroupByNode) runGroupBy([]byte) error {
                         ByName: b.ByName,
                         Tags:   tags,
                     }
-                    mu.Lock()
                     groups[groupID] = group
-                    mu.Unlock()
                 }
                 group.Points = append(group.Points, p)
             }
diff --git a/http_out.go b/http_out.go
index d34ba1083..23654b740 100644
--- a/http_out.go
+++ b/http_out.go
@@ -7,7 +7,6 @@ import (
     "path"
     "sync"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
     "github.com/influxdata/kapacitor/services/httpd"
@@ -42,13 +41,6 @@ func (h *HTTPOutNode) Endpoint() string {
 }
 
 func (h *HTTPOutNode) runOut([]byte) error {
-    valueF := func() int64 {
-        h.mu.RLock()
-        l := len(h.groupSeriesIdx)
-        h.mu.RUnlock()
-        return int64(l)
-    }
-    h.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
 
     hndl := func(w http.ResponseWriter, req *http.Request) {
         h.mu.RLock()
diff --git a/influxql.go b/influxql.go
index 0d101f368..1e344de17 100644
--- a/influxql.go
+++ b/influxql.go
@@ -3,10 +3,8 @@ package kapacitor
 
 import (
     "fmt"
     "log"
-    "sync"
     "time"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
     "github.com/pkg/errors"
@@ -72,21 +70,10 @@ func (c *baseReduceContext) Time() time.Time {
 }
 
 func (n *InfluxQLNode) runStreamInfluxQL() error {
-    var mu sync.RWMutex
     contexts := make(map[models.GroupID]reduceContext)
-    valueF := func() int64 {
-        mu.RLock()
-        l := len(contexts)
-        mu.RUnlock()
-        return int64(l)
-    }
-    n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
     for p, ok := n.ins[0].NextPoint(); ok; {
         n.timer.Start()
-        mu.RLock()
         context := contexts[p.Group]
-        mu.RUnlock()
         // First point in window
         if context == nil {
             // Create new context
@@ -107,9 +94,7 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
             }
             context = createFn(c)
-            mu.Lock()
             contexts[p.Group] = context
-            mu.Unlock()
         }
 
         if n.isStreamTransformation {
@@ -142,9 +127,7 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
             }
 
             // Nil out reduced point
-            mu.Lock()
             contexts[p.Group] = nil
-            mu.Unlock()
             // do not advance,
             // go through loop again to initialize new iterator.
         }
diff --git a/integrations/data/TestStream_Cardinality.srpl b/integrations/data/TestStream_Cardinality.srpl
deleted file mode 100644
index 1aed7d5aa..000000000
--- a/integrations/data/TestStream_Cardinality.srpl
+++ /dev/null
@@ -1,270 +0,0 @@
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000001
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000002
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000003
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000004
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000005
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000006
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000007
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000008
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000009
-dbname
-rpname
-cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000010
-dbname
-rpname
-cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000010
-dbname
-rpname
-cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000010
-dbname
-rpname
-cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000010
-dbname
-rpname
-cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000010
-dbname
-rpname
-cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000010
-dbname
-rpname
-cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000010
-dbname
-rpname
-cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000010
-dbname
-rpname
-cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000010
diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go
index c26f45788..215e0d043 100644
--- a/integrations/streamer_test.go
+++ b/integrations/streamer_test.go
@@ -8626,714 +8626,6 @@ topScores
     }
 }
 
-func TestStream_DerivativeCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |derivative('usage_user')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "derivative2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_WhereCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |where(lambda: "host" == 'localhost') // replace with localhost
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "where2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_SampleCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |sample(2)
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "sample2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_WindowCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |window()
-        .period(1s)
-        .every(1s)
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "window2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_InfluxQLCardinalityStream(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |max('usage_user')
-        .as('max')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "max2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_InfluxQLCardinalityBatch(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |window()
-        .period(1s)
-        .every(1s)
-    |max('usage_user')
-        .as('max')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "window2": map[string]interface{}{
-            "emitted": int64(81),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-        "max3": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(0),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(81),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_EvalCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |eval(lambda: sigma("usage_user"))
-        .as('sigma')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "eval2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_FlattenCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |flatten()
-        .on('host','cpu')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "flatten2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_GroupByCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-    |window()
-        .period(1s)
-        .every(1s)
-    |groupBy('cpu')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "window2": map[string]interface{}{
-            "emitted": int64(9),
-            "working_cardinality": int64(1),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-        "groupby3": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(9),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_AlertCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |alert()
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "alert2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-            "warns_triggered": int64(0),
-            "crits_triggered": int64(0),
-            "alerts_triggered": int64(0),
-            "oks_triggered": int64(0),
-            "infos_triggered": int64(0),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_HTTPOutCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |httpOut('usage_user')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "http_out2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_K8sAutoscaleCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |k8sAutoscale()
-        .resourceName('a')
-        .replicas(lambda: int(0))
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "k8s_autoscale2": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(9),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(90),
-            "increase_events": int64(1),
-            "decrease_events": int64(0),
-            "cooldown_drops": int64(0),
-        },
-    }
-
-    scaleUpdates := make(chan k8s.Scale, 100)
-    k8sAutoscale := k8sAutoscale{}
-    k8sAutoscale.ScalesGetFunc = func(kind, name string) (*k8s.Scale, error) {
-        var replicas int32
-        switch name {
-        case "serviceA":
-            replicas = 1
-        case "serviceB":
-            replicas = 10
-        }
-        return &k8s.Scale{
-            ObjectMeta: k8s.ObjectMeta{
-                Name: name,
-            },
-            Spec: k8s.ScaleSpec{
-                Replicas: replicas,
-            },
-        }, nil
-    }
-    k8sAutoscale.ScalesUpdateFunc = func(kind string, scale *k8s.Scale) error {
-        scaleUpdates <- *scale
-        return nil
-    }
-    tmInit := func(tm *kapacitor.TaskMaster) {
-        tm.K8sService = k8sAutoscale
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, tmInit)
-    close(scaleUpdates)
-}
-
-func TestStream_JoinCardinality(t *testing.T) {
-
-    var script = `
-var s1 = stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host')
-
-var s2 = stream
-    |from()
-        .measurement('cpu')
-        .groupBy('cpu')
-
-s2|join(s1)
-    .as('s1','s2')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(180),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from2": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "join4": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(10),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(180),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_CombineCardinality(t *testing.T) {
-
-    var script = `
-var s1 = stream
-    |from()
-        .measurement('cpu')
-        .groupBy('cpu','host')
-    |combine(lambda: TRUE, lambda: TRUE)
-        .as('total','true')
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "combine2": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(9),
-            "collected": int64(90),
-            "emitted": int64(0),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func TestStream_MixedCardinality(t *testing.T) {
-
-    var script = `
-stream
-    |from()
-        .measurement('cpu')
-        .groupBy('host','cpu')
-    |where(lambda: "host" == 'localhost')
-    |eval(lambda: sigma("usage_user"))
-        .as('sigma')
-    |where(lambda: "cpu" == 'cpu-total' OR "cpu" == 'cpu0' OR "cpu" == 'cpu1')
-    |derivative('sigma')
-    |alert()
-`
-
-    // Expected Stats
-    es := map[string]map[string]interface{}{
-        "stream0": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "from1": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(0),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "where2": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(9),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "eval3": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(9),
-            "collected": int64(90),
-            "emitted": int64(90),
-        },
-        "where4": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(9),
-            "collected": int64(90),
-            "emitted": int64(30),
-        },
-        "derivative5": map[string]interface{}{
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "working_cardinality": int64(3),
-            "collected": int64(30),
-            "emitted": int64(27),
-        },
-        "alert6": map[string]interface{}{
-            "emitted": int64(0),
-            "working_cardinality": int64(3),
-            "avg_exec_time_ns": int64(0),
-            "errors": int64(0),
-            "collected": int64(27),
-            "warns_triggered": int64(0),
-            "crits_triggered": int64(0),
-            "alerts_triggered": int64(0),
-            "oks_triggered": int64(0),
-            "infos_triggered": int64(0),
-        },
-    }
-
-    testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil)
-}
-
-func testStreamerCardinality(
-    t *testing.T,
-    name, script string,
-    expectedStats map[string]map[string]interface{},
-    tmInit func(tm *kapacitor.TaskMaster),
-) {
-    clock, et, replayErr, tm := testStreamer(t, name, script, tmInit)
-    defer tm.Close()
-
-    err := fastForwardTask(clock, et, replayErr, tm, 20*time.Second)
-    if err != nil {
-        t.Fatalf("Encountered error: %v", err)
-    }
-    stats, err := et.ExecutionStats()
-    if err != nil {
-        t.Fatalf("Encountered error: %v", err)
-    }
-    if !reflect.DeepEqual(expectedStats, stats.NodeStats) {
-        t.Errorf("got:\n%+v\n\nexp:\n%+v\n", stats.NodeStats, expectedStats)
-    }
-}
-
 // Helper test function for streamer
 func testStreamer(
     t *testing.T,
diff --git a/join.go b/join.go
index 552b5c68c..ff01fdc39 100644
--- a/join.go
+++ b/join.go
@@ -29,8 +29,6 @@ type JoinNode struct {
     // Represents the lower bound of times per group per parent
     lowMarks map[srcGroup]time.Time
 
-    groupsMu sync.RWMutex
-
     reported    map[int]bool
     allReported bool
 }
@@ -67,14 +65,8 @@ func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode, l *log.Logger) (*JoinN
 }
 
 func (j *JoinNode) runJoin([]byte) error {
+    j.groups = make(map[models.GroupID]*group)
-    valueF := func() int64 {
-        j.groupsMu.RLock()
-        l := len(j.groups)
-        j.groupsMu.RUnlock()
-        return int64(l)
-    }
-    j.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
     groupErrs := make(chan error, 1)
     done := make(chan struct{}, len(j.ins))
@@ -117,21 +109,16 @@ func (j *JoinNode) runJoin([]byte) error {
         }
     }
     // No more points are coming signal all groups to finish up.
-    j.groupsMu.RLock()
     for _, group := range j.groups {
         close(group.points)
     }
-    j.groupsMu.RUnlock()
-
     j.runningGroups.Wait()
-    j.groupsMu.RLock()
     for _, group := range j.groups {
         err := group.emitAll()
         if err != nil {
             return err
         }
     }
-    j.groupsMu.RUnlock()
     return nil
 }
@@ -282,15 +269,11 @@ func (j *JoinNode) sendSpecificPoint(specific srcPoint, groupErrs chan<- error)
 
 // safely get the group for the point or create one if it doesn't exist.
 func (j *JoinNode) getGroup(p models.PointInterface, groupErrs chan<- error) *group {
-    j.groupsMu.RLock()
     group := j.groups[p.PointGroup()]
-    j.groupsMu.RUnlock()
     if group == nil {
         group = newGroup(len(j.ins), j)
-        j.groupsMu.Lock()
         j.groups[p.PointGroup()] = group
         j.runningGroups.Add(1)
-        j.groupsMu.Unlock()
         go func() {
             err := group.run()
             if err != nil {
diff --git a/k8s_autoscale.go b/k8s_autoscale.go
index 95ba6a92a..4267b99e4 100644
--- a/k8s_autoscale.go
+++ b/k8s_autoscale.go
@@ -3,7 +3,6 @@ package kapacitor
 import (
     "fmt"
     "log"
-    "sync"
     "time"
 
     "github.com/influxdata/kapacitor/expvar"
@@ -36,8 +35,6 @@ type K8sAutoscaleNode struct {
     decreaseCount      *expvar.Int
     cooldownDropsCount *expvar.Int
 
-    replicasExprsMu sync.RWMutex
-
     min int
     max int
 }
@@ -69,14 +66,6 @@ func newK8sAutoscaleNode(et *ExecutingTask, n *pipeline.K8sAutoscaleNode, l *log
 }
 
 func (k *K8sAutoscaleNode) runAutoscale([]byte) error {
-    valueF := func() int64 {
-        k.replicasExprsMu.RLock()
-        l := len(k.replicasExprs)
-        k.replicasExprsMu.RUnlock()
-        return int64(l)
-    }
-    k.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
     k.increaseCount = &expvar.Int{}
     k.decreaseCount = &expvar.Int{}
     k.cooldownDropsCount = &expvar.Int{}
@@ -159,9 +148,7 @@ func (k *K8sAutoscaleNode) handlePoint(streamName string, group models.GroupID,
     }
 
     // Eval the replicas expression
-    k.replicasExprsMu.Lock()
     newReplicas, err := k.evalExpr(state.current, group, k.k.Replicas, k.replicasExprs, k.replicasScopePool, t, fields, tags)
-    k.replicasExprsMu.Unlock()
     if err != nil {
         return models.Point{}, errors.Wrap(err, "failed to evaluate the replicas expression")
     }
diff --git a/node.go b/node.go
index 16e0dc938..f46f6f848 100644
--- a/node.go
+++ b/node.go
@@ -19,9 +19,8 @@ import (
 )
 
 const (
-    statErrorCount       = "errors"
-    statCardinalityGauge = "working_cardinality"
-    statAverageExecTime  = "avg_exec_time_ns"
+    statAverageExecTime = "avg_exec_time_ns"
+    statErrorCount      = "errors"
 )
 
 // A node that can be in an executor.
@@ -110,7 +109,6 @@ func (n *node) init() {
     n.statMap.Set(statAverageExecTime, avgExecVar)
     n.nodeErrors = &kexpvar.Int{}
     n.statMap.Set(statErrorCount, n.nodeErrors)
-    n.statMap.Set(statCardinalityGauge, kexpvar.NewIntFuncGauge(nil))
     n.timer = n.et.tm.TimingService.NewTimer(avgExecVar)
     n.errCh = make(chan error, 1)
 }
diff --git a/sample.go b/sample.go
index 010a2e728..d8bf63beb 100644
--- a/sample.go
+++ b/sample.go
@@ -3,10 +3,8 @@ package kapacitor
 
 import (
     "errors"
     "log"
-    "sync"
     "time"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
 )
@@ -15,8 +13,6 @@ type SampleNode struct {
     node
     s *pipeline.SampleNode
 
-    countsMu sync.RWMutex
-
     counts   map[models.GroupID]int64
     duration time.Duration
 }
@@ -37,14 +33,6 @@ func newSampleNode(et *ExecutingTask, n *pipeline.SampleNode, l *log.Logger) (*S
 }
 
 func (s *SampleNode) runSample([]byte) error {
-    valueF := func() int64 {
-        s.countsMu.RLock()
-        l := len(s.counts)
-        s.countsMu.RUnlock()
-        return int64(l)
-    }
-    s.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
     switch s.Wants() {
     case pipeline.StreamEdge:
         for p, ok := s.ins[0].NextPoint(); ok; p, ok = s.ins[0].NextPoint() {
@@ -85,12 +73,10 @@ func (s *SampleNode) shouldKeep(group models.GroupID, t time.Time) bool {
         keepTime := t.Truncate(s.duration)
         return t.Equal(keepTime)
     } else {
-        s.countsMu.Lock()
         count := s.counts[group]
         keep := count%s.s.N == 0
         count++
         s.counts[group] = count
-        s.countsMu.Unlock()
         return keep
     }
 }
diff --git a/server/server_test.go b/server/server_test.go
index 983e7d77e..b91a75740 100644
--- a/server/server_test.go
+++ b/server/server_test.go
@@ -409,10 +409,10 @@ func TestServer_EnableTask(t *testing.T) {
     dot := `digraph testTaskID {
 graph [throughput="0.00 points/s"];
 
-stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
+stream0 [avg_exec_time_ns="0s" errors="0" ];
 stream0 -> from1 [processed="0"];
 
-from1 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
+from1 [avg_exec_time_ns="0s" errors="0" ];
 }`
     if ti.Dot != dot {
         t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot)
@@ -479,10 +479,10 @@ func TestServer_EnableTaskOnCreate(t *testing.T) {
     dot := `digraph testTaskID {
 graph [throughput="0.00 points/s"];
 
-stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
+stream0 [avg_exec_time_ns="0s" errors="0" ];
 stream0 -> from1 [processed="0"];
 
-from1 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
+from1 [avg_exec_time_ns="0s" errors="0" ];
 }`
     if ti.Dot != dot {
         t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot)
diff --git a/where.go b/where.go
index 1bf727209..1454b370c 100644
--- a/where.go
+++ b/where.go
@@ -4,9 +4,7 @@ import (
     "errors"
     "fmt"
     "log"
-    "sync"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
     "github.com/influxdata/kapacitor/tick/stateful"
@@ -37,22 +35,11 @@ func newWhereNode(et *ExecutingTask, n *pipeline.WhereNode, l *log.Logger) (wn *
 }
 
 func (w *WhereNode) runWhere(snapshot []byte) error {
-    var mu sync.RWMutex
-    valueF := func() int64 {
-        mu.RLock()
-        l := len(w.expressions)
-        mu.RUnlock()
-        return int64(l)
-    }
-    w.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
     switch w.Wants() {
     case pipeline.StreamEdge:
         for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {
             w.timer.Start()
-            mu.RLock()
             expr := w.expressions[p.Group]
-            mu.RUnlock()
             scopePool := w.scopePools[p.Group]
 
             if expr == nil {
@@ -62,9 +49,7 @@ func (w *WhereNode) runWhere(snapshot []byte) error {
                 }
                 expr = compiledExpr
-                mu.Lock()
                 w.expressions[p.Group] = expr
-                mu.Unlock()
 
                 scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(w.w.Lambda.Expression))
                 w.scopePools[p.Group] = scopePool
@@ -87,9 +72,7 @@ func (w *WhereNode) runWhere(snapshot []byte) error {
     case pipeline.BatchEdge:
         for b, ok := w.ins[0].NextBatch(); ok; b, ok = w.ins[0].NextBatch() {
             w.timer.Start()
-            mu.RLock()
             expr := w.expressions[b.Group]
-            mu.RUnlock()
             scopePool := w.scopePools[b.Group]
 
             if expr == nil {
@@ -99,9 +82,7 @@ func (w *WhereNode) runWhere(snapshot []byte) error {
                 }
                 expr = compiledExpr
-                mu.Lock()
                 w.expressions[b.Group] = expr
-                mu.Unlock()
 
                 scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(w.w.Lambda.Expression))
                 w.scopePools[b.Group] = scopePool
diff --git a/window.go b/window.go
index e4f95d893..89d03c407 100644
--- a/window.go
+++ b/window.go
@@ -4,10 +4,8 @@ import (
     "errors"
     "fmt"
     "log"
-    "sync"
     "time"
 
-    "github.com/influxdata/kapacitor/expvar"
     "github.com/influxdata/kapacitor/models"
     "github.com/influxdata/kapacitor/pipeline"
 )
@@ -32,22 +30,11 @@ type window interface {
 }
 
 func (w *WindowNode) runWindow([]byte) error {
-    var mu sync.RWMutex
     windows := make(map[models.GroupID]window)
-    valueF := func() int64 {
-        mu.RLock()
-        l := len(windows)
-        mu.RUnlock()
-        return int64(l)
-    }
-    w.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
-
     // Loops through points windowing by group
     for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {
         w.timer.Start()
-        mu.RLock()
         wnd := windows[p.Group]
-        mu.RUnlock()
         if wnd == nil {
             tags := make(map[string]string, len(p.Dimensions.TagNames))
             for _, dim := range p.Dimensions.TagNames {
@@ -83,9 +70,7 @@ func (w *WindowNode) runWindow([]byte) error {
                 // This should not be possible, but just in case.
                 return errors.New("invalid window, no period specified for either time or count")
             }
-            mu.Lock()
             windows[p.Group] = wnd
-            mu.Unlock()
         }
         batch, ok := wnd.Insert(p)
         if ok {
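[Note: every node hunk above removes the same shape of code — a mutex-guarded map of per-group state plus a closure registered as the working_cardinality gauge. The mutexes existed only because the gauge's callback runs on whichever goroutine polls the stats, while the node's own goroutine mutates the map; reverting the gauge is what lets all of the locking go too. Below is a standalone sketch of that pattern, not Kapacitor code: it uses the standard library's expvar.Map and expvar.Func as stand-ins for Kapacitor's statMap and IntFuncGauge, with hypothetical names throughout.]

package main

import (
    "expvar"
    "fmt"
    "sync"
)

func main() {
    // Stand-in for a node's statMap; in Kapacitor this is a kexpvar map.
    stats := new(expvar.Map).Init()

    var mu sync.RWMutex
    groups := make(map[string]int) // hypothetical per-group state

    // The reverted pattern: register a callback that reports len(groups).
    // The RLock is what forced every node to grow a mutex, because this
    // callback runs on the stats-serving goroutine, not the node's.
    stats.Set("working_cardinality", expvar.Func(func() interface{} {
        mu.RLock()
        defer mu.RUnlock()
        return len(groups)
    }))

    done := make(chan struct{})
    go func() { // the "node" goroutine, creating groups as points arrive
        defer close(done)
        for _, g := range []string{"cpu0", "cpu1", "cpu-total"} {
            mu.Lock()
            groups[g]++
            mu.Unlock()
        }
    }()
    <-done

    fmt.Println(stats.Get("working_cardinality")) // 3
}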