From c902ade44a234799db5a0174689cc0e9d571fd7c Mon Sep 17 00:00:00 2001 From: Michael Desa Date: Mon, 6 Mar 2017 11:53:59 -0500 Subject: [PATCH] Revert "Revert "WIP: Add cardinality stat to each node type."" --- CHANGELOG.md | 1 + alert.go | 25 +- combine.go | 17 + derivative.go | 17 + eval.go | 19 + expvar/expvar.go | 23 + flatten.go | 26 + group_by.go | 21 + http_out.go | 8 + influxql.go | 17 + integrations/data/TestStream_Cardinality.srpl | 270 +++++++ integrations/streamer_test.go | 708 ++++++++++++++++++ join.go | 19 +- k8s_autoscale.go | 13 + node.go | 6 +- sample.go | 14 + server/server_test.go | 8 +- where.go | 19 + window.go | 15 + 19 files changed, 1236 insertions(+), 10 deletions(-) create mode 100644 integrations/data/TestStream_Cardinality.srpl diff --git a/CHANGELOG.md b/CHANGELOG.md index c18157f1b..3a6052633 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Renamed `eval_errors` to `errors` in eval node. - [#922](https://github.com/influxdata/kapacitor/issues/922): Expose server specific information in alert templates. - [#1162](https://github.com/influxdata/kapacitor/pulls/1162): Add Pushover integration. +- [#1221](https://github.com/influxdata/kapacitor/pull/1221): Add `working_cardinality` stat to each node type that tracks the number of groups per node. ### Bugfixes diff --git a/alert.go b/alert.go index 8be0522be..3f2f847a3 100644 --- a/alert.go +++ b/alert.go @@ -58,6 +58,8 @@ type AlertNode struct { messageTmpl *text.Template detailsTmpl *html.Template + statesMu sync.RWMutex + alertsTriggered *expvar.Int oksTriggered *expvar.Int infosTriggered *expvar.Int @@ -448,6 +450,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * } func (a *AlertNode) runAlert([]byte) error { + valueF := func() int64 { + a.statesMu.RLock() + l := len(a.states) + a.statesMu.RUnlock() + return int64(l) + } + a.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + // Register delete hook if a.hasAnonTopic() { a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic)) @@ -488,7 +498,7 @@ func (a *AlertNode) runAlert([]byte) error { return err } var currentLevel alert.Level - if state, ok := a.states[p.Group]; ok { + if state, ok := a.getAlertState(p.Group); ok { currentLevel = state.currentLevel() } else { // Check for previous state @@ -580,7 +590,7 @@ func (a *AlertNode) runAlert([]byte) error { var highestPoint *models.BatchPoint var currentLevel alert.Level - if state, ok := a.states[b.Group]; ok { + if state, ok := a.getAlertState(b.Group); ok { currentLevel = state.currentLevel() } else { // Check for previous state @@ -934,12 +944,14 @@ func (a *alertState) percentChange() float64 { } func (a *AlertNode) updateState(t time.Time, level alert.Level, group models.GroupID) *alertState { - state, ok := a.states[group] + state, ok := a.getAlertState(group) if !ok { state = &alertState{ history: make([]alert.Level, a.a.History), } + a.statesMu.Lock() a.states[group] = state + a.statesMu.Unlock() } state.addEvent(level) @@ -1074,3 +1086,10 @@ func (a *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group details := tmpBuffer.String() return msg, details, nil } + +func (a *AlertNode) getAlertState(id models.GroupID) (state *alertState, ok bool) { + a.statesMu.RLock() + state, ok = a.states[id] + a.statesMu.RUnlock() + return state, ok +} diff --git a/combine.go b/combine.go index 76a0b80b3..a4cec6df0 100644 --- a/combine.go +++ b/combine.go @@ -4,8 +4,10 @@ import ( "fmt" "log" "sort" + "sync" "time" 
+ "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick/stateful" @@ -19,6 +21,8 @@ type CombineNode struct { expressionsByGroup map[models.GroupID][]stateful.Expression scopePools []stateful.ScopePool + expressionsByGroupMu sync.RWMutex + combination combination } @@ -30,6 +34,7 @@ func newCombineNode(et *ExecutingTask, n *pipeline.CombineNode, l *log.Logger) ( expressionsByGroup: make(map[models.GroupID][]stateful.Expression), combination: combination{max: n.Max}, } + // Create stateful expressions cn.expressions = make([]stateful.Expression, len(n.Lambdas)) cn.scopePools = make([]stateful.ScopePool, len(n.Lambdas)) @@ -60,6 +65,14 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) } func (t timeList) Swap(i, j int) { t[i], t[j] = t[j], t[i] } func (n *CombineNode) runCombine([]byte) error { + valueF := func() int64 { + n.expressionsByGroupMu.RLock() + l := len(n.expressionsByGroup) + n.expressionsByGroupMu.RUnlock() + return int64(l) + } + n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + switch n.Wants() { case pipeline.StreamEdge: buffers := make(map[models.GroupID]*buffer) @@ -162,13 +175,17 @@ func (n *CombineNode) combineBuffer(buf *buffer) error { return nil } l := len(n.expressions) + n.expressionsByGroupMu.RLock() expressions, ok := n.expressionsByGroup[buf.Group] + n.expressionsByGroupMu.RUnlock() if !ok { expressions = make([]stateful.Expression, l) for i, expr := range n.expressions { expressions[i] = expr.CopyReset() } + n.expressionsByGroupMu.Lock() n.expressionsByGroup[buf.Group] = expressions + n.expressionsByGroupMu.Unlock() } // Compute matching result for all points diff --git a/derivative.go b/derivative.go index 1de31ac69..ce415f9a4 100644 --- a/derivative.go +++ b/derivative.go @@ -2,8 +2,10 @@ package kapacitor import ( "log" + "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -27,12 +29,25 @@ func newDerivativeNode(et *ExecutingTask, n *pipeline.DerivativeNode, l *log.Log func (d *DerivativeNode) runDerivative([]byte) error { switch d.Provides() { case pipeline.StreamEdge: + var mu sync.RWMutex previous := make(map[models.GroupID]models.Point) + valueF := func() int64 { + mu.RLock() + l := len(previous) + mu.RUnlock() + return int64(l) + } + d.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() { d.timer.Start() + mu.RLock() pr, ok := previous[p.Group] + mu.RUnlock() if !ok { + mu.Lock() previous[p.Group] = p + mu.Unlock() d.timer.Stop() continue } @@ -51,7 +66,9 @@ func (d *DerivativeNode) runDerivative([]byte) error { } d.timer.Resume() } + mu.Lock() previous[p.Group] = p + mu.Unlock() d.timer.Stop() } case pipeline.BatchEdge: diff --git a/eval.go b/eval.go index 6c828db0b..965c79fce 100644 --- a/eval.go +++ b/eval.go @@ -4,8 +4,10 @@ import ( "errors" "fmt" "log" + "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick/ast" @@ -20,6 +22,10 @@ type EvalNode struct { refVarList [][]string scopePool stateful.ScopePool tags map[string]bool + + expressionsByGroupMu sync.RWMutex + + evalErrors *expvar.Int } // Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point. 
@@ -32,6 +38,7 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
 		e:                  n,
 		expressionsByGroup: make(map[models.GroupID][]stateful.Expression),
 	}
+
 	// Create stateful expressions
 	en.expressions = make([]stateful.Expression, len(n.Lambdas))
 	en.refVarList = make([][]string, len(n.Lambdas))
@@ -62,6 +69,14 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
 }
 
 func (e *EvalNode) runEval(snapshot []byte) error {
+	valueF := func() int64 {
+		e.expressionsByGroupMu.RLock()
+		l := len(e.expressionsByGroup)
+		e.expressionsByGroupMu.RUnlock()
+		return int64(l)
+	}
+	e.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
+
 	switch e.Provides() {
 	case pipeline.StreamEdge:
 		var err error
@@ -118,13 +133,17 @@ func (e *EvalNode) runEval(snapshot []byte) error {
 func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Fields, tags models.Tags) (models.Fields, models.Tags, error) {
 	vars := e.scopePool.Get()
 	defer e.scopePool.Put(vars)
+	e.expressionsByGroupMu.RLock()
 	expressions, ok := e.expressionsByGroup[group]
+	e.expressionsByGroupMu.RUnlock()
 	if !ok {
 		expressions = make([]stateful.Expression, len(e.expressions))
 		for i, exp := range e.expressions {
 			expressions[i] = exp.CopyReset()
 		}
+		e.expressionsByGroupMu.Lock()
 		e.expressionsByGroup[group] = expressions
+		e.expressionsByGroupMu.Unlock()
 	}
 	for i, expr := range expressions {
 		err := fillScope(vars, e.refVarList[i], now, fields, tags)
diff --git a/expvar/expvar.go b/expvar/expvar.go
index 8b0e49ff2..a60451e66 100644
--- a/expvar/expvar.go
+++ b/expvar/expvar.go
@@ -46,6 +46,29 @@ func (v *Int) IntValue() int64 {
 	return atomic.LoadInt64(&v.i)
 }
 
+// IntFuncGauge is a 64-bit integer gauge whose value is computed on demand by a user-provided function; it satisfies the expvar.Var interface.
+type IntFuncGauge struct {
+	ValueF func() int64
+}
+
+func (v *IntFuncGauge) String() string {
+	return strconv.FormatInt(v.IntValue(), 10)
+}
+
+func (v *IntFuncGauge) Add(delta int64) {} // no-op; the value comes solely from ValueF
+func (v *IntFuncGauge) Set(value int64) {} // no-op; the value comes solely from ValueF
+
+func (v *IntFuncGauge) IntValue() int64 {
+	if v == nil || v.ValueF == nil {
+		return 0
+	}
+	return v.ValueF()
+}
+
+func NewIntFuncGauge(fn func() int64) *IntFuncGauge {
+	return &IntFuncGauge{fn}
+}
+
 // IntSum is a 64-bit integer variable that consists of multiple different parts
 // and satisfies the expvar.Var interface.
 // The value of the var is the sum of all its parts.
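For context on how the new gauge type is consumed, here is a minimal, self-contained sketch of the pattern the node hunks in this patch repeat: the closure reads shared per-group state under a read lock, and the gauge evaluates it lazily each time the stat is read. The `groups` map, its key, and the mutex here are illustrative stand-ins for a node's real state, not code from this patch, and the example assumes the patched expvar package is importable.

package main

import (
	"fmt"
	"sync"

	"github.com/influxdata/kapacitor/expvar"
)

func main() {
	var mu sync.RWMutex
	groups := make(map[string]struct{})

	// The gauge holds no count of its own; it calls the closure on
	// every read, so it can never drift out of sync with the map.
	gauge := expvar.NewIntFuncGauge(func() int64 {
		mu.RLock()
		defer mu.RUnlock()
		return int64(len(groups))
	})

	mu.Lock()
	groups["host=localhost,cpu=cpu0"] = struct{}{}
	mu.Unlock()

	fmt.Println(gauge.String()) // prints "1"
}

The trade-off over an eagerly maintained counter is that tracking cardinality costs nothing on the per-point hot path; the read lock is only taken when a task's execution stats or DOT output are requested.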
diff --git a/flatten.go b/flatten.go index 32dae5907..0beec49fe 100644 --- a/flatten.go +++ b/flatten.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -49,13 +50,24 @@ type flattenBatchBuffer struct { } func (n *FlattenNode) runFlatten([]byte) error { + var mu sync.RWMutex switch n.Wants() { case pipeline.StreamEdge: flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer) + valueF := func() int64 { + mu.RLock() + l := len(flattenBuffers) + mu.RUnlock() + return int64(l) + } + n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() { n.timer.Start() t := p.Time.Round(n.f.Tolerance) + mu.RLock() currentBuf, ok := flattenBuffers[p.Group] + mu.RUnlock() if !ok { currentBuf = &flattenStreamBuffer{ Time: t, @@ -64,7 +76,9 @@ func (n *FlattenNode) runFlatten([]byte) error { Dimensions: p.Dimensions, Tags: p.PointTags(), } + mu.Lock() flattenBuffers[p.Group] = currentBuf + mu.Unlock() } rp := models.RawPoint{ Time: t, @@ -104,10 +118,20 @@ func (n *FlattenNode) runFlatten([]byte) error { } case pipeline.BatchEdge: allBuffers := make(map[models.GroupID]*flattenBatchBuffer) + valueF := func() int64 { + mu.RLock() + l := len(allBuffers) + mu.RUnlock() + return int64(l) + } + n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() { n.timer.Start() t := b.TMax.Round(n.f.Tolerance) + mu.RLock() currentBuf, ok := allBuffers[b.Group] + mu.RUnlock() if !ok { currentBuf = &flattenBatchBuffer{ Time: t, @@ -116,7 +140,9 @@ func (n *FlattenNode) runFlatten([]byte) error { Tags: b.Tags, Points: make(map[time.Time][]models.RawPoint), } + mu.Lock() allBuffers[b.Group] = currentBuf + mu.Unlock() } if !t.Equal(currentBuf.Time) { // Flatten/Emit old buffer diff --git a/group_by.go b/group_by.go index eaf9fd399..f23972d5b 100644 --- a/group_by.go +++ b/group_by.go @@ -3,8 +3,10 @@ package kapacitor import ( "log" "sort" + "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick/ast" @@ -50,13 +52,23 @@ func (g *GroupByNode) runGroupBy([]byte) error { } } default: + var mu sync.RWMutex var lastTime time.Time groups := make(map[models.GroupID]*models.Batch) + valueF := func() int64 { + mu.RLock() + l := len(groups) + mu.RUnlock() + return int64(l) + } + g.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + for b, ok := g.ins[0].NextBatch(); ok; b, ok = g.ins[0].NextBatch() { g.timer.Start() if !b.TMax.Equal(lastTime) { lastTime = b.TMax // Emit all groups + mu.RLock() for id, group := range groups { for _, child := range g.outs { err := child.CollectBatch(*group) @@ -64,9 +76,14 @@ func (g *GroupByNode) runGroupBy([]byte) error { return err } } + mu.RUnlock() + mu.Lock() // Remove from groups delete(groups, id) + mu.Unlock() + mu.RLock() } + mu.RUnlock() } for _, p := range b.Points { if g.allDimensions { @@ -75,7 +92,9 @@ func (g *GroupByNode) runGroupBy([]byte) error { dims.TagNames = g.dimensions } groupID := models.ToGroupID(b.Name, p.Tags, dims) + mu.RLock() group, ok := groups[groupID] + mu.RUnlock() if !ok { tags := make(map[string]string, len(dims.TagNames)) for _, dim := range dims.TagNames { @@ -88,7 +107,9 @@ func (g *GroupByNode) runGroupBy([]byte) error { ByName: b.ByName, Tags: 
tags,
 				}
+				mu.Lock()
 				groups[groupID] = group
+				mu.Unlock()
 			}
 			group.Points = append(group.Points, p)
 		}
diff --git a/http_out.go b/http_out.go
index 23654b740..d34ba1083 100644
--- a/http_out.go
+++ b/http_out.go
@@ -7,6 +7,7 @@ import (
 	"path"
 	"sync"
 
+	"github.com/influxdata/kapacitor/expvar"
 	"github.com/influxdata/kapacitor/models"
 	"github.com/influxdata/kapacitor/pipeline"
 	"github.com/influxdata/kapacitor/services/httpd"
@@ -41,6 +42,13 @@ func (h *HTTPOutNode) Endpoint() string {
 }
 
 func (h *HTTPOutNode) runOut([]byte) error {
+	valueF := func() int64 {
+		h.mu.RLock()
+		l := len(h.groupSeriesIdx)
+		h.mu.RUnlock()
+		return int64(l)
+	}
+	h.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
 
 	hndl := func(w http.ResponseWriter, req *http.Request) {
 		h.mu.RLock()
diff --git a/influxql.go b/influxql.go
index 1e344de17..0d101f368 100644
--- a/influxql.go
+++ b/influxql.go
@@ -3,8 +3,10 @@ package kapacitor
 import (
 	"fmt"
 	"log"
+	"sync"
 	"time"
 
+	"github.com/influxdata/kapacitor/expvar"
 	"github.com/influxdata/kapacitor/models"
 	"github.com/influxdata/kapacitor/pipeline"
 	"github.com/pkg/errors"
@@ -70,10 +72,21 @@ func (c *baseReduceContext) Time() time.Time {
 }
 
 func (n *InfluxQLNode) runStreamInfluxQL() error {
+	var mu sync.RWMutex
 	contexts := make(map[models.GroupID]reduceContext)
+	valueF := func() int64 {
+		mu.RLock()
+		l := len(contexts)
+		mu.RUnlock()
+		return int64(l)
+	}
+	n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))
+
 	for p, ok := n.ins[0].NextPoint(); ok; {
 		n.timer.Start()
+		mu.RLock()
 		context := contexts[p.Group]
+		mu.RUnlock()
 		// First point in window
 		if context == nil {
 			// Create new context
@@ -94,7 +107,9 @@
 			}
 
 			context = createFn(c)
+			mu.Lock()
 			contexts[p.Group] = context
+			mu.Unlock()
 		}
 
 		if n.isStreamTransformation {
@@ -127,7 +142,9 @@
 			}
 
 			// Nil out reduced point
+			mu.Lock()
 			contexts[p.Group] = nil
+			mu.Unlock()
 			// do not advance,
 			// go through loop again to initialize new iterator.
} diff --git a/integrations/data/TestStream_Cardinality.srpl b/integrations/data/TestStream_Cardinality.srpl new file mode 100644 index 000000000..1aed7d5aa --- /dev/null +++ b/integrations/data/TestStream_Cardinality.srpl @@ -0,0 +1,270 @@ +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000001 +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000002 +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000003 +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000004 +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000005 +dbname +rpname +cpu,cpu=cpu0,host=localhost 
usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000006 +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000007 +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000008 +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000009 +dbname +rpname +cpu,cpu=cpu0,host=localhost usage_user=20.1 0000000010 +dbname +rpname +cpu,cpu=cpu1,host=localhost usage_user=20.1 0000000010 +dbname +rpname +cpu,cpu=cpu2,host=localhost usage_user=20.1 0000000010 +dbname +rpname +cpu,cpu=cpu3,host=localhost usage_user=20.1 0000000010 +dbname +rpname +cpu,cpu=cpu4,host=localhost usage_user=20.1 0000000010 +dbname +rpname +cpu,cpu=cpu5,host=localhost usage_user=20.1 0000000010 +dbname +rpname +cpu,cpu=cpu6,host=localhost usage_user=20.1 0000000010 +dbname +rpname +cpu,cpu=cpu7,host=localhost usage_user=20.1 0000000010 +dbname +rpname +cpu,cpu=cpu-total,host=localhost usage_user=20.1 0000000010 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 215e0d043..c26f45788 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -8626,6 +8626,714 @@ topScores } } +func TestStream_DerivativeCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + 
.groupBy('host','cpu') + |derivative('usage_user') +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "derivative2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_WhereCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |where(lambda: "host" == 'localhost') // replace with localhost +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "where2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_SampleCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |sample(2) +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "sample2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_WindowCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |window() + .period(1s) + .every(1s) +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "window2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_InfluxQLCardinalityStream(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |max('usage_user') + .as('max') +` + + // Expected Stats + es := 
map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "max2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_InfluxQLCardinalityBatch(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |window() + .period(1s) + .every(1s) + |max('usage_user') + .as('max') +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "window2": map[string]interface{}{ + "emitted": int64(81), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + "max3": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(0), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(81), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_EvalCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |eval(lambda: sigma("usage_user")) + .as('sigma') +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "eval2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_FlattenCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |flatten() + .on('host','cpu') +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "flatten2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_GroupByCardinality(t *testing.T) { + + var script = ` +stream + 
|from() + .measurement('cpu') + |window() + .period(1s) + .every(1s) + |groupBy('cpu') +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "window2": map[string]interface{}{ + "emitted": int64(9), + "working_cardinality": int64(1), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + "groupby3": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(9), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_AlertCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |alert() +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "alert2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + "warns_triggered": int64(0), + "crits_triggered": int64(0), + "alerts_triggered": int64(0), + "oks_triggered": int64(0), + "infos_triggered": int64(0), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_HTTPOutCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |httpOut('usage_user') +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "http_out2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_K8sAutoscaleCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |k8sAutoscale() + .resourceName('a') + .replicas(lambda: int(0)) +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "k8s_autoscale2": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(9), + 
"avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(90), + "increase_events": int64(1), + "decrease_events": int64(0), + "cooldown_drops": int64(0), + }, + } + + scaleUpdates := make(chan k8s.Scale, 100) + k8sAutoscale := k8sAutoscale{} + k8sAutoscale.ScalesGetFunc = func(kind, name string) (*k8s.Scale, error) { + var replicas int32 + switch name { + case "serviceA": + replicas = 1 + case "serviceB": + replicas = 10 + } + return &k8s.Scale{ + ObjectMeta: k8s.ObjectMeta{ + Name: name, + }, + Spec: k8s.ScaleSpec{ + Replicas: replicas, + }, + }, nil + } + k8sAutoscale.ScalesUpdateFunc = func(kind string, scale *k8s.Scale) error { + scaleUpdates <- *scale + return nil + } + tmInit := func(tm *kapacitor.TaskMaster) { + tm.K8sService = k8sAutoscale + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, tmInit) + close(scaleUpdates) +} + +func TestStream_JoinCardinality(t *testing.T) { + + var script = ` +var s1 = stream + |from() + .measurement('cpu') + .groupBy('host') + +var s2 = stream + |from() + .measurement('cpu') + .groupBy('cpu') + +s2|join(s1) + .as('s1','s2') +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(180), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from2": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "join4": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(10), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(180), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_CombineCardinality(t *testing.T) { + + var script = ` +var s1 = stream + |from() + .measurement('cpu') + .groupBy('cpu','host') + |combine(lambda: TRUE, lambda: TRUE) + .as('total','true') +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "combine2": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(9), + "collected": int64(90), + "emitted": int64(0), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func TestStream_MixedCardinality(t *testing.T) { + + var script = ` +stream + |from() + .measurement('cpu') + .groupBy('host','cpu') + |where(lambda: "host" == 'localhost') + |eval(lambda: sigma("usage_user")) + .as('sigma') + |where(lambda: "cpu" == 'cpu-total' OR "cpu" == 'cpu0' OR "cpu" == 'cpu1') + |derivative('sigma') + |alert() +` + + // Expected Stats + es := map[string]map[string]interface{}{ + "stream0": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "from1": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": 
int64(0), + "working_cardinality": int64(0), + "collected": int64(90), + "emitted": int64(90), + }, + "where2": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(9), + "collected": int64(90), + "emitted": int64(90), + }, + "eval3": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(9), + "collected": int64(90), + "emitted": int64(90), + }, + "where4": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(9), + "collected": int64(90), + "emitted": int64(30), + }, + "derivative5": map[string]interface{}{ + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "working_cardinality": int64(3), + "collected": int64(30), + "emitted": int64(27), + }, + "alert6": map[string]interface{}{ + "emitted": int64(0), + "working_cardinality": int64(3), + "avg_exec_time_ns": int64(0), + "errors": int64(0), + "collected": int64(27), + "warns_triggered": int64(0), + "crits_triggered": int64(0), + "alerts_triggered": int64(0), + "oks_triggered": int64(0), + "infos_triggered": int64(0), + }, + } + + testStreamerCardinality(t, "TestStream_Cardinality", script, es, nil) +} + +func testStreamerCardinality( + t *testing.T, + name, script string, + expectedStats map[string]map[string]interface{}, + tmInit func(tm *kapacitor.TaskMaster), +) { + clock, et, replayErr, tm := testStreamer(t, name, script, tmInit) + defer tm.Close() + + err := fastForwardTask(clock, et, replayErr, tm, 20*time.Second) + if err != nil { + t.Fatalf("Encountered error: %v", err) + } + stats, err := et.ExecutionStats() + if err != nil { + t.Fatalf("Encountered error: %v", err) + } + if !reflect.DeepEqual(expectedStats, stats.NodeStats) { + t.Errorf("got:\n%+v\n\nexp:\n%+v\n", stats.NodeStats, expectedStats) + } +} + // Helper test function for streamer func testStreamer( t *testing.T, diff --git a/join.go b/join.go index ff01fdc39..552b5c68c 100644 --- a/join.go +++ b/join.go @@ -29,6 +29,8 @@ type JoinNode struct { // Represents the lower bound of times per group per parent lowMarks map[srcGroup]time.Time + groupsMu sync.RWMutex + reported map[int]bool allReported bool } @@ -65,8 +67,14 @@ func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode, l *log.Logger) (*JoinN } func (j *JoinNode) runJoin([]byte) error { - j.groups = make(map[models.GroupID]*group) + valueF := func() int64 { + j.groupsMu.RLock() + l := len(j.groups) + j.groupsMu.RUnlock() + return int64(l) + } + j.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) groupErrs := make(chan error, 1) done := make(chan struct{}, len(j.ins)) @@ -109,16 +117,21 @@ func (j *JoinNode) runJoin([]byte) error { } } // No more points are coming signal all groups to finish up. + j.groupsMu.RLock() for _, group := range j.groups { close(group.points) } + j.groupsMu.RUnlock() + j.runningGroups.Wait() + j.groupsMu.RLock() for _, group := range j.groups { err := group.emitAll() if err != nil { return err } } + j.groupsMu.RUnlock() return nil } @@ -269,11 +282,15 @@ func (j *JoinNode) sendSpecificPoint(specific srcPoint, groupErrs chan<- error) // safely get the group for the point or create one if it doesn't exist. 
func (j *JoinNode) getGroup(p models.PointInterface, groupErrs chan<- error) *group { + j.groupsMu.RLock() group := j.groups[p.PointGroup()] + j.groupsMu.RUnlock() if group == nil { group = newGroup(len(j.ins), j) + j.groupsMu.Lock() j.groups[p.PointGroup()] = group j.runningGroups.Add(1) + j.groupsMu.Unlock() go func() { err := group.run() if err != nil { diff --git a/k8s_autoscale.go b/k8s_autoscale.go index 4267b99e4..95ba6a92a 100644 --- a/k8s_autoscale.go +++ b/k8s_autoscale.go @@ -3,6 +3,7 @@ package kapacitor import ( "fmt" "log" + "sync" "time" "github.com/influxdata/kapacitor/expvar" @@ -35,6 +36,8 @@ type K8sAutoscaleNode struct { decreaseCount *expvar.Int cooldownDropsCount *expvar.Int + replicasExprsMu sync.RWMutex + min int max int } @@ -66,6 +69,14 @@ func newK8sAutoscaleNode(et *ExecutingTask, n *pipeline.K8sAutoscaleNode, l *log } func (k *K8sAutoscaleNode) runAutoscale([]byte) error { + valueF := func() int64 { + k.replicasExprsMu.RLock() + l := len(k.replicasExprs) + k.replicasExprsMu.RUnlock() + return int64(l) + } + k.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + k.increaseCount = &expvar.Int{} k.decreaseCount = &expvar.Int{} k.cooldownDropsCount = &expvar.Int{} @@ -148,7 +159,9 @@ func (k *K8sAutoscaleNode) handlePoint(streamName string, group models.GroupID, } // Eval the replicas expression + k.replicasExprsMu.Lock() newReplicas, err := k.evalExpr(state.current, group, k.k.Replicas, k.replicasExprs, k.replicasScopePool, t, fields, tags) + k.replicasExprsMu.Unlock() if err != nil { return models.Point{}, errors.Wrap(err, "failed to evaluate the replicas expression") } diff --git a/node.go b/node.go index f46f6f848..16e0dc938 100644 --- a/node.go +++ b/node.go @@ -19,8 +19,9 @@ import ( ) const ( - statAverageExecTime = "avg_exec_time_ns" - statErrorCount = "errors" + statErrorCount = "errors" + statCardinalityGauge = "working_cardinality" + statAverageExecTime = "avg_exec_time_ns" ) // A node that can be in an executor. 
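Worth noting before the init() hunk below: every node registers working_cardinality with a nil-backed gauge at init time, and IntFuncGauge.IntValue (added above in expvar/expvar.go) guards against both a nil receiver and a nil ValueF. A node therefore reports 0 from creation, and nodes that track per-group state simply overwrite the entry with a live closure at the top of their run functions. A small sketch of that lifecycle, using only the API this patch adds and assuming the patched package is importable:

package main

import (
	"fmt"

	kexpvar "github.com/influxdata/kapacitor/expvar"
)

func main() {
	// Placeholder as registered in node.init(): safe to read, reports 0.
	g := kexpvar.NewIntFuncGauge(nil)
	fmt.Println(g.String()) // prints "0"

	// A run function later swaps in a live closure over its group state.
	g = kexpvar.NewIntFuncGauge(func() int64 { return 9 })
	fmt.Println(g.String()) // prints "9"
}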
@@ -109,6 +110,7 @@ func (n *node) init() { n.statMap.Set(statAverageExecTime, avgExecVar) n.nodeErrors = &kexpvar.Int{} n.statMap.Set(statErrorCount, n.nodeErrors) + n.statMap.Set(statCardinalityGauge, kexpvar.NewIntFuncGauge(nil)) n.timer = n.et.tm.TimingService.NewTimer(avgExecVar) n.errCh = make(chan error, 1) } diff --git a/sample.go b/sample.go index d8bf63beb..010a2e728 100644 --- a/sample.go +++ b/sample.go @@ -3,8 +3,10 @@ package kapacitor import ( "errors" "log" + "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -13,6 +15,8 @@ type SampleNode struct { node s *pipeline.SampleNode + countsMu sync.RWMutex + counts map[models.GroupID]int64 duration time.Duration } @@ -33,6 +37,14 @@ func newSampleNode(et *ExecutingTask, n *pipeline.SampleNode, l *log.Logger) (*S } func (s *SampleNode) runSample([]byte) error { + valueF := func() int64 { + s.countsMu.RLock() + l := len(s.counts) + s.countsMu.RUnlock() + return int64(l) + } + s.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + switch s.Wants() { case pipeline.StreamEdge: for p, ok := s.ins[0].NextPoint(); ok; p, ok = s.ins[0].NextPoint() { @@ -73,10 +85,12 @@ func (s *SampleNode) shouldKeep(group models.GroupID, t time.Time) bool { keepTime := t.Truncate(s.duration) return t.Equal(keepTime) } else { + s.countsMu.Lock() count := s.counts[group] keep := count%s.s.N == 0 count++ s.counts[group] = count + s.countsMu.Unlock() return keep } } diff --git a/server/server_test.go b/server/server_test.go index b91a75740..983e7d77e 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -409,10 +409,10 @@ func TestServer_EnableTask(t *testing.T) { dot := `digraph testTaskID { graph [throughput="0.00 points/s"]; -stream0 [avg_exec_time_ns="0s" errors="0" ]; +stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ]; stream0 -> from1 [processed="0"]; -from1 [avg_exec_time_ns="0s" errors="0" ]; +from1 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ]; }` if ti.Dot != dot { t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot) @@ -479,10 +479,10 @@ func TestServer_EnableTaskOnCreate(t *testing.T) { dot := `digraph testTaskID { graph [throughput="0.00 points/s"]; -stream0 [avg_exec_time_ns="0s" errors="0" ]; +stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ]; stream0 -> from1 [processed="0"]; -from1 [avg_exec_time_ns="0s" errors="0" ]; +from1 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ]; }` if ti.Dot != dot { t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot) diff --git a/where.go b/where.go index 1454b370c..1bf727209 100644 --- a/where.go +++ b/where.go @@ -4,7 +4,9 @@ import ( "errors" "fmt" "log" + "sync" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" "github.com/influxdata/kapacitor/tick/stateful" @@ -35,11 +37,22 @@ func newWhereNode(et *ExecutingTask, n *pipeline.WhereNode, l *log.Logger) (wn * } func (w *WhereNode) runWhere(snapshot []byte) error { + var mu sync.RWMutex + valueF := func() int64 { + mu.RLock() + l := len(w.expressions) + mu.RUnlock() + return int64(l) + } + w.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + switch w.Wants() { case pipeline.StreamEdge: for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() { w.timer.Start() + mu.RLock() expr := w.expressions[p.Group] + mu.RUnlock() scopePool := w.scopePools[p.Group] if expr == 
nil { @@ -49,7 +62,9 @@ func (w *WhereNode) runWhere(snapshot []byte) error { } expr = compiledExpr + mu.Lock() w.expressions[p.Group] = expr + mu.Unlock() scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(w.w.Lambda.Expression)) w.scopePools[p.Group] = scopePool @@ -72,7 +87,9 @@ func (w *WhereNode) runWhere(snapshot []byte) error { case pipeline.BatchEdge: for b, ok := w.ins[0].NextBatch(); ok; b, ok = w.ins[0].NextBatch() { w.timer.Start() + mu.RLock() expr := w.expressions[b.Group] + mu.RUnlock() scopePool := w.scopePools[b.Group] if expr == nil { @@ -82,7 +99,9 @@ func (w *WhereNode) runWhere(snapshot []byte) error { } expr = compiledExpr + mu.Lock() w.expressions[b.Group] = expr + mu.Unlock() scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(w.w.Lambda.Expression)) w.scopePools[b.Group] = scopePool diff --git a/window.go b/window.go index 89d03c407..e4f95d893 100644 --- a/window.go +++ b/window.go @@ -4,8 +4,10 @@ import ( "errors" "fmt" "log" + "sync" "time" + "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" ) @@ -30,11 +32,22 @@ type window interface { } func (w *WindowNode) runWindow([]byte) error { + var mu sync.RWMutex windows := make(map[models.GroupID]window) + valueF := func() int64 { + mu.RLock() + l := len(windows) + mu.RUnlock() + return int64(l) + } + w.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF)) + // Loops through points windowing by group for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() { w.timer.Start() + mu.RLock() wnd := windows[p.Group] + mu.RUnlock() if wnd == nil { tags := make(map[string]string, len(p.Dimensions.TagNames)) for _, dim := range p.Dimensions.TagNames { @@ -70,7 +83,9 @@ func (w *WindowNode) runWindow([]byte) error { // This should not be possible, but just in case. return errors.New("invalid window, no period specified for either time or count") } + mu.Lock() windows[p.Group] = wnd + mu.Unlock() } batch, ok := wnd.Insert(p) if ok {