Add error counters to every node type
Add error count to alert node

Each instance of `E!` now increments the error count for the alert node.
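
For reference, the call-site pattern applied throughout this series of commits looks roughly like the sketch below; the surrounding function is hypothetical, while `incrementErrorCount` and `logger` come from the embedded `node` type shown in the diff:

```go
package kapacitor

// Hypothetical call site sketching the pattern used throughout this commit:
// every place that logs an "E!" line now bumps the node-wide error counter first.
func (a *AlertNode) collectEvent(collect func() error) {
	if err := collect(); err != nil {
		a.incrementErrorCount() // increment the shared "errors" stat for this node
		a.logger.Println("E!", err)
	}
}
```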

Add error count to batch node

Each instance of `E!` now increments the error count.

Add error count to combine node

Each instance of `E!` now increments the error count for a combine node.

Add error count to where node

Each instance of `E!` now increments the error count.

Add error count to derivative node

Each instance of `E!` now increments the error count.

Add error count to eval node

Each instance of `E!` now increments the node error count. It should be
noted that the eval node already tracks its own errors.

Add error count to flatten node

Each instance of `E!` now increments the node's error count.

Add error count to influxdb_out node

Each instance of `E!` now increments the node's error count.

Add error count to influxql node

Each instance of `E!` now increments the node's error count.

Add error count to join node

Note the TODO item.

Add error count to k8s autoscale node

Note here that the k8s autoscale node already tracks its own error count.

Add error count to log node

Each instance of `E!` now increments the error count.

Add error count to stream node

Each instance of `E!` now increments the node's error count.

Remove incrementErrorCount from handler

Accidentally added it here.

Change errors to k8s_errors in K8sAutoscaleNode

Rename error_count to errors for node error count

Add node errors to node struct

Doing this prevents an unnecessary map lookup each time the error count
is updated.
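
The node.go hunk is not visible in the diff below, so the following is only a sketch of what the shared struct change could look like, assuming the counter is registered once under the `errors` stat key and then incremented through a struct field instead of a `statMap` lookup on every error:

```go
package kapacitor

import (
	"log"

	"github.com/influxdata/kapacitor/expvar"
)

// Assumed shape of the shared node type after this change; field names other
// than statMap and logger are illustrative.
type node struct {
	logger  *log.Logger
	statMap *expvar.Map
	errors  *expvar.Int // registered once under the "errors" stat key
}

// incrementErrorCount bumps the counter directly, avoiding a statMap lookup
// each time an error is recorded.
func (n *node) incrementErrorCount() {
	n.errors.Add(1)
}
```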

Pass JoinNode pointer to joinset JoinIntoBatch

To increment the node's error count, a reference to the node must be passed down.

Add entry to changelog

Remove batch errors in favor of global node error count

Remove k8s errors in favor of global node error count

Remove JoinNode reference from JoinIntoBatch fn
desa committed Feb 23, 2017
1 parent e4bf8af commit 33f9997
Showing 16 changed files with 61 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,9 @@
### Features

- [#1159](https://github.com/influxdata/kapacitor/pulls/1159): Go version 1.7.4 -> 1.7.5
- [#1175](https://github.com/influxdata/kapacitor/pull/1175): BREAKING: Add generic error counters to every node type.
Renamed `query_errors` to `errors` in batch node.
Renamed `eval_errors` to `errors` in eval node.

### Bugfixes

6 changes: 6 additions & 0 deletions alert.go
@@ -706,11 +706,13 @@ func (a *AlertNode) restoreEventState(id string) (alert.Level, time.Time) {
if anonFound && topicFound {
// Anon topic takes precedence
if err := a.et.tm.AlertService.UpdateEvent(a.topic, anonTopicState); err != nil {
a.incrementErrorCount()
a.logger.Printf("E! failed to update topic %q event state for event %q", a.topic, id)
}
} else if topicFound && a.hasAnonTopic() {
// Update event state for topic
if err := a.et.tm.AlertService.UpdateEvent(a.anonTopic, topicState); err != nil {
a.incrementErrorCount()
a.logger.Printf("E! failed to update topic %q event state for event %q", a.topic, id)
}
} // else nothing was found, nothing to do
@@ -741,6 +743,7 @@ func (a *AlertNode) handleEvent(event alert.Event) {
err := a.et.tm.AlertService.Collect(event)
if err != nil {
a.eventsDropped.Add(1)
a.incrementErrorCount()
a.logger.Println("E!", err)
}
}
@@ -751,6 +754,7 @@ func (a *AlertNode) handleEvent(event alert.Event) {
err := a.et.tm.AlertService.Collect(event)
if err != nil {
a.eventsDropped.Add(1)
a.incrementErrorCount()
a.logger.Println("E!", err)
}
}
@@ -762,6 +766,7 @@ func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map
}
if rse := a.levelResets[currentLevel]; rse != nil {
if pass, err := EvalPredicate(rse, a.lrScopePools[currentLevel], now, fields, tags); err != nil {
a.incrementErrorCount()
a.logger.Printf("E! error evaluating reset expression for current level %v: %s", currentLevel, err)
} else if !pass {
return currentLevel
@@ -783,6 +788,7 @@ func (a *AlertNode) findFirstMatchLevel(start alert.Level, stop alert.Level, now
continue
}
if pass, err := EvalPredicate(se, a.scopePools[l], now, fields, tags); err != nil {
a.incrementErrorCount()
a.logger.Printf("E! error evaluating expression for level %v: %s", alert.Level(l), err)
continue
} else if pass {
8 changes: 2 additions & 6 deletions batch.go
@@ -17,7 +17,6 @@ import (
)

const (
statsQueryErrors = "query_errors"
statsBatchesQueried = "batches_queried"
statsPointsQueried = "points_queried"
)
@@ -137,7 +136,6 @@ type QueryNode struct {
closing chan struct{}
aborting chan struct{}

queryErrors *expvar.Int
batchesQueried *expvar.Int
pointsQueried *expvar.Int
byName bool
@@ -266,11 +264,9 @@ func (b *QueryNode) Queries(start, stop time.Time) ([]*Query, error) {
// Query InfluxDB and collect batches on batch collector.
func (b *QueryNode) doQuery() error {
defer b.ins[0].Close()
b.queryErrors = &expvar.Int{}
b.batchesQueried = &expvar.Int{}
b.pointsQueried = &expvar.Int{}

b.statMap.Set(statsQueryErrors, b.queryErrors)
b.statMap.Set(statsBatchesQueried, b.batchesQueried)
b.statMap.Set(statsPointsQueried, b.pointsQueried)

@@ -305,7 +301,7 @@ func (b *QueryNode) doQuery() error {
}
resp, err := con.Query(q)
if err != nil {
b.queryErrors.Add(1)
b.incrementErrorCount()
b.logger.Println("E!", err)
b.timer.Stop()
break
@@ -315,8 +311,8 @@ func (b *QueryNode) doQuery() error {
for _, res := range resp.Results {
batches, err := models.ResultToBatches(res, b.byName)
if err != nil {
b.incrementErrorCount()
b.logger.Println("E! failed to understand query result:", err)
b.queryErrors.Add(1)
continue
}
for _, bch := range batches {
1 change: 1 addition & 0 deletions combine.go
@@ -180,6 +180,7 @@ func (n *CombineNode) combineBuffer(buf *buffer) error {
for i := range expressions {
matched, err := EvalPredicate(expressions[i], n.scopePools[i], p.Time, p.Fields, p.Tags)
if err != nil {
n.incrementErrorCount()
n.logger.Println("E! evaluating lambda expression:", err)
}
matches[i][idx] = matched
3 changes: 3 additions & 0 deletions derivative.go
@@ -90,18 +90,21 @@ func (d *DerivativeNode) runDerivative([]byte) error {
func (d *DerivativeNode) derivative(prev, curr models.Fields, prevTime, currTime time.Time) (float64, bool) {
f0, ok := numToFloat(prev[d.d.Field])
if !ok {
d.incrementErrorCount()
d.logger.Printf("E! cannot apply derivative to type %T", prev[d.d.Field])
return 0, false
}

f1, ok := numToFloat(curr[d.d.Field])
if !ok {
d.incrementErrorCount()
d.logger.Printf("E! cannot apply derivative to type %T", curr[d.d.Field])
return 0, false
}

elapsed := float64(currTime.Sub(prevTime))
if elapsed == 0 {
d.incrementErrorCount()
d.logger.Printf("E! cannot perform derivative elapsed time was 0")
return 0, false
}
13 changes: 2 additions & 11 deletions eval.go
@@ -6,17 +6,12 @@ import (
"log"
"time"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
"github.com/influxdata/kapacitor/tick/stateful"
)

const (
statsEvalErrors = "eval_errors"
)

type EvalNode struct {
node
e *pipeline.EvalNode
@@ -25,8 +20,6 @@ type EvalNode struct {
refVarList [][]string
scopePool stateful.ScopePool
tags map[string]bool

evalErrors *expvar.Int
}

// Create a new EvalNode which applies a transformation func to each point in a stream and returns a single point.
@@ -69,16 +62,14 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}

func (e *EvalNode) runEval(snapshot []byte) error {
e.evalErrors = &expvar.Int{}
e.statMap.Set(statsEvalErrors, e.evalErrors)
switch e.Provides() {
case pipeline.StreamEdge:
var err error
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
e.timer.Start()
p.Fields, p.Tags, err = e.eval(p.Time, p.Group, p.Fields, p.Tags)
if err != nil {
e.evalErrors.Add(1)
e.incrementErrorCount()
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
@@ -102,7 +93,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
p := b.Points[i]
b.Points[i].Fields, b.Points[i].Tags, err = e.eval(p.Time, b.Group, p.Fields, p.Tags)
if err != nil {
e.evalErrors.Add(1)
e.incrementErrorCount()
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
1 change: 1 addition & 0 deletions flatten.go
@@ -184,6 +184,7 @@ POINTS:
fieldPrefix.WriteString(v)
fieldPrefix.WriteString(n.f.Delimiter)
} else {
n.incrementErrorCount()
n.logger.Printf("E! point missing tag %q for flatten operation", tag)
continue POINTS
}
4 changes: 4 additions & 0 deletions influxdb_out.go
@@ -78,6 +78,7 @@ func (i *InfluxDBOutNode) runOut([]byte) error {
return nil
}()
if err != nil {
i.incrementErrorCount()
i.logger.Printf("E! failed to create database %q on cluster %q: %v", i.i.Database, i.i.Cluster, err)
}
}
@@ -234,6 +235,7 @@ func (w *writeBuffer) run() {
if !ok {
bp, err = influxdb.NewBatchPoints(qe.bpc)
if err != nil {
w.i.incrementErrorCount()
w.i.logger.Println("E! failed to write points to InfluxDB:", err)
break
}
@@ -244,6 +246,7 @@
if len(bp.Points()) >= w.size {
err = w.write(bp)
if err != nil {
w.i.incrementErrorCount()
w.i.logger.Println("E! failed to write points to InfluxDB:", err)
}
delete(w.buffer, qe.bpc)
@@ -265,6 +268,7 @@ func (w *writeBuffer) writeAll() {
for bpc, bp := range w.buffer {
err := w.write(bp)
if err != nil {
w.i.incrementErrorCount()
w.i.logger.Println("E! failed to write points to InfluxDB:", err)
}
delete(w.buffer, bpc)
9 changes: 9 additions & 0 deletions influxql.go
@@ -100,25 +100,29 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
if n.isStreamTransformation {
err := context.AggregatePoint(&p)
if err != nil {
n.incrementErrorCount()
n.logger.Println("E! failed to aggregate point:", err)
}
p, ok = n.ins[0].NextPoint()

err = n.emit(context)
if err != nil && err != ErrEmptyEmit {
n.incrementErrorCount()
n.logger.Println("E! failed to emit stream:", err)
}
} else {
if p.Time.Equal(context.Time()) {
err := context.AggregatePoint(&p)
if err != nil {
n.incrementErrorCount()
n.logger.Println("E! failed to aggregate point:", err)
}
// advance to next point
p, ok = n.ins[0].NextPoint()
} else {
err := n.emit(context)
if err != nil {
n.incrementErrorCount()
n.logger.Println("E! failed to emit stream:", err)
}

@@ -180,9 +184,11 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
Tags: bp.Tags,
}
if err := context.AggregatePoint(&p); err != nil {
n.incrementErrorCount()
n.logger.Println("E! failed to aggregate batch point:", err)
}
if ep, err := context.EmitPoint(); err != nil && err != ErrEmptyEmit {
n.incrementErrorCount()
n.logger.Println("E! failed to emit batch point:", err)
} else if err != ErrEmptyEmit {
eb.Points = append(eb.Points, models.BatchPoint{
@@ -196,6 +202,7 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
n.timer.Pause()
for _, out := range n.outs {
if err := out.CollectBatch(eb); err != nil {
n.incrementErrorCount()
n.logger.Println("E! failed to emit batch points:", err)
}
}
@@ -204,9 +211,11 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
err := context.AggregateBatch(&b)
if err == nil {
if err := n.emit(context); err != nil {
n.incrementErrorCount()
n.logger.Println("E! failed to emit batch:", err)
}
} else {
n.incrementErrorCount()
n.logger.Println("E! failed to aggregate batch:", err)
}
}
7 changes: 7 additions & 0 deletions join.go
@@ -277,6 +277,7 @@ func (j *JoinNode) getGroup(p models.PointInterface, groupErrs chan<- error) *gr
go func() {
err := group.run()
if err != nil {
j.incrementErrorCount()
j.logger.Println("E! join group error:", err)
select {
case groupErrs <- err:
@@ -342,6 +343,7 @@ func (g *group) collect(i int, p models.PointInterface) error {
sets := g.sets[t]
if len(sets) == 0 {
set = newJoinset(
g.j,
g.j.j.StreamName,
g.j.fill,
g.j.fillValue,
@@ -362,6 +364,7 @@
}
if set == nil {
set = newJoinset(
g.j,
g.j.j.StreamName,
g.j.fill,
g.j.fillValue,
@@ -466,6 +469,7 @@ func (g *group) emitJoinedSet(set *joinset) error {

// represents a set of points or batches from the same joined time
type joinset struct {
j *JoinNode
name string
fill influxql.FillOption
fillValue interface{}
@@ -486,6 +490,7 @@ type joinset struct {
}

func newJoinset(
n *JoinNode,
name string,
fill influxql.FillOption,
fillValue interface{},
@@ -497,6 +502,7 @@ ) *joinset {
) *joinset {
expected := len(prefixes)
return &joinset{
j: n,
name: name,
fill: fill,
fillValue: fillValue,
@@ -599,6 +605,7 @@ BATCH_POINT:
}
b, ok := batch.(models.Batch)
if !ok {
js.j.incrementErrorCount()
js.logger.Printf("E! invalid join data got %T expected models.Batch", batch)
return models.Batch{}, false
}
7 changes: 2 additions & 5 deletions k8s_autoscale.go
@@ -17,7 +17,6 @@ const (
const (
statsK8sIncreaseEventsCount = "increase_events"
statsK8sDecreaseEventsCount = "decrease_events"
statsK8sErrorsCount = "errors"
statsK8sCooldownDropsCount = "cooldown_drops"
)

@@ -69,20 +68,18 @@ func newK8sAutoscaleNode(et *ExecutingTask, n *pipeline.K8sAutoscaleNode, l *log
func (k *K8sAutoscaleNode) runAutoscale([]byte) error {
k.increaseCount = &expvar.Int{}
k.decreaseCount = &expvar.Int{}
errorsCount := &expvar.Int{}
k.cooldownDropsCount = &expvar.Int{}

k.statMap.Set(statsK8sIncreaseEventsCount, k.increaseCount)
k.statMap.Set(statsK8sDecreaseEventsCount, k.decreaseCount)
k.statMap.Set(statsK8sErrorsCount, errorsCount)
k.statMap.Set(statsK8sCooldownDropsCount, k.cooldownDropsCount)

switch k.Wants() {
case pipeline.StreamEdge:
for p, ok := k.ins[0].NextPoint(); ok; p, ok = k.ins[0].NextPoint() {
k.timer.Start()
if np, err := k.handlePoint(p.Name, p.Group, p.Dimensions, p.Time, p.Fields, p.Tags); err != nil {
errorsCount.Add(1)
k.incrementErrorCount()
k.logger.Println("E!", err)
} else if np.Name != "" {
k.timer.Pause()
@@ -101,7 +98,7 @@ func (k *K8sAutoscaleNode) runAutoscale([]byte) error {
k.timer.Start()
for _, p := range b.Points {
if np, err := k.handlePoint(b.Name, b.Group, b.PointDimensions(), p.Time, p.Fields, p.Tags); err != nil {
errorsCount.Add(1)
k.incrementErrorCount()
k.logger.Println("E!", err)
} else if np.Name != "" {
k.timer.Pause()
2 changes: 2 additions & 0 deletions log.go
@@ -41,6 +41,7 @@ func (s *LogNode) runLog([]byte) error {
for p, ok := s.ins[0].NextPoint(); ok; p, ok = s.ins[0].NextPoint() {
buf.Reset()
if err := env.Encode(p); err != nil {
s.incrementErrorCount()
s.logger.Println("E!", err)
continue
}
@@ -56,6 +57,7 @@ for b, ok := s.ins[0].NextBatch(); ok; b, ok = s.ins[0].NextBatch() {
for b, ok := s.ins[0].NextBatch(); ok; b, ok = s.ins[0].NextBatch() {
buf.Reset()
if err := env.Encode(b); err != nil {
s.incrementErrorCount()
s.logger.Println("E!", err)
continue
}