Merged pull request influxdata#1462 from sputnik13/post_timeout
adding timeout to alert.post and http_post
nathanielc committed Oct 27, 2017
2 parents b1503ae + 598fde6 commit d97f29f
Showing 10 changed files with 231 additions and 13 deletions.
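
The core of the change is the same in http_post.go and services/httppost/service.go: when a non-zero timeout is configured, the outgoing POST request is bound by a context.WithTimeout before being handed to http.DefaultClient. The following standalone sketch shows that pattern in isolation; postWithTimeout, the example URL, and the payload are illustrative names only, not part of this diff.

package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"time"
)

// postWithTimeout mirrors the guard added in postRow and handler.Handle:
// a zero timeout means no deadline; otherwise the request's context is
// bounded so the POST fails fast when the receiving server is slow.
func postWithTimeout(url string, payload []byte, timeout time.Duration) (string, error) {
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(payload))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")

	if timeout > 0 {
		ctx, cancel := context.WithTimeout(req.Context(), timeout)
		defer cancel()
		req = req.WithContext(ctx)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// A blown deadline surfaces here as an error wrapping context.DeadlineExceeded.
		return "", err
	}
	defer resp.Body.Close()
	return resp.Status, nil
}

func main() {
	// Hypothetical endpoint, for illustration only.
	status, err := postWithTimeout("http://localhost:9999/alert", []byte(`{"level":"CRITICAL"}`), 5*time.Second)
	if err != nil {
		fmt.Println("post failed:", err)
		return
	}
	fmt.Println("status:", status)
}
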
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
### Features
- [#1408](https://github.com/influxdata/kapacitor/issues/1408): Add Previous state

- [#1461](https://github.com/influxdata/kapacitor/issues/1461): Add timeout to alert.post and httpPost.
- [#1413](https://github.com/influxdata/kapacitor/issues/1413): Add subscriptions modes to InfluxDB subscriptions.
- [#1436](https://github.com/influxdata/kapacitor/issues/1436): Add linear fill support for QueryNode.
- [#1345](https://github.com/influxdata/kapacitor/issues/1345): Add MQTT Alert Handler
1 change: 1 addition & 0 deletions alert.go
@@ -360,6 +360,7 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
Endpoint: p.Endpoint,
Headers: p.Headers,
CaptureResponse: p.CaptureResponseFlag,
Timeout: p.Timeout,
}
h := et.tm.HTTPPostService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
25 changes: 18 additions & 7 deletions http_post.go
Expand Up @@ -7,8 +7,10 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/influxdata/kapacitor/bufpool"
"bytes"
"context"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
@@ -22,16 +24,17 @@ type HTTPPostNode struct {
c *pipeline.HTTPPostNode
endpoint *httppost.Endpoint
mu sync.RWMutex
bp *bufpool.Pool
timeout time.Duration
hc *http.Client
}

// Create a new HTTPPostNode which submits received items via POST to an HTTP endpoint
func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, d NodeDiagnostic) (*HTTPPostNode, error) {

hn := &HTTPPostNode{
node: node{Node: n, et: et, diag: d},
c: n,
bp: bufpool.New(),
node: node{Node: n, et: et, diag: d},
c: n,
timeout: n.Timeout,
}

// Should only ever be 0 or 1 from validation of n
@@ -159,8 +162,7 @@ func (n *HTTPPostNode) doPost(row *models.Row) int {
}

func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
body := n.bp.Get()
defer n.bp.Put(body)
body := new(bytes.Buffer)

var contentType string
if n.endpoint.RowTemplate() != nil {
@@ -184,12 +186,21 @@ func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
return nil, errors.Wrap(err, "failed to marshal row data json")
}

// Set content type and other headers
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
for k, v := range n.c.Headers {
req.Header.Set(k, v)
}

// Set timeout
if n.timeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), n.timeout)
defer cancel()
req = req.WithContext(ctx)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
171 changes: 171 additions & 0 deletions integrations/batcher_test.go
@@ -2980,6 +2980,177 @@ batch
testBatcherWithOutput(t, "TestBatch_HttpPost", script, 30*time.Second, er, false)
}

func TestBatch_HttpPost_Timeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
result := models.Result{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&result)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
}))
defer ts.Close()

var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|httpPost('` + ts.URL + `').timeout(1ms)
|httpOut('TestBatch_HttpPost_Timeout')
`

er := models.Result{
Series: models.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu-total"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
91.06416290101595,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
85.9694442394385,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
90.62985736134186,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
86.45443196005628,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
88.97243107764031,
},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu0"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
85.08910891088406,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
78.00000000002001,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
84.23607066586464,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
80.85858585861834,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
80.61224489791657,
},
},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu1"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
96.49999999996908,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
93.46464646468584,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
95.00950095007724,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
92.99999999998636,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
90.99999999998545,
},
},
},
},
}

c := make(chan bool, 1)
go func() {
testBatcherWithOutput(t, "TestBatch_HttpPost_Timeout", script, 30*time.Second, er, false)
c <- true
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Test timeout reached, httpPost().timeout() may not be functioning")
}
}

func TestBatch_AlertPost_Timeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ad := alert.Data{}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&ad)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}))
defer ts.Close()
var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|alert()
.crit(lambda:"mean" > 90)
.stateChangesOnly()
.levelField('level')
.details('')
.post('` + ts.URL + `').timeout(1ms)
`

c := make(chan bool, 1)
go func() {
clock, et, replayErr, tm := testBatcher(t, "TestBatch_AlertPostTimeout", script)
defer tm.Close()

err := fastForwardTask(clock, et, replayErr, tm, 40*time.Second)
if err != nil {
t.Error(err)
}
c <- true
}()
select {
case <-c:
case <-time.After(time.Second):
t.Fatal("Test timeout reached, alert().post().timeout() may not be functioning")
}
}

// Helper test function for batcher
func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) {
if testing.Verbose() {
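
The two *_Timeout tests above share a strategy: point the node at a test server that sleeps longer than the configured 1ms timeout, then run the replay in a goroutine and fail if it does not finish within a second. Below is a standalone sketch, not part of the test suite, of the behavior they rely on: a request whose context deadline is shorter than the handler's sleep returns promptly with a deadline error rather than blocking.

package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

func main() {
	// A deliberately slow server, like the sleeping handlers in the tests above.
	slow := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(100 * time.Millisecond)
	}))
	defer slow.Close()

	req, err := http.NewRequest("POST", slow.URL, bytes.NewBufferString(`{"level":"CRITICAL"}`))
	if err != nil {
		panic(err)
	}
	ctx, cancel := context.WithTimeout(req.Context(), time.Millisecond)
	defer cancel()
	req = req.WithContext(ctx)

	start := time.Now()
	_, err = http.DefaultClient.Do(req)
	// Returns after roughly 1ms with an error wrapping context.DeadlineExceeded,
	// instead of waiting out the 100ms handler.
	fmt.Printf("returned after %v: %v\n", time.Since(start), err)
}
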
4 changes: 4 additions & 0 deletions integrations/data/TestBatch_AlertPostTimeout.0.brpl
@@ -0,0 +1,4 @@
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.38281469458698},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":86.51447101892941},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":91.71877558217454},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":87.10524436107617},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":90.3900735196668},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.8919959776013},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":86.54244306420236},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":91.01699558842134},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.66378399063848},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":89.90919811320221},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":91.06416290101595},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":95.9694442394385},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":70.62985736134186},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":71.06416290101595},"time":"2015-10-30T17:14:42Z"},{"fields":{"mean":85.9694442394385},"time":"2015-10-30T17:14:44Z"},{"fields":{"mean":70.62985736134186},"time":"2015-10-30T17:14:46Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:48Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:50Z"}]}
9 changes: 9 additions & 0 deletions integrations/data/TestBatch_HttpPost_Timeout.0.brpl
@@ -0,0 +1,9 @@
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.38281469458698},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":86.51447101892941},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":91.71877558217454},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":87.10524436107617},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":90.3900735196668},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":83.56930693069836},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":79.12871287128638},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":88.99559823928229},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":85.50000000000182},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":86.02860286029956},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":93.49999999999409},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":91.44444444443974},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":93.44897959187637},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":95.99999999995998},"time":"2015-10-30T17:14:18Z"},{"fields":{"mean":97.00970097012197},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":90.8919959776013},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":86.54244306420236},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":91.01699558842134},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.66378399063848},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":89.90919811320221},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":81.72501716191164},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":81.03810381037587},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":85.93434343435388},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85.36734693878043},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":83.01320528210614},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":95.98484848485191},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":92.098039215696},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":92.99999999998363},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":86.54015887023496},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":95.48979591840603},"time":"2015-10-30T17:14:30Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{"mean":91.06416290101595},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":85.9694442394385},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":90.62985736134186},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":86.45443196005628},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":88.97243107764031},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu0"},"points":[{"fields":{"mean":85.08910891088406},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":78.00000000002001},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":84.23607066586464},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":80.85858585861834},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":80.61224489791657},"time":"2015-10-30T17:14:40Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu1"},"points":[{"fields":{"mean":96.49999999996908},"time":"2015-10-30T17:14:32Z"},{"fields":{"mean":93.46464646468584},"time":"2015-10-30T17:14:34Z"},{"fields":{"mean":95.00950095007724},"time":"2015-10-30T17:14:36Z"},{"fields":{"mean":92.99999999998636},"time":"2015-10-30T17:14:38Z"},{"fields":{"mean":90.99999999998545},"time":"2015-10-30T17:14:40Z"}]}
3 changes: 3 additions & 0 deletions pipeline/alert.go
@@ -524,6 +524,9 @@ type AlertHTTPPostHandler struct {

// tick:ignore
CaptureResponseFlag bool `tick:"CaptureResponse"`

// Timeout for HTTP Post
Timeout time.Duration
}

// Set a header key and value on the post request.
4 changes: 4 additions & 0 deletions pipeline/http_post.go
@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"strings"
"time"
)

// An HTTPPostNode will take the incoming data stream and POST it to an HTTP endpoint.
@@ -47,6 +48,9 @@ type HTTPPostNode struct {

// tick:ignore
URLs []string

// Timeout for HTTP Post
Timeout time.Duration
}

func newHTTPPostNode(wants EdgeType, urls ...string) *HTTPPostNode {
1 change: 1 addition & 0 deletions server/server_test.go
@@ -8042,6 +8042,7 @@ func TestServer_ListServiceTests(t *testing.T) {
"endpoint": "example",
"url": "http://localhost:3000/",
"headers": map[string]interface{}{"Auth": "secret"},
"timeout": float64(0),
},
},
{
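
The expected "timeout": float64(0) above is a consequence of how the new timeout option round-trips through JSON in the service test options: time.Duration marshals as its integer nanosecond count, and encoding/json decodes any JSON number into float64 when the target is map[string]interface{}. A minimal standalone illustration follows; the options type here is a stand-in, not the real testOptions.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type options struct {
	Timeout time.Duration `json:"timeout"`
}

func main() {
	raw, _ := json.Marshal(options{Timeout: 0})
	fmt.Println(string(raw)) // {"timeout":0}

	var decoded map[string]interface{}
	json.Unmarshal(raw, &decoded)
	// JSON numbers decode as float64 into interface{} values.
	fmt.Printf("%T %v\n", decoded["timeout"], decoded["timeout"]) // float64 0
}
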
25 changes: 19 additions & 6 deletions services/httppost/service.go
@@ -11,8 +11,10 @@ import (
"sync"
"text/template"

"time"

"context"
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/pkg/errors"
)
@@ -185,6 +187,7 @@ type testOptions struct {
Endpoint string `json:"endpoint"`
URL string `json:"url"`
Headers map[string]string `json:"headers"`
Timeout time.Duration `json:"timeout"`
}

func (s *Service) TestOptions() interface{} {
@@ -234,18 +237,22 @@ type HandlerConfig struct {
Endpoint string `mapstructure:"endpoint"`
Headers map[string]string `mapstructure:"headers"`
CaptureResponse bool `mapstructure:"capture-response"`
Timeout time.Duration `mapstructure:"timeout"`
}

type handler struct {
s *Service
bp *bufpool.Pool
s *Service

endpoint *Endpoint
headers map[string]string

captureResponse bool

diag Diagnostic

timeout time.Duration

hc *http.Client
}

func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler {
@@ -255,11 +262,11 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler {
}
return &handler{
s: s,
bp: bufpool.New(),
endpoint: e,
diag: s.diag.WithContext(ctx...),
headers: c.Headers,
captureResponse: c.CaptureResponse,
timeout: c.Timeout,
}
}

@@ -280,8 +287,7 @@ func (h *handler) Handle(event alert.Event) {
var err error

// Construct the body of the HTTP request
body := h.bp.Get()
defer h.bp.Put(body)
body := new(bytes.Buffer)
ad := event.AlertData()

var contentType string
@@ -310,6 +316,13 @@
req.Header.Set("Content-Type", contentType)
}

// Set timeout
if h.timeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), h.timeout)
defer cancel()
req = req.WithContext(ctx)
}

// Execute the request
resp, err := http.DefaultClient.Do(req)
if err != nil {
