Skip to content

Commit

Permalink
add support for http status code as a field
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Sep 20, 2017
1 parent ffb8961 commit 37c8ea1
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [#1485](https://github.com/influxdata/kapacitor/issues/1485): Add bools field types to UDFs.
- [#1545](https://github.com/influxdata/kapacitor/pull/1545): Add support for timeout, tags and service template in the Alerta AlertNode
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.
- [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost

### Bugfixes

Expand Down
72 changes: 61 additions & 11 deletions http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ package kapacitor
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"

"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/pkg/errors"
)

type HTTPPostNode struct {
Expand Down Expand Up @@ -91,13 +95,35 @@ func (g *httpPostGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, error)

func (g *httpPostGroup) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error) {
row := batch.ToRow()
g.n.postRow(row)
code := g.n.doPost(row)
if g.n.c.CodeField != "" {
//Add code to all points
batch = batch.ShallowCopy()
points := make([]edge.BatchPointMessage, len(batch.Points()))
for i, bp := range batch.Points() {
fields := bp.Fields().Copy()
fields[g.n.c.CodeField] = int64(code)
points[i] = edge.NewBatchPointMessage(
fields,
bp.Tags(),
bp.Time(),
)
}
batch.SetPoints(points)
}
return batch, nil
}

func (g *httpPostGroup) Point(p edge.PointMessage) (edge.Message, error) {
row := p.ToRow()
g.n.postRow(row)
code := g.n.doPost(row)
if g.n.c.CodeField != "" {
//Add code to point
p = p.ShallowCopy()
fields := p.Fields().Copy()
fields[g.n.c.CodeField] = int64(code)
p.SetFields(fields)
}
return p, nil
}

Expand All @@ -108,30 +134,55 @@ func (g *httpPostGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, er
return d, nil
}

func (n *HTTPPostNode) postRow(row *models.Row) {
func (n *HTTPPostNode) doPost(row *models.Row) int {
resp, err := n.postRow(row)
if err != nil {
n.diag.Error("failed to POST data", err)
return 0
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
var err error
if n.c.CaptureResponseFlag {
var body []byte
body, err = ioutil.ReadAll(resp.Body)
if err == nil {
// Use the body content as the error
err = errors.New(string(body))
}
} else {
err = errors.New("unknown error, use .captureResponse() to capture the HTTP response")
}
n.diag.Error("POST returned non 2xx status code", err, keyvalue.KV("code", strconv.Itoa(resp.StatusCode)))
}
return resp.StatusCode
}

func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {

body := n.bp.Get()
defer n.bp.Put(body)

var contentType string
if n.endpoint.RowTemplate() != nil {
mr := newMappedRow(row)
n.endpoint.RowTemplate().Execute(body, mr)
err := n.endpoint.RowTemplate().Execute(body, mr)
if err != nil {
return nil, errors.Wrap(err, "failed to execute template")
}
} else {
result := new(models.Result)
result.Series = []*models.Row{row}
err := json.NewEncoder(body).Encode(result)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return
return nil, errors.Wrap(err, "failed to marshal row data json")
}
contentType = "application/json"
}

req, err := n.endpoint.NewHTTPRequest(body)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return
return nil, errors.Wrap(err, "failed to marshal row data json")
}

if contentType != "" {
Expand All @@ -142,10 +193,9 @@ func (n *HTTPPostNode) postRow(row *models.Row) {
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
n.diag.Error("failed to POST row data", err)
return
return nil, err
}
resp.Body.Close()
return resp, nil
}

type mappedRow struct {
Expand Down
103 changes: 103 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2653,6 +2653,109 @@ stream
}
}

func TestStream_HttpPostEndpoint_StatusCodes(t *testing.T) {
headers := map[string]string{"my": "header"}
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for k, v := range headers {
nv := r.Header.Get(k)
if nv != v {
t.Fatalf("got '%s:%s', exp '%s:%s'", k, nv, k, v)
}
}
atomic.AddInt32(&requestCount, 1)
rc := atomic.LoadInt32(&requestCount)

switch rc {
case 1:
w.WriteHeader(http.StatusOK)
case 2:
w.WriteHeader(http.StatusCreated)
case 3:
w.WriteHeader(http.StatusNotFound)
case 4:
w.WriteHeader(http.StatusForbidden)
case 5:
w.WriteHeader(http.StatusInternalServerError)
case 6:
w.WriteHeader(http.StatusBadGateway)
}
}))
defer ts.Close()

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|httpPost()
.endpoint('test')
.header('my', 'header')
.codeField('code')
|window()
.every(5s)
.period(5s)
|httpOut('TestStream_HttpPost')
`

er := models.Result{
Series: models.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "code", "type", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
200.0,
"idle",
97.1,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
201.0,
"idle",
92.6,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
404.0,
"idle",
95.6,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
403.0,
"idle",
93.1,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
500.0,
"idle",
92.6,
},
},
},
},
}

tmInit := func(tm *kapacitor.TaskMaster) {
c := httppost.Config{}
c.URL = ts.URL
c.Endpoint = "test"
sl, _ := httppost.NewService(httppost.Configs{c}, diagService.NewHTTPPostHandler())
tm.HTTPPostService = sl
}

testStreamerWithOutput(t, "TestStream_HttpPost", script, 13*time.Second, er, false, tmInit)

if rc := atomic.LoadInt32(&requestCount); rc != 6 {
t.Errorf("got %v exp %v", rc, 6)
}
}

func TestStream_HttpOutPassThrough(t *testing.T) {

var script = `
Expand Down
17 changes: 16 additions & 1 deletion pipeline/http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ type HTTPPostNode struct {
// tick:ignore
Endpoints []string `tick:"Endpoint"`

// Headers
// tick:ignore
Headers map[string]string `tick:"Header"`

// CodeField is the name of the field in which to place the HTTP status code.
// If the HTTP request fails at a layer below HTTP, (i.e. rejected TCP connection), then the status code is set to 0.
CodeField string

// tick:ignore
CaptureResponseFlag bool `tick:"CaptureResponse"`

// tick:ignore
URLs []string
}
Expand Down Expand Up @@ -104,3 +111,11 @@ func (p *HTTPPostNode) Header(k, v string) *HTTPPostNode {

return p
}

// CaptureResponse indicates that the HTTP response should be read and logged if
// the status code was not an 2xx code.
// tick:property
func (p *HTTPPostNode) CaptureResponse() *HTTPPostNode {
p.CaptureResponseFlag = true
return p
}

0 comments on commit 37c8ea1

Please sign in to comment.