Skip to content

Commit

Permalink
adding source for sensu alert as parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
Henry Megarry committed Apr 17, 2017
1 parent 397d612 commit ce759fd
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ kapacitor define-handler system aggregate_by_1m.yaml
See the updated API docs.
- [#1286](https://github.com/influxdata/kapacitor/issues/1286): Default HipChat URL should be blank
- [#507](https://github.com/influxdata/kapacitor/issues/507): Add API endpoint for performing Kapacitor database backups.
- [#1132](https://github.com/influxdata/kapacitor/issues/1132): Adding source for sensu alert as parameter

### Bugfixes

Expand Down
11 changes: 9 additions & 2 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
"github.com/influxdata/kapacitor/services/sensu"
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/snmptrap"
Expand Down Expand Up @@ -214,8 +215,14 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.handlers = append(an.handlers, h)
}

for range n.SensuHandlers {
h := et.tm.SensuService.Handler(l)
for _, s := range n.SensuHandlers {
c := sensu.HandlerConfig{
Source: s.Source,
}
h, err := et.tm.SensuService.Handler(c, l)
if err != nil {
return nil, errors.Wrap(err, "failed to create sensu alert handler")
}
an.handlers = append(an.handlers, h)
}

Expand Down
4 changes: 4 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,10 @@ func (a *AlertNode) Sensu() *SensuHandler {
// tick:embedded:AlertNode.Sensu
type SensuHandler struct {
*AlertNode

// Sensu source in which to post messages.
// If empty uses the Source from the configuration.
Source string
}

// Send the alert to Pushover.
Expand Down
11 changes: 8 additions & 3 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6000,7 +6000,7 @@ func TestServer_UpdateConfig(t *testing.T) {
Set: map[string]interface{}{
"addr": "sensu.local:3000",
"enabled": true,
"source": "",
"source": "Kapacitor",
},
},
expSection: client.ConfigSection{
Expand All @@ -6010,7 +6010,7 @@ func TestServer_UpdateConfig(t *testing.T) {
Options: map[string]interface{}{
"addr": "sensu.local:3000",
"enabled": true,
"source": "",
"source": "Kapacitor",
},
Redacted: nil,
}},
Expand All @@ -6020,7 +6020,7 @@ func TestServer_UpdateConfig(t *testing.T) {
Options: map[string]interface{}{
"addr": "sensu.local:3000",
"enabled": true,
"source": "",
"source": "Kapacitor",
},
Redacted: nil,
},
Expand Down Expand Up @@ -6616,6 +6616,7 @@ func TestServer_ListServiceTests(t *testing.T) {
Options: client.ServiceTestOptions{
"name": "testName",
"output": "testOutput",
"source": "Kapacitor",
"level": "CRITICAL",
},
},
Expand Down Expand Up @@ -6717,6 +6718,7 @@ func TestServer_ListServiceTests_WithPattern(t *testing.T) {
Options: client.ServiceTestOptions{
"name": "testName",
"output": "testOutput",
"source": "Kapacitor",
"level": "CRITICAL",
},
},
Expand Down Expand Up @@ -7391,6 +7393,9 @@ func TestServer_AlertHandlers(t *testing.T) {
{
handler: client.TopicHandler{
Kind: "sensu",
Options: map[string]interface{}{
"source": "Kapacitor",
},
},
setup: func(c *server.Config, ha *client.TopicHandler) (context.Context, error) {
ts, err := sensutest.NewServer()
Expand Down
13 changes: 11 additions & 2 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
"github.com/influxdata/kapacitor/services/sensu"
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/snmptrap"
Expand Down Expand Up @@ -73,7 +74,7 @@ type Service struct {
Handler(pushover.HandlerConfig, *log.Logger) alert.Handler
}
SensuService interface {
Handler(*log.Logger) alert.Handler
Handler(sensu.HandlerConfig, *log.Logger) (alert.Handler, error)
}
SlackService interface {
Handler(slack.HandlerConfig, *log.Logger) alert.Handler
Expand Down Expand Up @@ -769,7 +770,15 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) {
}
h = NewPublishHandler(c, s.logger)
case "sensu":
h = s.SensuService.Handler(s.logger)
c := sensu.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return handler{}, err
}
h, err = s.SensuService.Handler(c, s.logger)
if err != nil {
return handler{}, err
}
h = newExternalHandler(h)
case "slack":
c := slack.HandlerConfig{}
Expand Down
49 changes: 41 additions & 8 deletions services/sensu/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sensu

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand All @@ -9,6 +10,7 @@ import (
"net"
"regexp"
"sync/atomic"
text "text/template"

"github.com/influxdata/kapacitor/alert"
)
Expand Down Expand Up @@ -54,13 +56,15 @@ func (s *Service) Update(newConfig []interface{}) error {

type testOptions struct {
Name string `json:"name"`
Source string `json:"source"`
Output string `json:"output"`
Level alert.Level `json:"level"`
}

func (s *Service) TestOptions() interface{} {
return &testOptions{
Name: "testName",
Source: "Kapacitor",
Output: "testOutput",
Level: alert.Critical,
}
Expand All @@ -73,17 +77,18 @@ func (s *Service) Test(options interface{}) error {
}
return s.Alert(
o.Name,
o.Source,
o.Output,
o.Level,
)
}

func (s *Service) Alert(name, output string, level alert.Level) error {
func (s *Service) Alert(name, source, output string, level alert.Level) error {
if !validNamePattern.MatchString(name) {
return fmt.Errorf("invalid name %q for sensu alert. Must match %v", name, validNamePattern)
}

addr, postData, err := s.prepareData(name, output, level)
addr, postData, err := s.prepareData(name, source, output, level)
if err != nil {
return err
}
Expand All @@ -109,7 +114,7 @@ func (s *Service) Alert(name, output string, level alert.Level) error {
return nil
}

func (s *Service) prepareData(name, output string, level alert.Level) (*net.TCPAddr, map[string]interface{}, error) {
func (s *Service) prepareData(name, source, output string, level alert.Level) (*net.TCPAddr, map[string]interface{}, error) {

c := s.config()

Expand All @@ -133,7 +138,10 @@ func (s *Service) prepareData(name, output string, level alert.Level) (*net.TCPA

postData := make(map[string]interface{})
postData["name"] = name
postData["source"] = c.Source
if source == "" {
source = c.Source
}
postData["source"] = source
postData["output"] = output
postData["status"] = status

Expand All @@ -145,21 +153,46 @@ func (s *Service) prepareData(name, output string, level alert.Level) (*net.TCPA
return addr, postData, nil
}

type HandlerConfig struct {
// Sensu source for which to post messages.
// If empty uses the source from the configuration.
Source string `mapstructure:"source"`
}

type handler struct {
s *Service
c HandlerConfig
logger *log.Logger

sourceTmpl *text.Template
}

func (s *Service) Handler(l *log.Logger) alert.Handler {
return &handler{
s: s,
logger: l,
func (s *Service) Handler(c HandlerConfig, l *log.Logger) (alert.Handler, error) {
srcTmpl, err := text.New("source").Parse(c.Source)
if err != nil {
return nil, err
}
return &handler{
s: s,
c: c,
logger: l,
sourceTmpl: srcTmpl,
}, nil
}

func (h *handler) Handle(event alert.Event) {
td := event.TemplateData()
var buf bytes.Buffer
err := h.sourceTmpl.Execute(&buf, td)
if err != nil {
h.logger.Printf("E! failed to evaluate Sensu source template %s: %v", h.c.Source, err)
return
}
sourceStr := buf.String()

if err := h.s.Alert(
event.State.ID,
sourceStr,
event.State.Message,
event.State.Level,
); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
"github.com/influxdata/kapacitor/services/sensu"
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/snmptrap"
Expand Down Expand Up @@ -124,7 +125,7 @@ type TaskMaster struct {
Handler(alerta.HandlerConfig, *log.Logger) (alert.Handler, error)
}
SensuService interface {
Handler(*log.Logger) alert.Handler
Handler(sensu.HandlerConfig, *log.Logger) (alert.Handler, error)
}
TalkService interface {
Handler(*log.Logger) alert.Handler
Expand Down

0 comments on commit ce759fd

Please sign in to comment.