Skip to content

Commit

Permalink
add timer logic to each node
Browse files Browse the repository at this point in the history
add throughput to task
  • Loading branch information
nathanielc committed Feb 22, 2016
1 parent 1df9c62 commit 246f111
Show file tree
Hide file tree
Showing 28 changed files with 467 additions and 92 deletions.
5 changes: 5 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,11 @@ func (a *AlertNode) runAlert([]byte) error {
switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
a.timer.Start()
l := a.determineLevel(p.Time, p.Fields, p.Tags)
state := a.updateState(l, p.Group)
if (a.a.UseFlapping && state.flapping) || (a.a.IsStateChangesOnly && !state.changed) {
a.timer.Stop()
continue
}
// send alert if we are not OK or we are OK and state changed (i.e recovery)
Expand All @@ -293,9 +295,11 @@ func (a *AlertNode) runAlert([]byte) error {
h(ad)
}
}
a.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := a.ins[0].NextBatch(); ok; b, ok = a.ins[0].NextBatch() {
a.timer.Start()
triggered := false
for _, p := range b.Points {
l := a.determineLevel(p.Time, p.Fields, p.Tags)
Expand Down Expand Up @@ -331,6 +335,7 @@ func (a *AlertNode) runAlert([]byte) error {
}
}
}
a.timer.Stop()
}
}
return nil
Expand Down
12 changes: 11 additions & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string {

// Do not add the source batch node to the dot output
// since its not really an edge.
func (s *SourceBatchNode) edot(buf *bytes.Buffer) {
func (s *SourceBatchNode) edot(*bytes.Buffer, time.Duration) {}

func (s *SourceBatchNode) collectedCount() (count int64) {
for _, child := range s.children {
count += child.collectedCount()
}
return
}

type BatchNode struct {
Expand Down Expand Up @@ -244,6 +250,7 @@ func (b *BatchNode) doQuery() error {
case <-b.aborting:
return errors.New("batch doQuery aborted")
case now := <-tickC:
b.timer.Start()

// Update times for query
stop := now.Add(-1 * b.b.Offset)
Expand Down Expand Up @@ -277,10 +284,13 @@ func (b *BatchNode) doQuery() error {
if err != nil {
return err
}
b.timer.Pause()
for _, bch := range batches {
b.ins[0].CollectBatch(bch)
}
b.timer.Resume()
}
b.timer.Stop()
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func (s *Server) appendStatsService(c stats.Config) {
srv := stats.NewService(c, l)
srv.TaskMaster = s.TaskMaster

s.TaskMaster.TimingService = srv
s.Services = append(s.Services, srv)
}
}
Expand Down
11 changes: 9 additions & 2 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,16 @@ func TestServer_EnableTask(t *testing.T) {
if ti.TICKscript != tick {
t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick)
}
dot := "digraph testTaskName {\nsrcstream0 -> stream1 [label=\"0\"];\n}"
dot := `digraph testTaskName {
graph [label="Throughput: 0.00 points/s"];
srcstream0 [label="srcstream0 0"];
srcstream0 -> stream1 [label="0"];
stream1 [label="stream1 0"];
}`
if ti.Dot != dot {
t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot)
t.Fatalf("unexpected dot got\n%s exp\n%s", ti.Dot, dot)
}
}

Expand Down
6 changes: 6 additions & 0 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
case pipeline.StreamEdge:
previous := make(map[models.GroupID]models.Point)
for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
pr, ok := previous[p.Group]
if !ok {
previous[p.Group] = p
Expand All @@ -40,17 +41,21 @@ func (d *DerivativeNode) runDerivative([]byte) error {
fields := pr.Fields.Copy()
fields[d.d.As] = value
pr.Fields = fields
d.timer.Pause()
for _, child := range d.outs {
err := child.CollectPoint(pr)
if err != nil {
return err
}
}
d.timer.Resume()
}
previous[p.Group] = p
d.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := d.ins[0].NextBatch(); ok; b, ok = d.ins[0].NextBatch() {
d.timer.Start()
if len(b.Points) > 0 {
pr := b.Points[0]
var p models.BatchPoint
Expand All @@ -69,6 +74,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
}
b.Points = b.Points[:len(b.Points)-1]
}
d.timer.Stop()
for _, child := range d.outs {
err := child.CollectBatch(b)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ func (e *EvalNode) runEval(snapshot []byte) error {
switch e.Provides() {
case pipeline.StreamEdge:
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
e.timer.Start()
fields, err := e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
return err
}
p.Fields = fields
e.timer.Stop()
for _, child := range e.outs {
err := child.CollectPoint(p)
if err != nil {
Expand All @@ -53,13 +55,15 @@ func (e *EvalNode) runEval(snapshot []byte) error {
}
case pipeline.BatchEdge:
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
for i, p := range b.Points {
fields, err := e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
return err
}
b.Points[i].Fields = fields
}
e.timer.Stop()
for _, child := range e.outs {
err := child.CollectBatch(b)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func (g *GroupByNode) runGroupBy([]byte) error {
switch g.Wants() {
case pipeline.StreamEdge:
for pt, ok := g.ins[0].NextPoint(); ok; pt, ok = g.ins[0].NextPoint() {
g.timer.Start()
pt = setGroupOnPoint(pt, g.allDimensions, g.dimensions)
g.timer.Stop()
for _, child := range g.outs {
err := child.CollectPoint(pt)
if err != nil {
Expand All @@ -42,6 +44,7 @@ func (g *GroupByNode) runGroupBy([]byte) error {
}
default:
for b, ok := g.ins[0].NextBatch(); ok; b, ok = g.ins[0].NextBatch() {
g.timer.Start()
groups := make(map[models.GroupID]*models.Batch)
for _, p := range b.Points {
var dims []string
Expand All @@ -67,7 +70,7 @@ func (g *GroupByNode) runGroupBy([]byte) error {
}
group.Points = append(group.Points, p)
}

g.timer.Stop()
for _, group := range groups {
for _, child := range g.outs {
err := child.CollectBatch(*group)
Expand All @@ -76,7 +79,6 @@ func (g *GroupByNode) runGroupBy([]byte) error {
}
}
}

}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions http_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,17 @@ func (h *HTTPOutNode) runOut([]byte) error {
switch h.Wants() {
case pipeline.StreamEdge:
for p, ok := h.ins[0].NextPoint(); ok; p, ok = h.ins[0].NextPoint() {
h.timer.Start()
row := models.PointToRow(p)
h.updateResultWithRow(p.Group, row)
h.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := h.ins[0].NextBatch(); ok; b, ok = h.ins[0].NextBatch() {
h.timer.Start()
row := models.BatchToRow(b)
h.updateResultWithRow(b.Group, row)
h.timer.Stop()
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions influxdb_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (i *InfluxDBOutNode) runOut([]byte) error {
switch i.Wants() {
case pipeline.StreamEdge:
for p, ok := i.ins[0].NextPoint(); ok; p, ok = i.ins[0].NextPoint() {
i.timer.Start()
batch := models.Batch{
Name: p.Name,
Group: p.Group,
Expand All @@ -37,13 +38,16 @@ func (i *InfluxDBOutNode) runOut([]byte) error {
if err != nil {
return err
}
i.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := i.ins[0].NextBatch(); ok; b, ok = i.ins[0].NextBatch() {
i.timer.Start()
err := i.write("", "", b)
if err != nil {
return err
}
i.timer.Stop()
}
}
return nil
Expand Down
89 changes: 57 additions & 32 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/timer"
"github.com/influxdb/influxdb/influxql"
)

Expand All @@ -18,7 +19,7 @@ type JoinNode struct {
fill influxql.FillOption
fillValue interface{}
groups map[models.GroupID]*group
mu sync.Mutex
mu sync.RWMutex
runningGroups sync.WaitGroup
}

Expand Down Expand Up @@ -98,6 +99,22 @@ func (j *JoinNode) runJoin([]byte) error {
return nil
}

func (j *JoinNode) nodeExecTime() time.Duration {
j.mu.RLock()
defer j.mu.RUnlock()
sum := 0.0
total := 0.0
for _, group := range j.groups {
avg, count := group.timer.AverageTime()
sum += float64(avg) * float64(count)
total += float64(count)
}
if total == 0 {
return 0
}
return time.Duration(sum / total)
}

// safely get the group for the point or create one if it doesn't exist.
func (j *JoinNode) getGroup(p models.PointInterface) *group {
j.mu.Lock()
Expand All @@ -112,36 +129,6 @@ func (j *JoinNode) getGroup(p models.PointInterface) *group {
return group
}

// emit a single joined set
func (j *JoinNode) emitJoinedSet(set *joinset) error {
if set.name == "" {
set.name = set.First().PointName()
}
switch j.Wants() {
case pipeline.StreamEdge:
p, ok := set.JoinIntoPoint()
if ok {
for _, out := range j.outs {
err := out.CollectPoint(p)
if err != nil {
return err
}
}
}
case pipeline.BatchEdge:
b, ok := set.JoinIntoBatch()
if ok {
for _, out := range j.outs {
err := out.CollectBatch(b)
if err != nil {
return err
}
}
}
}
return nil
}

// represents an incoming data point and which parent it came from
type srcPoint struct {
src int
Expand All @@ -155,6 +142,7 @@ type group struct {
oldestTime time.Time
j *JoinNode
points chan srcPoint
timer timer.Timer
}

func newGroup(i int, j *JoinNode) *group {
Expand All @@ -163,14 +151,17 @@ func newGroup(i int, j *JoinNode) *group {
head: make([]time.Time, i),
j: j,
points: make(chan srcPoint),
timer: j.et.tm.TimingService.NewTimer(),
}
}

// start consuming incoming points
func (g *group) run() {
defer g.j.runningGroups.Done()
for sp := range g.points {
g.timer.Start()
g.collect(sp.src, sp.p)
g.timer.Stop()
}
}

Expand Down Expand Up @@ -211,7 +202,7 @@ func (g *group) collect(i int, p models.PointInterface) {
// emit a set and update the oldestTime.
func (g *group) emit() {
set := g.sets[g.oldestTime]
g.j.emitJoinedSet(set)
g.emitJoinedSet(set)
delete(g.sets, g.oldestTime)

g.oldestTime = time.Time{}
Expand All @@ -229,6 +220,40 @@ func (g *group) emitAll() {
}
}

// emit a single joined set
func (g *group) emitJoinedSet(set *joinset) error {
if set.name == "" {
set.name = set.First().PointName()
}
switch g.j.Wants() {
case pipeline.StreamEdge:
p, ok := set.JoinIntoPoint()
if ok {
g.timer.Pause()
for _, out := range g.j.outs {
err := out.CollectPoint(p)
if err != nil {
return err
}
}
g.timer.Resume()
}
case pipeline.BatchEdge:
b, ok := set.JoinIntoBatch()
if ok {
g.timer.Pause()
for _, out := range g.j.outs {
err := out.CollectBatch(b)
if err != nil {
return err
}
}
g.timer.Resume()
}
}
return nil
}

// represents a set of points or batches from the same joined time
type joinset struct {
name string
Expand Down
Loading

0 comments on commit 246f111

Please sign in to comment.