Skip to content

Commit

Permalink
Merged pull request influxdata#1574 from influxdata/nc-post-code
Browse files Browse the repository at this point in the history
Add support for http status code as a field
  • Loading branch information
nathanielc committed Sep 21, 2017
2 parents ffb8961 + 932d11f commit b030980
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [#1485](https://github.com/influxdata/kapacitor/issues/1485): Add bools field types to UDFs.
- [#1545](https://github.com/influxdata/kapacitor/pull/1545): Add support for timeout, tags and service template in the Alerta AlertNode
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.
- [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost

### Bugfixes

Expand Down
7 changes: 4 additions & 3 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,10 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a

for _, p := range n.HTTPPostHandlers {
c := httppost.HandlerConfig{
URL: p.URL,
Endpoint: p.Endpoint,
Headers: p.Headers,
URL: p.URL,
Endpoint: p.Endpoint,
Headers: p.Headers,
CaptureResponse: p.CaptureResponseFlag,
}
h := et.tm.HTTPPostService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
Expand Down
71 changes: 60 additions & 11 deletions http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ package kapacitor
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"

"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/pkg/errors"
)

type HTTPPostNode struct {
Expand Down Expand Up @@ -91,13 +95,35 @@ func (g *httpPostGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, error)

func (g *httpPostGroup) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error) {
row := batch.ToRow()
g.n.postRow(row)
code := g.n.doPost(row)
if g.n.c.CodeField != "" {
//Add code to all points
batch = batch.ShallowCopy()
points := make([]edge.BatchPointMessage, len(batch.Points()))
for i, bp := range batch.Points() {
fields := bp.Fields().Copy()
fields[g.n.c.CodeField] = int64(code)
points[i] = edge.NewBatchPointMessage(
fields,
bp.Tags(),
bp.Time(),
)
}
batch.SetPoints(points)
}
return batch, nil
}

func (g *httpPostGroup) Point(p edge.PointMessage) (edge.Message, error) {
row := p.ToRow()
g.n.postRow(row)
code := g.n.doPost(row)
if g.n.c.CodeField != "" {
//Add code to point
p = p.ShallowCopy()
fields := p.Fields().Copy()
fields[g.n.c.CodeField] = int64(code)
p.SetFields(fields)
}
return p, nil
}

Expand All @@ -108,30 +134,54 @@ func (g *httpPostGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, er
return d, nil
}

func (n *HTTPPostNode) postRow(row *models.Row) {
func (n *HTTPPostNode) doPost(row *models.Row) int {
resp, err := n.postRow(row)
if err != nil {
n.diag.Error("failed to POST data", err)
return 0
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
var err error
if n.c.CaptureResponseFlag {
var body []byte
body, err = ioutil.ReadAll(resp.Body)
if err == nil {
// Use the body content as the error
err = errors.New(string(body))
}
} else {
err = errors.New("unknown error, use .captureResponse() to capture the HTTP response")
}
n.diag.Error("POST returned non 2xx status code", err, keyvalue.KV("code", strconv.Itoa(resp.StatusCode)))
}
return resp.StatusCode
}

func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
body := n.bp.Get()
defer n.bp.Put(body)

var contentType string
if n.endpoint.RowTemplate() != nil {
mr := newMappedRow(row)
n.endpoint.RowTemplate().Execute(body, mr)
err := n.endpoint.RowTemplate().Execute(body, mr)
if err != nil {
return nil, errors.Wrap(err, "failed to execute template")
}
} else {
result := new(models.Result)
result.Series = []*models.Row{row}
err := json.NewEncoder(body).Encode(result)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return
return nil, errors.Wrap(err, "failed to marshal row data json")
}
contentType = "application/json"
}

req, err := n.endpoint.NewHTTPRequest(body)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return
return nil, errors.Wrap(err, "failed to marshal row data json")
}

if contentType != "" {
Expand All @@ -142,10 +192,9 @@ func (n *HTTPPostNode) postRow(row *models.Row) {
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
n.diag.Error("failed to POST row data", err)
return
return nil, err
}
resp.Body.Close()
return resp, nil
}

type mappedRow struct {
Expand Down
103 changes: 103 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2653,6 +2653,109 @@ stream
}
}

func TestStream_HttpPostEndpoint_StatusCodes(t *testing.T) {
headers := map[string]string{"my": "header"}
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for k, v := range headers {
nv := r.Header.Get(k)
if nv != v {
t.Fatalf("got '%s:%s', exp '%s:%s'", k, nv, k, v)
}
}
atomic.AddInt32(&requestCount, 1)
rc := atomic.LoadInt32(&requestCount)

switch rc {
case 1:
w.WriteHeader(http.StatusOK)
case 2:
w.WriteHeader(http.StatusCreated)
case 3:
w.WriteHeader(http.StatusNotFound)
case 4:
w.WriteHeader(http.StatusForbidden)
case 5:
w.WriteHeader(http.StatusInternalServerError)
case 6:
w.WriteHeader(http.StatusBadGateway)
}
}))
defer ts.Close()

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|httpPost()
.endpoint('test')
.header('my', 'header')
.codeField('code')
|window()
.every(5s)
.period(5s)
|httpOut('TestStream_HttpPost')
`

er := models.Result{
Series: models.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "code", "type", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
200.0,
"idle",
97.1,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
201.0,
"idle",
92.6,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
404.0,
"idle",
95.6,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
403.0,
"idle",
93.1,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
500.0,
"idle",
92.6,
},
},
},
},
}

tmInit := func(tm *kapacitor.TaskMaster) {
c := httppost.Config{}
c.URL = ts.URL
c.Endpoint = "test"
sl, _ := httppost.NewService(httppost.Configs{c}, diagService.NewHTTPPostHandler())
tm.HTTPPostService = sl
}

testStreamerWithOutput(t, "TestStream_HttpPost", script, 13*time.Second, er, false, tmInit)

if rc := atomic.LoadInt32(&requestCount); rc != 6 {
t.Errorf("got %v exp %v", rc, 6)
}
}

func TestStream_HttpOutPassThrough(t *testing.T) {

var script = `
Expand Down
37 changes: 24 additions & 13 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,24 @@ func (a *AlertNode) Post(urls ...string) *AlertHTTPPostHandler {
return post
}

