support for db.rp.name and batch has its own model now
nathanielc committed Oct 14, 2015
1 parent c789f65 commit 7d84343
Showing 39 changed files with 1,354 additions and 703 deletions.
121 changes: 22 additions & 99 deletions alert.go
@@ -3,31 +3,19 @@ package kapacitor
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math"
"net/http"

"github.com/influxdb/kapacitor/expr"
"github.com/influxdb/kapacitor/models"
"github.com/influxdb/kapacitor/pipeline"
)

type AlertHandler func(pts []*models.Point)
type AlertHandler func(batch models.Batch)

type AlertNode struct {
node
a *pipeline.AlertNode
endpoint string
handlers []AlertHandler
predicate *expr.Tree
funcs expr.Funcs
fs []exprFunc
}

type exprFunc interface {
name() string
fnc() expr.Func
a *pipeline.AlertNode
endpoint string
handlers []AlertHandler
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
@@ -42,107 +30,42 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err
if n.Post != "" {
an.handlers = append(an.handlers, an.handlePost)
}
// Parse predicate
an.predicate, err = expr.Parse(n.Predicate)
if err != nil {
return nil, err
}
if an.predicate.RType() != expr.ReturnBool {
return nil, fmt.Errorf("Predicate does not evaluate to boolean value %q", n.Predicate)
}

// Initialize functions for the predicate
an.fs = append(an.fs, &sigma{})

an.funcs = make(expr.Funcs)
for _, f := range an.fs {
an.funcs[f.name()] = f.fnc()
}

return
}

func (a *AlertNode) runAlert() error {
switch a.Wants() {
case pipeline.StreamEdge:
for p := a.ins[0].NextPoint(); p != nil; p = a.ins[0].NextPoint() {
if c, err := a.check(p); err != nil {
return err
} else if c {
for _, h := range a.handlers {
go h([]*models.Point{p})
}
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
batch := models.Batch{
Name: p.Name,
Group: p.Group,
Tags: p.Tags,
Points: []models.TimeFields{{Time: p.Time, Fields: p.Fields}},
}
for _, h := range a.handlers {
h(batch)
}
}
case pipeline.BatchEdge:
for w := a.ins[0].NextBatch(); w != nil; w = a.ins[0].NextBatch() {
for _, p := range w {
if c, err := a.check(p); err != nil {
return err
} else if c {
for _, h := range a.handlers {
go h(w)
}
break
}
for w, ok := a.ins[0].NextBatch(); ok; w, ok = a.ins[0].NextBatch() {
for _, h := range a.handlers {
h(w)
}
}
}
return nil
}

func (a *AlertNode) check(p *models.Point) (bool, error) {
vars := make(expr.Vars)
for k, v := range p.Fields {
if f, ok := v.(float64); ok {
vars[k] = f
} else {
return false, fmt.Errorf("field values must be float64")
}
}
b, err := a.predicate.EvalBool(vars, a.funcs)
return b, err
}

func (a *AlertNode) handlePost(pts []*models.Point) {
b, err := json.Marshal(pts)
func (a *AlertNode) handlePost(batch models.Batch) {
b, err := json.Marshal(batch)
if err != nil {
a.logger.Println("E! failed to marshal points json")
a.logger.Println("E! failed to marshal batch json", err)
return
}
buf := bytes.NewBuffer(b)
http.Post(a.a.Post, "application/json", buf)
}

type sigma struct {
mean float64
variance float64
m2 float64
n float64
}

func (s *sigma) name() string {
return "sigma"
}

func (s *sigma) fnc() expr.Func {
return s.call
}

func (s *sigma) call(args ...float64) (float64, error) {
if len(args) != 1 {
return 0, errors.New("sigma expected exactly one argument")
}
x := args[0]
s.n++
delta := x - s.mean
s.mean = s.mean + delta/s.n
s.m2 = s.m2 + delta*(x-s.mean)
s.variance = s.m2 / (s.n - 1)

if s.n < 2 {
return 0, nil
_, err = http.Post(a.a.Post, "application/json", buf)
if err != nil {
a.logger.Println("E! failed to POST batch", err)
}
return math.Abs(x-s.mean) / math.Sqrt(s.variance), nil

}
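
The models.Batch type used above replaces the old []*models.Point slice handed to alert handlers. Its declaration is not part of this diff; based on how it is constructed in runAlert (and in batch.go below), it is roughly the sketch that follows. The GroupID and Fields type names are inferred from TagsToGroupID and make(models.Fields) elsewhere in the commit, so the exact definitions may differ.

// Sketch of the batch model implied by this commit (not the verbatim source).
package models

import "time"

type GroupID string                // inferred from models.TagsToGroupID
type Fields map[string]interface{} // inferred from make(models.Fields)

// TimeFields pairs one timestamp with its field values.
type TimeFields struct {
	Time   time.Time
	Fields Fields
}

// Batch groups points that share a name, group ID, and tag set, so alert
// handlers receive the same value for both stream and batch edges.
type Batch struct {
	Name   string
	Group  GroupID
	Tags   map[string]string
	Points []TimeFields
}
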
20 changes: 10 additions & 10 deletions apply.go
@@ -7,7 +7,7 @@ import (
"github.com/influxdb/kapacitor/pipeline"
)

type TransFunc func(p *models.Point) (*models.Point, error)
type TransFunc func(f models.Fields) (models.Fields, error)

type ApplyNode struct {
node
@@ -33,30 +33,30 @@ func newApplyNode(et *ExecutingTask, n *pipeline.ApplyNode) (*ApplyNode, error)
func (a *ApplyNode) runApply() error {
switch a.Provides() {
case pipeline.StreamEdge:
for p := a.ins[0].NextPoint(); p != nil; p = a.ins[0].NextPoint() {
np, err := a.Func(p)
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
fields, err := a.Func(p.Fields)
if err != nil {
return err
}
p.Fields = fields
for _, child := range a.outs {
err := child.CollectPoint(np)
err := child.CollectPoint(p)
if err != nil {
return err
}
}
}
case pipeline.BatchEdge:
for b := a.ins[0].NextBatch(); b != nil; b = a.ins[0].NextBatch() {
nb := make([]*models.Point, len(b))
for i, p := range b {
np, err := a.Func(p)
for b, ok := a.ins[0].NextBatch(); ok; b, ok = a.ins[0].NextBatch() {
for i, p := range b.Points {
fields, err := a.Func(p.Fields)
if err != nil {
return err
}
nb[i] = np
b.Points[i].Fields = fields
}
for _, child := range a.outs {
err := child.CollectBatch(nb)
err := child.CollectBatch(b)
if err != nil {
return err
}
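
With the signature change above, an apply function now transforms only the field map and leaves point and batch metadata to the node. A hypothetical TransFunc under the new signature (not part of this commit) could look like:

package kapacitor

import "github.com/influxdb/kapacitor/models"

// double is a hypothetical TransFunc: it doubles every float64 field and
// copies any non-numeric field through unchanged.
func double(f models.Fields) (models.Fields, error) {
	out := make(models.Fields, len(f))
	for k, v := range f {
		if fv, ok := v.(float64); ok {
			out[k] = fv * 2
		} else {
			out[k] = v
		}
	}
	return out, nil
}
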
19 changes: 9 additions & 10 deletions batch.go
@@ -78,13 +78,18 @@ func (b *BatchNode) Query(batch BatchCollector) {
return
}
for _, series := range res.Series {
bch := make([]*models.Point, len(series.Values))
groupID := models.TagsToGroupID(
models.SortedKeys(series.Tags),
series.Tags,
)
bch := models.Batch{
Name: series.Name,
Group: groupID,
Tags: series.Tags,
}
bch.Points = make([]models.TimeFields, len(series.Values))
for i, v := range series.Values {
fields := make(map[string]interface{})
fields := make(models.Fields)
var t time.Time
for i, c := range series.Columns {
if c == "time" {
@@ -102,13 +107,7 @@ func (b *BatchNode) Query(batch BatchCollector) {
fields[c] = v[i]
}
}
bch[i] = models.NewPoint(
series.Name,
groupID,
series.Tags,
fields,
t,
)
bch.Points[i] = models.TimeFields{Time: t, Fields: fields}
}
batch.CollectBatch(bch)
}
@@ -123,7 +122,7 @@ func (b *BatchNode) stopBatch() {
}

func (b *BatchNode) runBatch() error {
for bt := b.ins[0].NextBatch(); bt != nil; bt = b.ins[0].NextBatch() {
for bt, ok := b.ins[0].NextBatch(); ok; bt, ok = b.ins[0].NextBatch() {
for _, child := range b.outs {
err := child.CollectBatch(bt)
if err != nil {
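
batch.go also picks up the edge-iteration change that runs through this commit: NextPoint and NextBatch now return a (value, ok) pair instead of a nil-terminated pointer, so end-of-stream is signaled by ok == false. A minimal sketch of the new consumption pattern, using a stand-in interface for the edge type (the real Edge API may differ):

package kapacitor

import "github.com/influxdb/kapacitor/models"

// batchSource stands in for the real edge type; it is shown only to make
// the new loop shape explicit and is not part of this commit.
type batchSource interface {
	NextBatch() (models.Batch, bool)
}

// drainBatches applies handle to every batch until the upstream edge
// closes, i.e. until ok comes back false.
func drainBatches(in batchSource, handle func(models.Batch)) {
	for b, ok := in.NextBatch(); ok; b, ok = in.NextBatch() {
		handle(b)
	}
}
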
8 changes: 7 additions & 1 deletion cmd/kapacitord/run/config.go
@@ -14,11 +14,11 @@ import (
"github.com/influxdb/kapacitor/services/influxdb"
"github.com/influxdb/kapacitor/services/replay"
"github.com/influxdb/kapacitor/services/task_store"
"github.com/influxdb/kapacitor/services/udp"

"github.com/influxdb/influxdb/services/collectd"
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/udp"
)

// Config represents the configuration format for the kapacitord binary.
@@ -70,12 +70,18 @@ func NewDemoConfig() (*Config, error) {

c.Replay.Dir = filepath.Join(homeDir, ".kapacitor", c.Replay.Dir)
c.Task.Dir = filepath.Join(homeDir, ".kapacitor", c.Task.Dir)
c.InfluxDB.Dir = filepath.Join(homeDir, ".kapacitor", c.InfluxDB.Dir)

c.Hostname, _ = os.Hostname()

return c, nil
}

// Validate returns an error if the config is invalid.
func (c *Config) Validate() error {
if c.Hostname == "" {
return fmt.Errorf("must configure valid hostname")
}
err := c.Replay.Validate()
if err != nil {
return err
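
The new Validate check means a configuration without a hostname is rejected before any services start. A sketch of how a caller might exercise the config API shown above (the actual wiring in the kapacitord run command is not part of this diff):

package main

import (
	"log"

	"github.com/influxdb/kapacitor/cmd/kapacitord/run"
)

func main() {
	// Hypothetical startup check: build a demo config, then fail fast if
	// it is invalid (for example, if no hostname could be determined).
	cfg, err := run.NewDemoConfig()
	if err != nil {
		log.Fatal(err)
	}
	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid configuration: %s", err)
	}
	// ... construct and start the server with cfg ...
}
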
9 changes: 5 additions & 4 deletions cmd/kapacitord/run/server.go
@@ -15,13 +15,13 @@ import (
"github.com/influxdb/kapacitor/services/replay"
"github.com/influxdb/kapacitor/services/streamer"
"github.com/influxdb/kapacitor/services/task_store"
"github.com/influxdb/kapacitor/services/udp"

"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/collectd"
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/udp"
)

// BuildInfo represents the build details for the server code.
@@ -85,8 +85,8 @@ func NewServer(c *Config, buildInfo *BuildInfo, l *log.Logger) (*Server, error)

// Append Kapacitor services.
s.appendStreamerService()
s.appendInfluxDBService(c.InfluxDB)
s.appendHTTPDService(c.HTTP)
s.appendInfluxDBService(c.InfluxDB, c.Hostname)
s.appendTaskStoreService(c.Task)
s.appendReplayStoreService(c.Replay)

@@ -115,8 +115,9 @@ func (s *Server) appendStreamerService() {
s.Services = append(s.Services, srv)
}

func (s *Server) appendInfluxDBService(c influxdb.Config) {
srv := influxdb.NewService(c)
func (s *Server) appendInfluxDBService(c influxdb.Config, hostname string) {
srv := influxdb.NewService(c, hostname)
srv.PointsWriter = s.Streamer

s.InfluxDB = srv
s.TaskMaster.InfluxDBService = srv