Skip to content

Commit

Permalink
Add alertpost service
Browse files Browse the repository at this point in the history
Add enough to make tests pass

Add correct POST alert handling

Add tests for .endpoint on .post on alert node

Add alertpost config tests

Remove NewConfig section

Add comments to alertpost/config.go

Add fixes suggested in PR

Move AlertData to alert package from service/alert

Implement Test function

Allow override variables to set map values

Rename alertpost service to httppost

Favorite package name is httpposttest :)

Add endpoint property method to httpPost

Allow variadic httpPost arguments

Update config file for httppost

Add endpoint test to http post node

Make Fixes suggested in PR

Add documentation for variadic arguments it httpPost

Add test for map enviornment vars

Refactor httppost endpoints

Reorder code in httpost service package

Add support for basic auth to endpoints

Make changes from PR

Fix httpPost functionality

Allow headers to be set for alerts via tickscript

Check headers

Unredact headers
  • Loading branch information
desa committed May 1, 2017
1 parent 556792c commit 40023cd
Show file tree
Hide file tree
Showing 25 changed files with 1,256 additions and 171 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased [unreleased]

### Features

- [#117](https://github.com/influxdata/kapacitor/issues/117): Add headers to alert POST requests.

### Bugfixes

- [#1294](https://github.com/influxdata/kapacitor/issues/1294): Fix bug where batch queries would be missing all fields after the first nil field.
Expand Down
20 changes: 11 additions & 9 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/kapacitor/pipeline"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
Expand Down Expand Up @@ -130,15 +131,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
return nil, err
}

// Construct alert handlers
for _, post := range n.PostHandlers {
c := alertservice.PostHandlerConfig{
URL: post.URL,
}
h := alertservice.NewPostHandler(c, l)
an.handlers = append(an.handlers, h)
}

for _, tcp := range n.TcpHandlers {
c := alertservice.TCPHandlerConfig{
Address: tcp.Address,
Expand Down Expand Up @@ -363,6 +355,16 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.handlers = append(an.handlers, h)
}

for _, p := range n.HTTPPostHandlers {
c := httppost.HandlerConfig{
URL: p.URL,
Endpoint: p.Endpoint,
Headers: p.Headers,
}
h := et.tm.HTTPPostService.Handler(c, l)
an.handlers = append(an.handlers, h)
}

for _, og := range n.OpsGenieHandlers {
c := opsgenie.HandlerConfig{
TeamsList: og.TeamsList,
Expand Down
24 changes: 24 additions & 0 deletions alert/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ type Event struct {
previousState EventState
}

func (e Event) AlertData() Data {
return Data{
ID: e.State.ID,
Message: e.State.Message,
Details: e.State.Details,
Time: e.State.Time,
Duration: e.State.Duration,
Level: e.State.Level,
Data: e.Data.Result,
}
}

func (e Event) PreviousState() EventState {
return e.previousState
}
Expand Down Expand Up @@ -154,3 +166,15 @@ type TopicState struct {
Level Level
Collected int64
}

// Data is a structure that contains relevant data about an alert event.
// The structure is intended to be JSON encoded, providing a consistent data format.
type Data struct {
ID string `json:"id"`
Message string `json:"message"`
Details string `json:"details"`
Time time.Time `json:"time"`
Duration time.Duration `json:"duration"`
Level Level `json:"level"`
Data models.Result `json:"data"`
}
14 changes: 14 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,20 @@ default-retention-policy = ""
# The URL for the Pushover API.
url = "https://api.pushover.net/1/messages.json"

##########################################
# Configure Alert POST request Endpoints

# As ENV variables:
# KAPACITOR_HTTPPOST_0_ENDPOINT = "example"
# KAPACITOR_HTTPPOST_0_URL = "http://example.com"
# KAPACITOR_HTTPPOST_0_HEADERS_Example = "header"

# [[httppost]]
# endpoint = "example"
# url = "http://example.com"
# headers = { Example = "your-key" }
# basic-auth = { username = "my-user", password = "my-pass" }

[slack]
# Configure Slack.
enabled = false
Expand Down
44 changes: 38 additions & 6 deletions http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,50 @@ package kapacitor

import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"

"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httppost"
)

type HTTPPostNode struct {
node
c *pipeline.HTTPPostNode
url string
mu sync.RWMutex
bp *bufpool.Pool
c *pipeline.HTTPPostNode
endpoint *httppost.Endpoint
mu sync.RWMutex
bp *bufpool.Pool
}

// Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint
func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, l *log.Logger) (*HTTPPostNode, error) {

hn := &HTTPPostNode{
node: node{Node: n, et: et, logger: l},
c: n,
bp: bufpool.New(),
url: n.Url,
}

// Should only ever be 0 or 1 from validation of n
if len(n.URLs) == 1 {
e := httppost.NewEndpoint(n.URLs[0], nil, httppost.BasicAuth{})
hn.endpoint = e
}

// Should only ever be 0 or 1 from validation of n
if len(n.HTTPPostEndpoints) == 1 {
endpointName := n.HTTPPostEndpoints[0].Endpoint
e, ok := et.tm.HTTPPostService.Endpoint(endpointName)
if !ok {
return nil, fmt.Errorf("endpoint '%s' does not exist", endpointName)
}
hn.endpoint = e
}

hn.node.runF = hn.runPost
return hn, nil
}
Expand Down Expand Up @@ -72,14 +91,27 @@ func (h *HTTPPostNode) postRow(group models.GroupID, row *models.Row) {
defer h.bp.Put(body)
err := json.NewEncoder(body).Encode(result)
if err != nil {
h.incrementErrorCount()
h.logger.Printf("E! failed to marshal row data json: %v", err)
return
}
req, err := h.endpoint.NewHTTPRequest(body)
if err != nil {
h.incrementErrorCount()
h.logger.Printf("E! failed to marshal row data json: %v", err)
return
}

resp, err := http.Post(h.url, "application/json", body)
req.Header.Set("Content-Type", "application/json")
for k, v := range h.c.Headers {
req.Header.Set(k, v)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
h.incrementErrorCount()
h.logger.Printf("E! failed to POST row data: %v", err)
return
}
resp.Body.Close()

}
16 changes: 9 additions & 7 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/influxdata/kapacitor/clock"
"github.com/influxdata/kapacitor/models"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/storage/storagetest"
"github.com/influxdata/wlog"
)
Expand Down Expand Up @@ -1389,15 +1390,15 @@ batch
func TestBatch_AlertStateChangesOnly(t *testing.T) {
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ad := alertservice.AlertData{}
ad := alert.Data{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&ad)
if err != nil {
t.Fatal(err)
}
atomic.AddInt32(&requestCount, 1)
if rc := atomic.LoadInt32(&requestCount); rc == 1 {
expAd := alertservice.AlertData{
expAd := alert.Data{
ID: "cpu_usage_idle:cpu=cpu-total",
Message: "cpu_usage_idle:cpu=cpu-total is CRITICAL",
Time: time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
Expand All @@ -1408,7 +1409,7 @@ func TestBatch_AlertStateChangesOnly(t *testing.T) {
t.Error(msg)
}
} else {
expAd := alertservice.AlertData{
expAd := alert.Data{
ID: "cpu_usage_idle:cpu=cpu-total",
Message: "cpu_usage_idle:cpu=cpu-total is OK",
Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC),
Expand Down Expand Up @@ -1454,27 +1455,27 @@ batch
func TestBatch_AlertStateChangesOnlyExpired(t *testing.T) {
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ad := alertservice.AlertData{}
ad := alert.Data{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&ad)
if err != nil {
t.Fatal(err)
}
// We don't care about the data for this test
ad.Data = models.Result{}
var expAd alertservice.AlertData
var expAd alert.Data
atomic.AddInt32(&requestCount, 1)
rc := atomic.LoadInt32(&requestCount)
if rc < 3 {
expAd = alertservice.AlertData{
expAd = alert.Data{
ID: "cpu_usage_idle:cpu=cpu-total",
Message: "cpu_usage_idle:cpu=cpu-total is CRITICAL",
Time: time.Date(1971, 1, 1, 0, 0, int(rc-1)*20, 0, time.UTC),
Duration: time.Duration(rc-1) * 20 * time.Second,
Level: alert.Critical,
}
} else {
expAd = alertservice.AlertData{
expAd = alert.Data{
ID: "cpu_usage_idle:cpu=cpu-total",
Message: "cpu_usage_idle:cpu=cpu-total is OK",
Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC),
Expand Down Expand Up @@ -2923,6 +2924,7 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.HTTPPostService = httppost.NewService(nil, logService.NewLogger("[httppost] ", log.LstdFlags))
as := alertservice.NewService(logService.NewLogger("[alert] ", log.LstdFlags))
as.StorageService = storagetest.New()
as.HTTPDService = httpdService
Expand Down
4 changes: 2 additions & 2 deletions integrations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"time"

"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/influxdb"
"github.com/influxdata/kapacitor/models"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/httpd"
k8s "github.com/influxdata/kapacitor/services/k8s/client"
"github.com/influxdata/kapacitor/udf"
Expand Down Expand Up @@ -117,7 +117,7 @@ func compareResultsIgnoreSeriesOrder(exp, got models.Result) (bool, string) {
return true, ""
}

func compareAlertData(exp, got alertservice.AlertData) (bool, string) {
func compareAlertData(exp, got alert.Data) (bool, string) {
// Pull out Result for comparison
expData := exp.Data
exp.Data = models.Result{}
Expand Down
Loading

0 comments on commit 40023cd

Please sign in to comment.