Skip to content

Commit

Permalink
Define nodeCardinality stat on all nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
desa committed Mar 2, 2017
1 parent 06a96da commit bb3c2e5
Show file tree
Hide file tree
Showing 14 changed files with 17 additions and 48 deletions.
4 changes: 0 additions & 4 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type AlertNode struct {
warnsTriggered *expvar.Int
critsTriggered *expvar.Int
eventsDropped *expvar.Int
nodeCardinality *expvar.IntFuncGauge

bufPool sync.Pool

Expand Down Expand Up @@ -488,9 +487,6 @@ func (a *AlertNode) runAlert([]byte) error {
a.eventsDropped = &expvar.Int{}
a.statMap.Set(statsCritsTriggered, a.critsTriggered)

// a.nodeCardinality is assigned in newAlertNode
a.statMap.Set(statsCardinalityGauge, a.nodeCardinality)

switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
Expand Down
5 changes: 0 additions & 5 deletions combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type CombineNode struct {

cardinalityMu sync.RWMutex

nodeCardinality *expvar.IntFuncGauge

combination combination
}

Expand Down Expand Up @@ -74,9 +72,6 @@ func (t timeList) Less(i, j int) bool { return t[i].Before(t[j]) }
func (t timeList) Swap(i, j int) { t[i], t[j] = t[j], t[i] }

func (n *CombineNode) runCombine([]byte) error {
// n.nodeCardinality is assigned in newCombineNode
n.statMap.Set(statsCardinalityGauge, n.nodeCardinality)

switch n.Wants() {
case pipeline.StreamEdge:
buffers := make(map[models.GroupID]*buffer)
Expand Down
3 changes: 1 addition & 2 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ func (d *DerivativeNode) runDerivative([]byte) error {
case pipeline.StreamEdge:
var mu sync.RWMutex
previous := make(map[models.GroupID]models.Point)
cardinalityGauge := expvar.NewIntFuncGauge(func() int64 {
d.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
mu.RLock()
l := len(previous)
mu.RUnlock()
return int64(l)
})
d.statMap.Set(statsCardinalityGauge, cardinalityGauge)
for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
mu.RLock()
Expand Down
6 changes: 1 addition & 5 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ type EvalNode struct {
scopePool stateful.ScopePool
tags map[string]bool

cardinalityMu sync.RWMutex
nodeCardinality *expvar.IntFuncGauge
cardinalityMu sync.RWMutex

evalErrors *expvar.Int
}
Expand Down Expand Up @@ -76,9 +75,6 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
// e.nodeCardinality is assigned in newEvalNode
e.statMap.Set(statsCardinalityGauge, e.nodeCardinality)

switch e.Provides() {
case pipeline.StreamEdge:
var err error
Expand Down
6 changes: 2 additions & 4 deletions flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,12 @@ func (n *FlattenNode) runFlatten([]byte) error {
switch n.Wants() {
case pipeline.StreamEdge:
flattenBuffers := make(map[models.GroupID]*flattenStreamBuffer)
cardinalityGauge := expvar.NewIntFuncGauge(func() int64 {
n.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
mu.RLock()
l := len(flattenBuffers)
mu.RUnlock()
return int64(l)
})
n.statMap.Set(statsCardinalityGauge, cardinalityGauge)

for p, ok := n.ins[0].NextPoint(); ok; p, ok = n.ins[0].NextPoint() {
n.timer.Start()
Expand Down Expand Up @@ -118,13 +117,12 @@ func (n *FlattenNode) runFlatten([]byte) error {
}
case pipeline.BatchEdge:
allBuffers := make(map[models.GroupID]*flattenBatchBuffer)
cardinalityGauge := expvar.NewIntFuncGauge(func() int64 {
n.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
mu.RLock()
l := len(allBuffers)
mu.RUnlock()
return int64(l)
})
n.statMap.Set(statsCardinalityGauge, cardinalityGauge)
for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
t := b.TMax.Round(n.f.Tolerance)
Expand Down
5 changes: 0 additions & 5 deletions http_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ type HTTPOutNode struct {
endpoint string
routes []httpd.Route
mu sync.RWMutex

nodeCardinality *expvar.IntFuncGauge
}

// Create a new HTTPOutNode which caches the most recent item and exposes it over the HTTP API.
Expand Down Expand Up @@ -50,9 +48,6 @@ func (h *HTTPOutNode) Endpoint() string {
}

func (h *HTTPOutNode) runOut([]byte) error {
// h.nodeCardinality is assigned in newHTTPOutNode
h.statMap.Set(statsCardinalityGauge, h.nodeCardinality)

hndl := func(w http.ResponseWriter, req *http.Request) {
h.mu.RLock()
defer h.mu.RUnlock()
Expand Down
3 changes: 1 addition & 2 deletions influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,12 @@ func (c *baseReduceContext) Time() time.Time {
func (n *InfluxQLNode) runStreamInfluxQL() error {
var mu sync.RWMutex
contexts := make(map[models.GroupID]reduceContext)
cardinalityGauge := expvar.NewIntFuncGauge(func() int64 {
n.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
mu.RLock()
l := len(contexts)
mu.RUnlock()
return int64(l)
})
n.statMap.Set(statsCardinalityGauge, cardinalityGauge)

for p, ok := n.ins[0].NextPoint(); ok; {
n.timer.Start()
Expand Down
5 changes: 1 addition & 4 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ type JoinNode struct {
// Represents the lower bound of times per group per parent
lowMarks map[srcGroup]time.Time

cardinalityMu sync.RWMutex
nodeCardinality *expvar.IntFuncGauge
cardinalityMu sync.RWMutex

reported map[int]bool
allReported bool
Expand Down Expand Up @@ -76,8 +75,6 @@ func (j *JoinNode) runJoin([]byte) error {
return int64(l)
})

j.statMap.Set(statsCardinalityGauge, j.nodeCardinality)

groupErrs := make(chan error, 1)
done := make(chan struct{}, len(j.ins))

Expand Down
4 changes: 0 additions & 4 deletions k8s_autoscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ type K8sAutoscaleNode struct {
decreaseCount *expvar.Int
cooldownDropsCount *expvar.Int

nodeCardinality *expvar.IntFuncGauge

cardinalityMu sync.RWMutex

min int
Expand Down Expand Up @@ -84,8 +82,6 @@ func (k *K8sAutoscaleNode) runAutoscale([]byte) error {
k.statMap.Set(statsK8sIncreaseEventsCount, k.increaseCount)
k.statMap.Set(statsK8sDecreaseEventsCount, k.decreaseCount)
k.statMap.Set(statsK8sCooldownDropsCount, k.cooldownDropsCount)
// k.nodeCardinality is assigned in newK8sAutoscaleNode
k.statMap.Set(statsCardinalityGauge, k.nodeCardinality)

switch k.Wants() {
case pipeline.StreamEdge:
Expand Down
5 changes: 4 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ type node struct {
statsKey string
statMap *kexpvar.Map

nodeErrors *kexpvar.Int
nodeErrors *kexpvar.Int
nodeCardinality *kexpvar.IntFuncGauge
}

func (n *node) addParentEdge(e *Edge) {
Expand All @@ -110,6 +111,8 @@ func (n *node) init() {
n.statMap.Set(statAverageExecTime, avgExecVar)
n.nodeErrors = &kexpvar.Int{}
n.statMap.Set(statErrorCount, n.nodeErrors)
n.nodeCardinality = kexpvar.NewIntFuncGauge(func() int64 { return 0 })
n.statMap.Set(statsCardinalityGauge, n.nodeCardinality)
n.timer = n.et.tm.TimingService.NewTimer(avgExecVar)
n.errCh = make(chan error, 1)
}
Expand Down
5 changes: 1 addition & 4 deletions sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ type SampleNode struct {
node
s *pipeline.SampleNode

cardinalityMu sync.RWMutex
nodeCardinality *expvar.IntFuncGauge
cardinalityMu sync.RWMutex

counts map[models.GroupID]int64
duration time.Duration
Expand Down Expand Up @@ -44,8 +43,6 @@ func newSampleNode(et *ExecutingTask, n *pipeline.SampleNode, l *log.Logger) (*S
}

func (s *SampleNode) runSample([]byte) error {
// s.nodeCardinality is assigned in newSampleNode
s.statMap.Set(statsCardinalityGauge, s.nodeCardinality)
switch s.Wants() {
case pipeline.StreamEdge:
for p, ok := s.ins[0].NextPoint(); ok; p, ok = s.ins[0].NextPoint() {
Expand Down
8 changes: 4 additions & 4 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,10 @@ func TestServer_EnableTask(t *testing.T) {
dot := `digraph testTaskID {
graph [throughput="0.00 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" ];
stream0 [avg_exec_time_ns="0s" cardinality="0" errors="0" ];
stream0 -> from1 [processed="0"];
from1 [avg_exec_time_ns="0s" errors="0" ];
from1 [avg_exec_time_ns="0s" cardinality="0" errors="0" ];
}`
if ti.Dot != dot {
t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot)
Expand Down Expand Up @@ -479,10 +479,10 @@ func TestServer_EnableTaskOnCreate(t *testing.T) {
dot := `digraph testTaskID {
graph [throughput="0.00 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" ];
stream0 [avg_exec_time_ns="0s" cardinality="0" errors="0" ];
stream0 -> from1 [processed="0"];
from1 [avg_exec_time_ns="0s" errors="0" ];
from1 [avg_exec_time_ns="0s" cardinality="0" errors="0" ];
}`
if ti.Dot != dot {
t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot)
Expand Down
3 changes: 1 addition & 2 deletions where.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ func newWhereNode(et *ExecutingTask, n *pipeline.WhereNode, l *log.Logger) (wn *

func (w *WhereNode) runWhere(snapshot []byte) error {
var mu sync.RWMutex
cardinalityGauge := expvar.NewIntFuncGauge(func() int64 {
w.nodeCardinality = expvar.NewIntFuncGauge(func() int64 {
mu.RLock()
l := len(w.expressions)
mu.RUnlock()
return int64(l)
})
w.statMap.Set(statsCardinalityGauge, cardinalityGauge)

switch w.Wants() {
case pipeline.StreamEdge:
Expand Down
3 changes: 1 addition & 2 deletions window.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ type window interface {
func (w *WindowNode) runWindow([]byte) error {
var mu sync.RWMutex
windows := make(map[models.GroupID]window)
cardinalityGauge := kexpvar.NewIntFuncGauge(func() int64 {
w.nodeCardinality = kexpvar.NewIntFuncGauge(func() int64 {
mu.RLock()
l := len(windows)
mu.RUnlock()
return int64(l)
})
w.statMap.Set(statsCardinalityGauge, cardinalityGauge)
// Loops through points windowing by group
for p, ok := w.ins[0].NextPoint(); ok; p, ok = w.ins[0].NextPoint() {
w.timer.Start()
Expand Down

0 comments on commit bb3c2e5

Please sign in to comment.