// tick:embedded:AlertNode.Post
type AlertHTTPPostHandler struct {
*AlertNode

// The POST URL.
// tick:ignore
URL string

// Name of the endpoint to be used, as is defined in the configuration file
Endpoint string

// tick:ignore
Headers map[string]string `tick:"Header"`

// tick:ignore
CaptureResponseFlag bool `tick:"CaptureResponse"`
}

// Set a header key and value on the post request.
// Setting the Authenticate header is not allowed from within TICKscript,
// please use the configuration file to specify sensitive headers.
Expand All @@ -527,19 +545,12 @@ func (a *AlertHTTPPostHandler) Header(k, v string) *AlertHTTPPostHandler {
return a
}

// tick:embedded:AlertNode.Post
type AlertHTTPPostHandler struct {
*AlertNode

// The POST URL.
// tick:ignore
URL string

// Name of the endpoint to be used, as is defined in the configuration file
Endpoint string

// tick:ignore
Headers map[string]string `tick:"Header"`
// CaptureResponse indicates that the HTTP response should be read and logged if
// the status code was not an 2xx code.
// tick:property
func (a *AlertHTTPPostHandler) CaptureResponse() *AlertHTTPPostHandler {
a.CaptureResponseFlag = true
return a
}

func (a *AlertHTTPPostHandler) validate() error {
Expand Down
17 changes: 16 additions & 1 deletion pipeline/http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ type HTTPPostNode struct {
// tick:ignore
Endpoints []string `tick:"Endpoint"`

// Headers
// tick:ignore
Headers map[string]string `tick:"Header"`

// CodeField is the name of the field in which to place the HTTP status code.
// If the HTTP request fails at a layer below HTTP, (i.e. rejected TCP connection), then the status code is set to 0.
CodeField string

// tick:ignore
CaptureResponseFlag bool `tick:"CaptureResponse"`

// tick:ignore
URLs []string
}
Expand Down Expand Up @@ -104,3 +111,11 @@ func (p *HTTPPostNode) Header(k, v string) *HTTPPostNode {

return p
}

// CaptureResponse indicates that the HTTP response should be read and logged if
// the status code was not an 2xx code.
// tick:property
func (p *HTTPPostNode) CaptureResponse() *HTTPPostNode {
p.CaptureResponseFlag = true
return p
}
9 changes: 7 additions & 2 deletions services/diagnostic/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,13 @@ type HTTPPostHandler struct {
l *klog.Logger
}

func (h *HTTPPostHandler) Error(msg string, err error) {
h.l.Error(msg, klog.Error(err))
func (h *HTTPPostHandler) Error(msg string, err error, ctx ...keyvalue.T) {
fields := make([]klog.Field, len(ctx)+1)
fields[0] = klog.Error(err)
for i, kv := range ctx {
fields[i+1] = klog.String(kv.Key, kv.Value)
}
h.l.Error(msg, fields...)
}

func (h *HTTPPostHandler) WithContext(ctx ...keyvalue.T) httppost.Diagnostic {
Expand Down
Loading

0 comments on commit b030980

Please sign in to comment.