Skip to content

Commit

Permalink
Merge branch 'master' into feature/jsonpipe
Browse files Browse the repository at this point in the history
  • Loading branch information
goller committed Oct 30, 2017
2 parents 20025dc + d97f29f commit 7eed35c
Show file tree
Hide file tree
Showing 43 changed files with 1,062 additions and 333 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
## Unreleased

### Features
- [#1408](https://github.com/influxdata/kapacitor/issues/1408): Add Previous state

- [#1461](https://github.com/influxdata/kapacitor/issues/1461): alert.post and https_post timeouts needed.
- [#1413](https://github.com/influxdata/kapacitor/issues/1413): Add subscriptions modes to InfluxDB subscriptions.
- [#1436](https://github.com/influxdata/kapacitor/issues/1436): Add linear fill support for QueryNode.
- [#1345](https://github.com/influxdata/kapacitor/issues/1345): Add MQTT Alert Handler
Expand All @@ -12,6 +14,7 @@
The breaking change is that the Combine and Flatten nodes previously, but erroneously, operated across batch boundaries; this has been fixed.
- [#1497](https://github.com/influxdata/kapacitor/pull/1497): Add support for Docker Swarm autoscaling services.
- [#1485](https://github.com/influxdata/kapacitor/issues/1485): Add bools field types to UDFs.
- [#1549](https://github.com/influxdata/kapacitor/issues/1549): Add stateless now() function to get the current local time.
- [#1545](https://github.com/influxdata/kapacitor/pull/1545): Add support for timeout, tags and service template in the Alerta AlertNode
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.
- [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost
Expand All @@ -24,6 +27,7 @@
Load service was added; the service can load tasks/handlers from a directory.
- [#1606](https://github.com/influxdata/kapacitor/pull/1606): Update Go version to 1.9.1
- [#1578](https://github.com/influxdata/kapacitor/pull/1578): Add support for exposing logs via the API.
- [#1605](https://github.com/influxdata/kapacitor/issues/1605): Add support for {{ .Duration }} on Alert Message property

### Bugfixes

Expand All @@ -35,6 +39,8 @@
- [#1516](https://github.com/influxdata/kapacitor/pull/1516): Fix bad PagerDuty test the required server info.
- [#1581](https://github.com/influxdata/kapacitor/pull/1581): Add SNMP sysUpTime to SNMP Trap service
- [#1547](https://github.com/influxdata/kapacitor/issues/1547): Fix panic on recording replay with HTTPPostHandler.
- [#1623](https://github.com/influxdata/kapacitor/issues/1623): Fix k8s incluster master api dns resolution
- [#1630](https://github.com/influxdata/kapacitor/issues/1630): Remove the pidfile after the server has exited.

## v1.3.3 [2017-08-11]

Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ But instead of cloning the main repo, instead clone your fork. Follow the steps
export GOPATH=$HOME/go
mkdir -p $GOPATH/src/github.com/influxdata
cd $GOPATH/src/github.com/influxdata
git clone [email protected]:<username>/kapacitor
git clone [email protected]:<username>/kapacitor.git

Retaining the directory structure `$GOPATH/src/github.com/influxdata` is necessary so that Go imports work correctly.

Expand Down
17 changes: 11 additions & 6 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
Endpoint: p.Endpoint,
Headers: p.Headers,
CaptureResponse: p.CaptureResponseFlag,
Timeout: p.Timeout,
}
h := et.tm.HTTPPostService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
Expand Down Expand Up @@ -695,7 +696,7 @@ func (n *AlertNode) event(
d time.Duration,
result models.Result,
) (alert.Event, error) {
msg, details, err := n.renderMessageAndDetails(id, name, t, group, tags, fields, level)
msg, details, err := n.renderMessageAndDetails(id, name, t, group, tags, fields, level, d)
if err != nil {
return alert.Event{}, err
}
Expand Down Expand Up @@ -1046,6 +1047,9 @@ type messageInfo struct {

// Time
Time time.Time

// Duration of the alert
Duration time.Duration
}

type detailsInfo struct {
Expand Down Expand Up @@ -1087,7 +1091,7 @@ func (n *AlertNode) renderID(name string, group models.GroupID, tags models.Tags
return id.String(), nil
}

func (n *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group models.GroupID, tags models.Tags, fields models.Fields, level alert.Level) (string, string, error) {
func (n *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group models.GroupID, tags models.Tags, fields models.Fields, level alert.Level, d time.Duration) (string, string, error) {
g := string(group)
if group == models.NilGroup {
g = "nil"
Expand All @@ -1100,10 +1104,11 @@ func (n *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group
Tags: tags,
ServerInfo: n.serverInfo(),
},
ID: id,
Fields: fields,
Level: level.String(),
Time: t,
ID: id,
Fields: fields,
Level: level.String(),
Time: t,
Duration: d,
}

// Grab a buffer for the message template and the details template
Expand Down
30 changes: 16 additions & 14 deletions alert/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ type Event struct {

func (e Event) AlertData() Data {
return Data{
ID: e.State.ID,
Message: e.State.Message,
Details: e.State.Details,
Time: e.State.Time,
Duration: e.State.Duration,
Level: e.State.Level,
Data: e.Data.Result,
ID: e.State.ID,
Message: e.State.Message,
Details: e.State.Details,
Time: e.State.Time,
Duration: e.State.Duration,
Level: e.State.Level,
Data: e.Data.Result,
PreviousLevel: e.previousState.Level,
}
}

Expand Down Expand Up @@ -170,11 +171,12 @@ type TopicState struct {
// Data is a structure that contains relevant data about an alert event.
// The structure is intended to be JSON encoded, providing a consistent data format.
type Data struct {
ID string `json:"id"`
Message string `json:"message"`
Details string `json:"details"`
Time time.Time `json:"time"`
Duration time.Duration `json:"duration"`
Level Level `json:"level"`
Data models.Result `json:"data"`
ID string `json:"id"`
Message string `json:"message"`
Details string `json:"details"`
Time time.Time `json:"time"`
Duration time.Duration `json:"duration"`
Level Level `json:"level"`
Data models.Result `json:"data"`
PreviousLevel Level `json:"previousLevel"`
}
16 changes: 13 additions & 3 deletions cmd/kapacitord/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Command struct {
Commit string

closing chan struct{}
pidfile string
Closed chan struct{}

Stdin io.Reader
Expand Down Expand Up @@ -104,7 +105,7 @@ func (cmd *Command) Run(args ...string) error {
// Initialize Logging Services
cmd.diagService = diagnostic.NewService(config.Logging, cmd.Stdout, cmd.Stderr)
if err := cmd.diagService.Open(); err != nil {
return fmt.Errorf("failed to opend diagnostic service: %v", err)
return fmt.Errorf("failed to open diagnostic service: %v", err)
}

// Initialize cmd diagnostic
Expand All @@ -118,6 +119,7 @@ func (cmd *Command) Run(args ...string) error {
if err := cmd.writePIDFile(options.PIDFile); err != nil {
return fmt.Errorf("write pid file: %s", err)
}
cmd.pidfile = options.PIDFile

// Create server from config and start it.
buildInfo := server.BuildInfo{Version: cmd.Version, Commit: cmd.Commit, Branch: cmd.Branch}
Expand All @@ -142,6 +144,7 @@ func (cmd *Command) Run(args ...string) error {
// Close shuts down the server.
func (cmd *Command) Close() error {
defer close(cmd.Closed)
defer cmd.removePIDFile()
close(cmd.closing)
if cmd.Server != nil {
return cmd.Server.Close()
Expand All @@ -165,6 +168,14 @@ func (cmd *Command) monitorServerErrors() {
}
}

func (cmd *Command) removePIDFile() {
if cmd.pidfile != "" {
if err := os.Remove(cmd.pidfile); err != nil {
cmd.Diag.Error("unable to remove pidfile", err)
}
}
}

// ParseFlags parses the command line flags from args and returns an options set.
func (cmd *Command) ParseFlags(args ...string) (Options, error) {
var options Options
Expand All @@ -191,8 +202,7 @@ func (cmd *Command) writePIDFile(path string) error {
}

// Ensure the required directory structure exists.
err := os.MkdirAll(filepath.Dir(path), 0777)
if err != nil {
if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
return fmt.Errorf("mkdir: %s", err)
}

Expand Down
70 changes: 70 additions & 0 deletions cmd/kapacitord/run/command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package run_test

import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"
"text/template"
"time"

"fmt"

"github.com/influxdata/kapacitor/cmd/kapacitord/run"
)

func TestCommand_PIDFile(t *testing.T) {
tmpdir, err := ioutil.TempDir(os.TempDir(), "kapacitord-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)

// Write out a config file that does not attempt to connect to influxdb.
configFile := filepath.Join(tmpdir, "kapacitor.conf")
configTemplate := template.Must(template.New("config_file").Parse(`data_dir = "{{.TempDir}}/data"
[[influxdb]]
enabled = false
[replay]
dir = "{{.TempDir}}/replay"
[storage]
boltdb = "{{.TempDir}}/kapacitor.db"
[task]
dir = "{{.TempDir}}/tasks"
[load]
dir = "{{.TempDir}}/load"
`))
var buf bytes.Buffer
if err := configTemplate.Execute(&buf, map[string]string{"TempDir": tmpdir}); err != nil {
t.Fatalf("unable to generate config file: %s", err)
}
fmt.Println(buf.String())
if err := ioutil.WriteFile(configFile, buf.Bytes(), 0600); err != nil {
t.Fatalf("unable to write %s: %s", configFile, err)
}

pidFile := filepath.Join(tmpdir, "kapacitor.pid")

cmd := run.NewCommand()
if err := cmd.Run("-config", configFile, "-pidfile", pidFile); err != nil {
t.Fatalf("unexpected error: %s", err)
}

if _, err := os.Stat(pidFile); err != nil {
t.Fatalf("could not stat pid file: %s", err)
}
go cmd.Close()

timeout := time.NewTimer(5 * time.Second)
select {
case <-timeout.C:
t.Fatal("unexpected timeout")
case <-cmd.Closed:
timeout.Stop()
}

if _, err := os.Stat(pidFile); err == nil {
t.Fatal("expected pid file to be removed")
}
}
8 changes: 8 additions & 0 deletions examples/nodes/tasks/alert.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('cpu')
.groupBy(*)
|alert()
.crit(lambda: "usage_user" > 80)
6 changes: 6 additions & 0 deletions examples/nodes/tasks/batch.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dbrp "telegraf"."autogen"

batch
|query('SELECT mean("usage_user") FROM "telegraf"."autogen"."cpu"')
.period(1m)
.every(10s)
8 changes: 8 additions & 0 deletions examples/nodes/tasks/combine.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('cpu')
.groupBy(*)
|combine(lambda: "cpu" == 'cpu-total', lambda: TRUE)
.as('total', 'cpu')
9 changes: 9 additions & 0 deletions examples/nodes/tasks/default.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('cpu')
.groupBy(*)
|default()
.tag('sweet_new_tag', 'this_is')
.field('value', 10.9)
9 changes: 9 additions & 0 deletions examples/nodes/tasks/delete.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('cpu')
.groupBy(*)
|delete()
.tag('cpu')
.field('usuage_idle')
7 changes: 7 additions & 0 deletions examples/nodes/tasks/eval.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('cpu')
|eval(lambda: 100.0 - "usage_idle")
.as('usage_not_idle')
7 changes: 7 additions & 0 deletions examples/nodes/tasks/flatten.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('cpu')
|flatten()
.on('cpu')
6 changes: 6 additions & 0 deletions examples/nodes/tasks/groupby.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('system')
|groupBy(*)
6 changes: 6 additions & 0 deletions examples/nodes/tasks/handle_loopback.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dbrp "loopback"."autogen"

stream
|from()
.measurement('system')
.groupBy(*)
9 changes: 9 additions & 0 deletions examples/nodes/tasks/httpout.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('system')
|window()
.period(1m)
.every(10s)
|httpOut('data')
9 changes: 9 additions & 0 deletions examples/nodes/tasks/httppost.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('system')
|window()
.period(1m)
.every(10s)
|httpPost('http://localhost:8080/example')
13 changes: 13 additions & 0 deletions examples/nodes/tasks/influxdbout.tick
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
dbrp "telegraf"."autogen"

stream
|from()
.measurement('system')
|window()
.period(1m)
.every(10s)
|mean('load1')
.as('load1')
|influxDBOut()
.database('mydb')
.measurement('derived_system')
Loading

0 comments on commit 7eed35c

Please sign in to comment.