Skip to content

Commit

Permalink
Align deadman times (influxdata#688)
Browse files Browse the repository at this point in the history
* align deadman times

* update comment
  • Loading branch information
Nathaniel Cook authored Jun 30, 2016
1 parent b56550e commit ce92786
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [#674](https://github.com/influxdata/kapacitor/issue/674): Fix panic with Join On and batches.
- [#665](https://github.com/influxdata/kapacitor/issue/665): BREAKING: Fix file mode not being correct for Alert.Log files.
Breaking change is that integers numbers prefixed with a 0 in TICKscript are interpreted as octal numbers.
- [#667](https://github.com/influxdata/kapacitor/issue/667): Align deadman timestamps to interval.

## v1.0.0-beta2 [2016-06-17]

Expand Down
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (t *timeTicker) Start() <-chan time.Time {
case <-t.stopping:
return
case now := <-t.ticker.C:
now = now.Truncate(t.every)
now = now.Round(t.every)
t.alignChan <- now
}
}
Expand Down
3 changes: 2 additions & 1 deletion pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ const intervalMarker = "INTERVAL"
// // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
// data
// |stats(10s)
// .align()
// |derivative('emitted')
// .unit(10s)
// .nonNegative()
Expand Down Expand Up @@ -259,7 +260,7 @@ const intervalMarker = "INTERVAL"
// data...
//
func (n *node) Deadman(threshold float64, interval time.Duration, expr ...*ast.LambdaNode) *AlertNode {
dn := n.Stats(interval).
dn := n.Stats(interval).Align().
Derivative("emitted").NonNegative()
dn.Unit = interval

Expand Down
10 changes: 10 additions & 0 deletions pipeline/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type StatsNode struct {
SourceNode Node
// tick:ignore
Interval time.Duration

// tick:ignore
AlignFlag bool `tick:"Align"`
}

func newStatsNode(n Node, interval time.Duration) *StatsNode {
Expand All @@ -49,3 +52,10 @@ func newStatsNode(n Node, interval time.Duration) *StatsNode {
Interval: interval,
}
}

// Round times to the StatsNode.Interval value.
// tick:property
func (n *StatsNode) Align() *StatsNode {
n.AlignFlag = true
return n
}
69 changes: 48 additions & 21 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,64 @@ func newStatsNode(et *ExecutingTask, n *pipeline.StatsNode, l *log.Logger) (*Sta
}

func (s *StatsNode) runStats([]byte) error {
if s.s.AlignFlag {
// Wait till we are roughly aligned with the interval.
now := time.Now()
next := now.Truncate(s.s.Interval).Add(s.s.Interval)
after := time.NewTicker(next.Sub(now))
select {
case <-after.C:
after.Stop()
case <-s.closing:
after.Stop()
return nil
}
if err := s.emit(now); err != nil {
return err
}
}
ticker := time.NewTicker(s.s.Interval)
defer ticker.Stop()
point := models.Point{
Name: "stats",
Tags: map[string]string{"node": s.en.Name()},
}
for {
select {
case <-s.closing:
return nil
case now := <-ticker.C:
s.timer.Start()
point.Time = now.UTC()
stats := s.en.nodeStatsByGroup()
for group, stat := range stats {
point.Fields = stat.Fields
point.Group = group
point.Dimensions = stat.Dimensions
point.Tags = stat.Tags
s.timer.Pause()
for _, out := range s.outs {
err := out.CollectPoint(point)
if err != nil {
return err
}
}
s.timer.Resume()
if err := s.emit(now); err != nil {
return err
}
}
}
}

// Emit a set of stats data points.
func (s *StatsNode) emit(now time.Time) error {
s.timer.Start()
point := models.Point{
Name: "stats",
Tags: map[string]string{"node": s.en.Name()},
Time: now.UTC(),
}
if s.s.AlignFlag {
point.Time = point.Time.Round(s.s.Interval)
}
stats := s.en.nodeStatsByGroup()
for group, stat := range stats {
point.Fields = stat.Fields
point.Group = group
point.Dimensions = stat.Dimensions
point.Tags = stat.Tags
s.timer.Pause()
for _, out := range s.outs {
err := out.CollectPoint(point)
if err != nil {
return err
}
s.timer.Stop()
}
s.timer.Resume()
}
s.timer.Stop()
return nil
}

func (s *StatsNode) stopStats() {
Expand Down

0 comments on commit ce92786

Please sign in to comment.