Skip to content

Commit

Permalink
Merge pull request influxdata#2335 from influxdata/fix/tickerpanic
Browse files Browse the repository at this point in the history
fix: panic when setting a zero interval for ticker
  • Loading branch information
docmerlin authored May 14, 2020
2 parents b53007d + 9a0d74a commit bf60b18
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 10 deletions.
6 changes: 3 additions & 3 deletions barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (n *BarrierNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge

func (n *BarrierNode) newBarrier(group edge.GroupInfo, first edge.PointMeta) (edge.ForwardReceiver, func(), error) {
switch {
case n.b.Idle != 0:
case n.b.Idle > 0:
idleBarrier := newIdleBarrier(
first.Name(),
group,
Expand All @@ -69,7 +69,7 @@ func (n *BarrierNode) newBarrier(group edge.GroupInfo, first edge.PointMeta) (ed
n.b.Delete,
)
return idleBarrier, idleBarrier.Stop, nil
case n.b.Period != 0:
case n.b.Period > 0:
periodicBarrier := newPeriodicBarrier(
first.Name(),
group,
Expand All @@ -80,7 +80,7 @@ func (n *BarrierNode) newBarrier(group edge.GroupInfo, first edge.PointMeta) (ed
)
return periodicBarrier, periodicBarrier.Stop, nil
default:
return nil, nil, errors.New("unreachable code, barrier node should have non-zero idle or non-zero period")
return nil, nil, errors.New("unreachable code, barrier node should have positive idle or positive period")
}
}

Expand Down
4 changes: 3 additions & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,16 @@ func newQueryNode(et *ExecutingTask, n *pipeline.QueryNode, d NodeDiagnostic) (*
return nil, errors.New("must not set both 'every' and 'cron' properties")
}
switch {
case n.Every != 0:
case n.Every > 0:
bn.ticker = newTimeTicker(n.Every, n.AlignFlag)
case n.Cron != "":
var err error
bn.ticker, err = newCronTicker(n.Cron)
if err != nil {
return nil, err
}
case n.Every < 0:
return nil, errors.New("'every' duration must must non-negative")
default:
return nil, errors.New("must define one of 'every' or 'cron'")
}
Expand Down
4 changes: 4 additions & 0 deletions pipeline/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pipeline
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"sort"
"time"
Expand Down Expand Up @@ -447,6 +448,9 @@ func unmarshalStats(data []byte, parents []Node, typ TypeOf) (Node, error) {
}
child := nodeParent.Stats(0)
err := json.Unmarshal(data, child)
if child.Interval == 0 {
return nil, errors.New("zero is an invalid stats interval")
}
return child, err
}

Expand Down
28 changes: 23 additions & 5 deletions pipeline/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,21 +994,39 @@ func Test_unmarshalStats(t *testing.T) {
wantErr: true,
},
{
name: "unmarshal stats",
name: "should error when interval is zero",
args: args{
parents: []Node{
&node{},
},
data: []byte(`{
"typeOf": "stats",
"id": "2",
"interval": "5s",
"interval": "0",
"align": true
}`),
typ: TypeOf{
Type: "stats",
},
},
wantErr: true,
},
{
name: "unmarshal stats",
args: args{
parents: []Node{
&node{},
},
data: []byte(`{
"typeOf": "stats",
"id": "2",
"interval": "5s",
"align": true
}`),
typ: TypeOf{
Type: "stats",
},
},
want: &StatsNode{
Interval: 5 * time.Second,
AlignFlag: true,
Expand All @@ -1017,9 +1035,9 @@ func Test_unmarshalStats(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if !tt.wantErr {
p := &Pipeline{}
p1 := tt.args.parents[0]
p := &Pipeline{}
p1 := tt.args.parents[0]
if p1 != nil {
p.addSource(p1)
}
got, err := unmarshalStats(tt.args.data, tt.args.parents, tt.args.typ)
Expand Down
15 changes: 14 additions & 1 deletion stats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kapacitor

import (
"errors"
"fmt"
"sync"
"time"
Expand All @@ -25,6 +26,11 @@ func newStatsNode(et *ExecutingTask, n *pipeline.StatsNode, d NodeDiagnostic) (*
if en == nil {
return nil, fmt.Errorf("no node found for %s", n.SourceNode.Name())
}

if n.Interval <= 0 {
return nil, errors.New("stats node must have positive interval")
}

sn := &StatsNode{
node: node{Node: n, et: et, diag: d},
s: n,
Expand All @@ -41,7 +47,11 @@ func (n *StatsNode) runStats([]byte) error {
// Wait till we are roughly aligned with the interval.
now := time.Now()
next := now.Truncate(n.s.Interval).Add(n.s.Interval)
after := time.NewTicker(next.Sub(now))
dur := next.Sub(now)
if dur <= 0 { // this can happen during a time-changeover like a leap second, or if something is messing about with the system
return errors.New("alignment interval should be positive but was not")
}
after := time.NewTicker(dur)
select {
case <-after.C:
after.Stop()
Expand All @@ -53,6 +63,9 @@ func (n *StatsNode) runStats([]byte) error {
return err
}
}
if n.s.Interval <= 0 {
return errors.New("stats interval should be positive but was not")
}
ticker := time.NewTicker(n.s.Interval)
defer ticker.Stop()
for {
Expand Down

0 comments on commit bf60b18

Please sign in to comment.