
deadmans stats by group and for emitted not collected
nathanielc committed Feb 16, 2016
1 parent 4e5a5e5 commit f8b8602
Showing 10 changed files with 208 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@

### Features
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.
- [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by the original grouping of the data.
The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`.


### Bugfixes
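The CHANGELOG entry above renames the stat that `.stats` (and therefore the deadman's switch) reports. A minimal migration sketch for a downstream consumer that reads a stats point's fields as a plain Go map; the helper, names, and values below are hypothetical and not part of this commit:

package main

import "fmt"

// emittedCount reads the renamed stat from a .stats() point's field map.
// Hypothetical migration helper; the field name follows this commit.
func emittedCount(fields map[string]interface{}) (int64, bool) {
	v, ok := fields["emitted"] // was fields["collected"] before this change
	if !ok {
		return 0, false
	}
	c, ok := v.(int64)
	return c, ok
}

func main() {
	fields := map[string]interface{}{"emitted": int64(42)}
	if c, ok := emittedCount(fields); ok {
		fmt.Println("points emitted in the last interval:", c)
	}
}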
74 changes: 70 additions & 4 deletions edge.go
@@ -6,6 +6,7 @@ import (
"fmt"
"log"
"strconv"
"sync"

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
@@ -33,9 +34,11 @@ type Edge struct {
batch chan models.Batch
reduce chan *MapResult

logger *log.Logger
aborted chan struct{}
statMap *expvar.Map
logger *log.Logger
aborted chan struct{}
statMap *expvar.Map
groupMu sync.RWMutex
groupStats map[models.GroupID]*edgeStat
}

func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, logService LogService) *Edge {
@@ -48,7 +51,11 @@ func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, logSer
sm := NewStatistics("edges", tags)
sm.Add(statCollected, 0)
sm.Add(statEmitted, 0)
e := &Edge{statMap: sm, aborted: make(chan struct{})}
e := &Edge{
statMap: sm,
aborted: make(chan struct{}),
groupStats: make(map[models.GroupID]*edgeStat),
}
name := fmt.Sprintf("%s|%s->%s", taskName, parentName, childName)
e.logger = logService.NewLogger(fmt.Sprintf("[edge:%s] ", name), log.LstdFlags)
switch t {
@@ -78,6 +85,29 @@ func (e *Edge) collectedCount() int64 {
return int64(c)
}

// Stats for a given group for this edge
type edgeStat struct {
collected int64
emitted int64
tags models.Tags
dims []string
}

// Get a snapshot of the current group statistics for this edge
func (e *Edge) readGroupStats(f func(group models.GroupID, collected, emitted int64, tags models.Tags, dims []string)) {
e.groupMu.RLock()
defer e.groupMu.RUnlock()
for group, stats := range e.groupStats {
f(
group,
stats.collected,
stats.emitted,
stats.tags,
stats.dims,
)
}
}

// Close the edge, this can only be called after all
// collect calls to the edge have finished.
func (e *Edge) Close() {
@@ -121,6 +151,7 @@ func (e *Edge) NextPoint() (p models.Point, ok bool) {
case p, ok = <-e.stream:
if ok {
e.statMap.Add(statEmitted, 1)
e.incEmitted(p)
}
}
return
@@ -132,6 +163,7 @@ func (e *Edge) NextBatch() (b models.Batch, ok bool) {
case b, ok = <-e.batch:
if ok {
e.statMap.Add(statEmitted, 1)
e.incEmitted(b)
}
}
return
@@ -150,6 +182,7 @@ func (e *Edge) NextMaps() (m *MapResult, ok bool) {

func (e *Edge) CollectPoint(p models.Point) error {
e.statMap.Add(statCollected, 1)
e.incCollected(p)
select {
case <-e.aborted:
return ErrAborted
@@ -160,6 +193,7 @@ func (e *Edge) CollectBatch(b models.Batch) error {

func (e *Edge) CollectBatch(b models.Batch) error {
e.statMap.Add(statCollected, 1)
e.incCollected(b)
select {
case <-e.aborted:
return ErrAborted
@@ -177,3 +211,35 @@ func (e *Edge) CollectMaps(m *MapResult) error {
return nil
}
}

// Increment the emitted count of the group for this edge.
func (e *Edge) incEmitted(p models.PointInterface) {
e.groupMu.Lock()
defer e.groupMu.Unlock()
if stats, ok := e.groupStats[p.PointGroup()]; ok {
stats.emitted += 1
} else {
stats = &edgeStat{
emitted: 1,
tags: p.PointTags(),
dims: p.PointDimensions(),
}
e.groupStats[p.PointGroup()] = stats
}
}

// Increment the collected count of the group for this edge.
func (e *Edge) incCollected(p models.PointInterface) {
e.groupMu.Lock()
defer e.groupMu.Unlock()
if stats, ok := e.groupStats[p.PointGroup()]; ok {
stats.collected += 1
} else {
stats = &edgeStat{
collected: 1,
tags: p.PointTags(),
dims: p.PointDimensions(),
}
e.groupStats[p.PointGroup()] = stats
}
}
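To show how the new per-group counters on an Edge can be consumed, here is a minimal sketch of a caller. It assumes the kapacitor package context (where models is already imported); the readGroupStats signature matches this commit, while the totaling helper itself is hypothetical:

// totalEmitted sums the emitted count across all groups on an edge.
// Hypothetical helper, not part of this commit.
func totalEmitted(e *Edge) (total int64) {
	e.readGroupStats(func(group models.GroupID, collected, emitted int64, tags models.Tags, dims []string) {
		total += emitted
	})
	return
}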
43 changes: 39 additions & 4 deletions node.go
@@ -7,6 +7,7 @@ import (
"runtime"
"sync"

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

@@ -39,8 +40,7 @@ type Node interface {
// executing dot
edot(buf *bytes.Buffer)

// the number of points/batches this node has collected
collectedCount() int64
nodeStatsByGroup() map[models.GroupID]nodeStats
}

//implementation of Node
@@ -174,10 +174,45 @@ func (n *node) edot(buf *bytes.Buffer) {
// Return the number of points/batches this node
// has collected.
func (n *node) collectedCount() (c int64) {

// Count how many points each parent edge has emitted.
for _, in := range n.ins {
c += in.emittedCount()
}
return c
return
}

// Return the number of points/batches this node
// has emitted.
func (n *node) emittedCount() (c int64) {
// Count how many points each output edge has collected.
for _, out := range n.outs {
c += out.collectedCount()
}
return
}

// Statistics for a node
type nodeStats struct {
Fields models.Fields
Tags models.Tags
Dimensions []string
}

// Return a copy of the current node statistics.
func (n *node) nodeStatsByGroup() (stats map[models.GroupID]nodeStats) {
// Get the counts for just one output.
if len(n.outs) > 0 {
stats = make(map[models.GroupID]nodeStats)
n.outs[0].readGroupStats(func(group models.GroupID, c, e int64, tags models.Tags, dims []string) {
stats[group] = nodeStats{
Fields: models.Fields{
// A node's emitted count is the collected count of its output.
"emitted": c,
},
Tags: tags,
Dimensions: dims,
}
})
}
return
}
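The map returned by nodeStatsByGroup is what StatsNode.runStats consumes later in this commit. As a smaller illustration, a hypothetical helper in the kapacitor package could log the per-group emitted counts; the node's logger field is the one shown elsewhere in this diff:

// logGroupStats prints one line per group with its emitted count.
// Hypothetical helper for illustration only.
func logGroupStats(n *node) {
	for group, stat := range n.nodeStatsByGroup() {
		n.logger.Printf("group=%s emitted=%v tags=%v", group, stat.Fields["emitted"], stat.Tags)
	}
}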
44 changes: 44 additions & 0 deletions noop.go
@@ -0,0 +1,44 @@
package kapacitor

import (
"log"

"github.com/influxdata/kapacitor/pipeline"
)

type NoOpNode struct {
node
}

// Create a new NoOpNode which does nothing with the data and just passes it through.
func newNoOpNode(et *ExecutingTask, n *pipeline.NoOpNode, l *log.Logger) (*NoOpNode, error) {
nn := &NoOpNode{
node: node{Node: n, et: et, logger: l},
}
nn.node.runF = nn.runNoOp
return nn, nil
}

func (s *NoOpNode) runNoOp([]byte) error {
switch s.Wants() {
case pipeline.StreamEdge:
for p, ok := s.ins[0].NextPoint(); ok; p, ok = s.ins[0].NextPoint() {
for _, child := range s.outs {
err := child.CollectPoint(p)
if err != nil {
return err
}
}
}
case pipeline.BatchEdge:
for b, ok := s.ins[0].NextBatch(); ok; b, ok = s.ins[0].NextBatch() {
for _, child := range s.outs {
err := child.CollectBatch(b)
if err != nil {
return err
}
}
}
}
return nil
}
11 changes: 9 additions & 2 deletions pipeline/node.go
@@ -183,6 +183,13 @@ func (n *node) dot(buf *bytes.Buffer) {
func (n *node) Stats(interval time.Duration) *StatsNode {
stats := newStatsNode(n, interval)
n.pipeline().addSource(stats)
// If the source node does not have any children, add a NoOpNode.
// This is a workaround to give the source node somewhere to send its data,
// so that stats can be gathered on its behavior.
if len(n.Children()) == 0 {
noop := newNoOpNode(n.Provides())
n.linkChild(noop)
}
return stats
}

@@ -238,14 +245,14 @@ const intervalMarker = "INTERVAL"
//
func (n *node) Deadman(threshold float64, interval time.Duration, expr ...tick.Node) *AlertNode {
dn := n.Stats(interval).
Derivative("collected").NonNegative()
Derivative("emitted").NonNegative()
dn.Unit = interval

an := dn.Alert()
critExpr := &tick.BinaryNode{
Operator: tick.TokenLessEqual,
Left: &tick.ReferenceNode{
Reference: "collected",
Reference: "emitted",
},
Right: &tick.NumberNode{
IsFloat: true,
20 changes: 20 additions & 0 deletions pipeline/noop.go
@@ -0,0 +1,20 @@
package pipeline

// A node that does not perform any operation.
//
// *Do not use this node in a TICKscript; there should be no need for it.*
//
// If a node does not have any children, then its emitted count remains zero.
// Using a NoOpNode is a workaround so that statistics are accurately reported
// for nodes with no real children.
// A NoOpNode is automatically appended to any node that is a source for a StatsNode
// and does not have any children.
type NoOpNode struct {
chainnode
}

func newNoOpNode(wants EdgeType) *NoOpNode {
return &NoOpNode{
chainnode: newBasicChainNode("noop", wants, wants),
}
}
14 changes: 10 additions & 4 deletions pipeline/stats.go
Original file line number Diff line number Diff line change
@@ -11,18 +11,24 @@ import "time"
//
// The currently available internal statistics:
//
// * collected -- the number of points or batches this node has received.
// * emitted -- the number of points or batches this node has sent to its children.
//
// Each stat is available as a field in the emitted data stream.
// Each stat is available as a field in the data stream.
//
// The stats are grouped according to the grouping of the original data.
// For example, if the source node is grouped by the tag 'host',
// then the counts are output per host with the appropriate 'host' tag.
// Since it is possible for groups to change when crossing a node, only the emitted groups
// are considered.
//
// Example:
// var data = stream.from()...
// // Emit statistics every 1 minute and cache them via the HTTP API.
// data.stats(1m).httpOut('stats')
// // Contiue normal processing of the data stream
// // Continue normal processing of the data stream
// data....
//
// WARNING: It is not recommened to join the stats stream with the orginal data stream.
// WARNING: It is not recommended to join the stats stream with the original data stream.
// Since they operate on different clocks you could potentially create a deadlock.
// This is a limitation of the current implementation and may be removed in the future.
type StatsNode struct {
2 changes: 1 addition & 1 deletion services/deadman/config.go
@@ -14,7 +14,7 @@ const (
// Default deadman's switch id
DefaultId = "node 'NODE_NAME' in task '{{ .TaskName }}'"
// Default deadman's switch message
DefaultMessage = "{{ .ID }} is {{ if eq .Level \"OK\" }}alive{{ else }}dead{{ end }}: {{ index .Fields \"collected\" | printf \"%0.3f\" }} points/INTERVAL."
DefaultMessage = "{{ .ID }} is {{ if eq .Level \"OK\" }}alive{{ else }}dead{{ end }}: {{ index .Fields \"emitted\" | printf \"%0.3f\" }} points/INTERVAL."
)

type Config struct {
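Since the default deadman message now indexes the 'emitted' field, a quick way to see what it renders is to run it through text/template. A runnable sketch; the data struct and sample values are illustrative and not Kapacitor's real alert data type:

package main

import (
	"os"
	"text/template"
)

func main() {
	// The new default message from services/deadman/config.go in this commit.
	const msg = "{{ .ID }} is {{ if eq .Level \"OK\" }}alive{{ else }}dead{{ end }}: {{ index .Fields \"emitted\" | printf \"%0.3f\" }} points/INTERVAL."
	t := template.Must(template.New("deadman").Parse(msg))
	data := struct {
		ID     string
		Level  string
		Fields map[string]interface{}
	}{
		ID:     "node 'stream0' in task 'cpu'", // illustrative ID, not the real default template
		Level:  "CRITICAL",
		Fields: map[string]interface{}{"emitted": 0.0},
	}
	// Prints: node 'stream0' in task 'cpu' is dead: 0.000 points/INTERVAL.
	if err := t.Execute(os.Stdout, data); err != nil {
		panic(err)
	}
}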
17 changes: 11 additions & 6 deletions stats.go
@@ -47,12 +47,17 @@ func (s *StatsNode) runStats([]byte) error {
return nil
case now := <-ticker.C:
point.Time = now.UTC()
count := s.en.collectedCount()
point.Fields = models.Fields{"collected": count}
for _, out := range s.outs {
err := out.CollectPoint(point)
if err != nil {
return err
stats := s.en.nodeStatsByGroup()
for group, stat := range stats {
point.Fields = stat.Fields
point.Group = group
point.Dimensions = stat.Dimensions
point.Tags = stat.Tags
for _, out := range s.outs {
err := out.CollectPoint(point)
if err != nil {
return err
}
}
}
}
2 changes: 2 additions & 0 deletions task.go
@@ -342,6 +342,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (Node, error
return newStatsNode(et, t, l)
case *pipeline.ShiftNode:
return newShiftNode(et, t, l)
case *pipeline.NoOpNode:
return newNoOpNode(et, t, l)
default:
return nil, fmt.Errorf("unknown pipeline node type %T", p)
}
