Skip to content

Commit

Permalink
add support for custom HTTP post bodies via a template system
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Sep 20, 2017
1 parent 3c9fed4 commit c5b1b3d
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 130 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

### Features

- [#1413](https://github.com/influxdata/kapacitor/issues/1413): Add subscriptions modes to InfluxDB subscriptions.
- [#1436](https://github.com/influxdata/kapacitor/issues/1436): Add linear fill support for QueryNode.
- [#1345](https://github.com/influxdata/kapacitor/issues/1345): Add MQTT Alert Handler
Expand All @@ -12,6 +13,7 @@
- [#1497](https://github.com/influxdata/kapacitor/pull/1497): Add support for Docker Swarm autoscaling services.
- [#1485](https://github.com/influxdata/kapacitor/issues/1485): Add bools field types to UDFs.
- [#1545](https://github.com/influxdata/kapacitor/pull/1545): Add support for timeout, tags and service template in the Alerta AlertNode
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.

### Bugfixes

Expand Down
29 changes: 29 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,35 @@ default-retention-policy = ""
# url = "http://example.com"
# headers = { Example = "your-key" }
# basic-auth = { username = "my-user", password = "my-pass" }
#
# # Provide an alert template for constructing a custom HTTP body.
# # Alert templates are only used with post alert handlers as they consume alert data.
# # The template uses https://golang.org/pkg/text/template/ and has access to the following fields:
# # * .ID - The unique ID for this alert
# # * .Message - The message of the alert
# # * .Details - The details of the alert
# # * .Time - The time the alert event occurred
# # * .Duration - The duration of the alert event.
# # * .Level - The level of the alert, i.e INFO, WARN, or CRITICAL.
# # * .Data - The data that triggered the alert.
# #
# # Specify the template inline.
# alert-template = "{{.Message}}:{{range .Data.Series}}{{.Tags}},{{range .Values}}{{.}}{{end}}{{end}}"
# # Specify an absolute path to a template file.
# alert-template-file = "/path/to/template/file"
#
# # Provide a row template for constructing a custom HTTP body.
# # Row templates are only used with httpPost pipeline nodes as they consume a row at a time.
# # The template uses https://golang.org/pkg/text/template/ and has access to the following fields:
# # * .Name - The measurement name of the data stream
# # * .Tags - A map of tags on the data.
# # * .Values - A list of values, each entry is a map containing a "time" key for the time of the point
# # and keys for all other fields on the point.
# #
# # Specify the template inline.
# row-template = "{{.Name}} host={{index .Tags \"host\"}}{{range .Values}} {{index . "time"}} {{index . "value"}}{{end}}"
# # Specify an absolute path to a template file.
# row-template-file = "/path/to/template/file"

[slack]
# Configure Slack.
Expand Down
48 changes: 40 additions & 8 deletions http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, d NodeDiagnost

// Should only ever be 0 or 1 from validation of n
if len(n.URLs) == 1 {
e := httppost.NewEndpoint(n.URLs[0], nil, httppost.BasicAuth{})
e := httppost.NewEndpoint(n.URLs[0], nil, httppost.BasicAuth{}, nil, nil)
hn.endpoint = e
}

Expand Down Expand Up @@ -109,23 +109,34 @@ func (g *httpPostGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, er
}

func (n *HTTPPostNode) postRow(row *models.Row) {
result := new(models.Result)
result.Series = []*models.Row{row}

body := n.bp.Get()
defer n.bp.Put(body)
err := json.NewEncoder(body).Encode(result)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return

var contentType string
if n.endpoint.RowTemplate() != nil {
mr := newMappedRow(row)
n.endpoint.RowTemplate().Execute(body, mr)
} else {
result := new(models.Result)
result.Series = []*models.Row{row}
err := json.NewEncoder(body).Encode(result)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return
}
contentType = "application/json"
}

req, err := n.endpoint.NewHTTPRequest(body)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return
}

req.Header.Set("Content-Type", "application/json")
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
for k, v := range n.c.Headers {
req.Header.Set(k, v)
}
Expand All @@ -136,3 +147,24 @@ func (n *HTTPPostNode) postRow(row *models.Row) {
}
resp.Body.Close()
}

type mappedRow struct {
Name string
Tags map[string]string
Values []map[string]interface{}
}

func newMappedRow(row *models.Row) *mappedRow {
values := make([]map[string]interface{}, len(row.Values))
for i, v := range row.Values {
values[i] = make(map[string]interface{}, len(row.Columns))
for c, col := range row.Columns {
values[i][col] = v[c]
}
}
return &mappedRow{
Name: row.Name,
Tags: row.Tags,
Values: values,
}
}
2 changes: 1 addition & 1 deletion integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2981,7 +2981,7 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.HTTPPostService = httppost.NewService(nil, diagService.NewHTTPPostHandler())
tm.HTTPPostService, _ = httppost.NewService(nil, diagService.NewHTTPPostHandler())
as := alertservice.NewService(diagService.NewAlertServiceHandler())
as.StorageService = storagetest.New()
as.HTTPDService = httpdService
Expand Down
90 changes: 85 additions & 5 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2562,7 +2562,87 @@ stream
c := httppost.Config{}
c.URL = ts.URL
c.Endpoint = "test"
sl := httppost.NewService(httppost.Configs{c}, diagService.NewHTTPPostHandler())
sl, _ := httppost.NewService(httppost.Configs{c}, diagService.NewHTTPPostHandler())
tm.HTTPPostService = sl
}

testStreamerWithOutput(t, "TestStream_HttpPost", script, 13*time.Second, er, false, tmInit)

if rc := atomic.LoadInt32(&requestCount); rc != 6 {
t.Errorf("got %v exp %v", rc, 6)
}
}
func TestStream_HttpPostEndpoint_CustomBody(t *testing.T) {
headers := map[string]string{"my": "header"}
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for k, v := range headers {
nv := r.Header.Get(k)
if nv != v {
t.Fatalf("got '%s:%s', exp '%s:%s'", k, nv, k, v)
}
}
data, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
got := string(data)
atomic.AddInt32(&requestCount, 1)
rc := atomic.LoadInt32(&requestCount)

var exp string
switch rc {
case 1:
exp = "cpu host=serverA type=idle 1971-01-01 00:00:00 +0000 UTC 97.1"
case 2:
exp = "cpu host=serverA type=idle 1971-01-01 00:00:01 +0000 UTC 92.6"
case 3:
exp = "cpu host=serverA type=idle 1971-01-01 00:00:02 +0000 UTC 95.6"
case 4:
exp = "cpu host=serverA type=idle 1971-01-01 00:00:03 +0000 UTC 93.1"
case 5:
exp = "cpu host=serverA type=idle 1971-01-01 00:00:04 +0000 UTC 92.6"
case 6:
exp = "cpu host=serverA type=idle 1971-01-01 00:00:05 +0000 UTC 95.8"
}
if exp != got {
t.Errorf("unexpected alert data for request: %d\n%s\n%s\n", rc, exp, got)
}
}))
defer ts.Close()

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|httpPost()
.endpoint('test')
.header('my', 'header')
|httpOut('TestStream_HttpPost')
`

er := models.Result{
Series: models.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA", "type": "idle"},
Columns: []string{"time", "value"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
95.8,
}},
},
},
}

tmInit := func(tm *kapacitor.TaskMaster) {
c := httppost.Config{}
c.URL = ts.URL
c.Endpoint = "test"
c.RowTemplate = `{{.Name}} host={{index .Tags "host"}} type={{index .Tags "type"}}{{range .Values}} {{index . "time"}} {{index . "value"}}{{end}}`
sl, _ := httppost.NewService(httppost.Configs{c}, diagService.NewHTTPPostHandler())
tm.HTTPPostService = sl
}

Expand Down Expand Up @@ -7934,7 +8014,7 @@ stream
}

func TestStream_AlertHTTPPost(t *testing.T) {
ts := httpposttest.NewAlertServer(nil)
ts := httpposttest.NewAlertServer(nil, false)
defer ts.Close()

var script = `
Expand Down Expand Up @@ -7996,7 +8076,7 @@ stream

func TestStream_AlertHTTPPostEndpoint(t *testing.T) {
headers := map[string]string{"Authorization": "works"}
ts := httpposttest.NewAlertServer(headers)
ts := httpposttest.NewAlertServer(headers, false)
defer ts.Close()

var script = `
Expand All @@ -8023,7 +8103,7 @@ stream
c.URL = ts.URL
c.Endpoint = "test"
c.Headers = headers
sl := httppost.NewService(httppost.Configs{c}, diagService.NewHTTPPostHandler())
sl, _ := httppost.NewService(httppost.Configs{c}, diagService.NewHTTPPostHandler())
tm.HTTPPostService = sl
}
testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit)
Expand Down Expand Up @@ -10893,7 +10973,7 @@ func createTaskMaster() (*kapacitor.TaskMaster, error) {
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.HTTPPostService = httppost.NewService(nil, diagService.NewHTTPPostHandler())
tm.HTTPPostService, _ = httppost.NewService(nil, diagService.NewHTTPPostHandler())
as := alertservice.NewService(diagService.NewAlertServiceHandler())
as.StorageService = storagetest.New()
as.HTTPDService = httpdService
Expand Down
Loading

0 comments on commit c5b1b3d

Please sign in to comment.