Skip to content

Commit

Permalink
Alerta support for timeout, tags and service template
Browse files Browse the repository at this point in the history
  • Loading branch information
Matteo Cerutti committed Sep 3, 2017
1 parent 9aea3c3 commit d73ed51
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
The breaking change is that the Combine and Flatten nodes previously, but erroneously, operated across batch boundaries; this has been fixed.
- [#1497](https://github.com/influxdata/kapacitor/pull/1497): Add support for Docker Swarm autoscaling services.
- [#1485](https://github.com/influxdata/kapacitor/issues/1485): Add bools field types to UDFs.
- [#1545](https://github.com/influxdata/kapacitor/pull/1545): Add support for timeout, tags and service template in the Alerta AlertNode

### Bugfixes

Expand Down
3 changes: 3 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
if len(a.Service) != 0 {
c.Service = a.Service
}
if a.Timeout != 0 {
c.Timeout = a.Timeout
}
h, err := et.tm.AlertaService.Handler(c, l)
if err != nil {
return nil, errors.Wrap(err, "failed to create Alerta handler")
Expand Down
7 changes: 5 additions & 2 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7616,6 +7616,7 @@ stream
.alerta()
.token('testtoken1234567')
.environment('production')
.timeout(1h)
.alerta()
.token('anothertesttoken')
.resource('resource: {{ index .Tags "host" }}')
Expand All @@ -7624,7 +7625,7 @@ stream
.origin('override')
.group('{{ .ID }}')
.value('{{ index .Fields "count" }}')
.services('serviceA', 'serviceB')
.services('serviceA', 'serviceB', '{{ .Name }}')
`
tmInit := func(tm *kapacitor.TaskMaster) {
c := alerta.NewConfig()
Expand All @@ -7648,6 +7649,7 @@ stream
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "Kapacitor",
Service: []string{"cpu"},
Timeout: 3600,
},
},
alertatest.Request{
Expand All @@ -7660,8 +7662,9 @@ stream
Environment: "serverA",
Text: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
Origin: "override",
Service: []string{"serviceA", "serviceB"},
Service: []string{"serviceA", "serviceB", "cpu"},
Value: "10",
Timeout: 86400,
},
},
}
Expand Down
5 changes: 5 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ type HipChatHandler struct {
// .event('Something went wrong')
// .environment('Development')
// .group('Dev. Servers')
// .timeout(5m)
//
// NOTE: Alerta cannot be configured globally because of its required properties.
// tick:property
Expand Down Expand Up @@ -996,6 +997,10 @@ type AlertaHandler struct {
// List of effected Services
// tick:ignore
Service []string `tick:"Services"`

// Alerta timeout.
// Default: 24h
Timeout time.Duration
}

// List of effected services.
Expand Down
12 changes: 10 additions & 2 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5918,6 +5918,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"token-prefix": "",
"url": "http://alerta.example.com",
"insecure-skip-verify": false,
"timeout": "0s",
},
Redacted: []string{
"token",
Expand All @@ -5934,6 +5935,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"token-prefix": "",
"url": "http://alerta.example.com",
"insecure-skip-verify": false,
"timeout": "0s",
},
Redacted: []string{
"token",
Expand All @@ -5943,8 +5945,9 @@ func TestServer_UpdateConfig(t *testing.T) {
{
updateAction: client.ConfigUpdateAction{
Set: map[string]interface{}{
"token": "token",
"origin": "kapacitor",
"token": "token",
"origin": "kapacitor",
"timeout": "3h",
},
},
expSection: client.ConfigSection{
Expand All @@ -5959,6 +5962,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"token-prefix": "",
"url": "http://alerta.example.com",
"insecure-skip-verify": false,
"timeout": "3h0m0s",
},
Redacted: []string{
"token",
Expand All @@ -5975,6 +5979,7 @@ func TestServer_UpdateConfig(t *testing.T) {
"token-prefix": "",
"url": "http://alerta.example.com",
"insecure-skip-verify": false,
"timeout": "3h0m0s",
},
Redacted: []string{
"token",
Expand Down Expand Up @@ -7352,6 +7357,7 @@ func TestServer_ListServiceTests(t *testing.T) {
"testServiceA",
"testServiceB",
},
"timeout": "24h0m0s",
},
},
{
Expand Down Expand Up @@ -8066,6 +8072,7 @@ func TestServer_AlertHandlers(t *testing.T) {
"origin": "kapacitor",
"group": "test",
"environment": "env",
"timeout": time.Duration(24 * time.Hour),
},
},
setup: func(c *server.Config, ha *client.TopicHandler) (context.Context, error) {
Expand All @@ -8091,6 +8098,7 @@ func TestServer_AlertHandlers(t *testing.T) {
Text: "message",
Origin: "kapacitor",
Service: []string{"alert"},
Timeout: 86400,
},
}}
if !reflect.DeepEqual(exp, got) {
Expand Down
1 change: 1 addition & 0 deletions services/alerta/alertatest/alertatest.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ type PostData struct {
Origin string `json:"origin"`
Service []string `json:"service"`
Value string `json:"value"`
Timeout int64 `json:"timeout"`
}
3 changes: 3 additions & 0 deletions services/alerta/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package alerta
import (
"net/url"

"github.com/influxdata/influxdb/toml"
"github.com/pkg/errors"
)

Expand All @@ -22,6 +23,8 @@ type Config struct {
Environment string `toml:"environment" override:"environment"`
// The origin of the alert.
Origin string `toml:"origin" override:"origin"`
// Optional timeout, can be overridden per alert.
Timeout toml.Duration `toml:"timeout" override:"timeout"`
}

func NewConfig() Config {
Expand Down
54 changes: 49 additions & 5 deletions services/alerta/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path"
"sync/atomic"
text "text/template"
"time"

"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/models"
Expand All @@ -22,6 +23,7 @@ const (
defaultResource = "{{ .Name }}"
defaultEvent = "{{ .ID }}"
defaultGroup = "{{ .Group }}"
defaultTimeout = time.Duration(24 * time.Hour)
defaultTokenPrefix = "Bearer"
)

Expand Down Expand Up @@ -55,6 +57,7 @@ type testOptions struct {
Message string `json:"message"`
Origin string `json:"origin"`
Service []string `json:"service"`
Timeout string `json:"timeout"`
}

func (s *Service) TestOptions() interface{} {
Expand All @@ -69,6 +72,7 @@ func (s *Service) TestOptions() interface{} {
Message: "test alerta message",
Origin: c.Origin,
Service: []string{"testServiceA", "testServiceB"},
Timeout: "24h0m0s",
}
}

Expand All @@ -78,6 +82,7 @@ func (s *Service) Test(options interface{}) error {
return fmt.Errorf("unexpected options type %T", options)
}
c := s.config()
timeout, _ := time.ParseDuration(o.Timeout)
return s.Alert(
c.Token,
c.TokenPrefix,
Expand All @@ -90,6 +95,8 @@ func (s *Service) Test(options interface{}) error {
o.Message,
o.Origin,
o.Service,
timeout,
map[string]string{},
models.Result{},
)
}
Expand Down Expand Up @@ -125,12 +132,12 @@ func (s *Service) Update(newConfig []interface{}) error {
return nil
}

func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, data models.Result) error {
func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, timeout time.Duration, tags map[string]string, data models.Result) error {
if resource == "" || event == "" {
return errors.New("Resource and Event are required to send an alert")
}

req, err := s.preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin, service, data)
req, err := s.preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin, service, timeout, tags, data)
if err != nil {
return err
}
Expand Down Expand Up @@ -158,7 +165,7 @@ func (s *Service) Alert(token, tokenPrefix, resource, event, environment, severi
return nil
}

func (s *Service) preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, data models.Result) (*http.Request, error) {
func (s *Service) preparePost(token, tokenPrefix, resource, event, environment, severity, group, value, message, origin string, service []string, timeout time.Duration, tags map[string]string, data models.Result) (*http.Request, error) {
c := s.config()

if !c.Enabled {
Expand Down Expand Up @@ -204,6 +211,13 @@ func (s *Service) preparePost(token, tokenPrefix, resource, event, environment,
if len(service) > 0 {
postData["service"] = service
}
postData["timeout"] = int64(timeout / time.Second)

tagList := make([]string, 0)
for k, v := range tags {
tagList = append(tagList, fmt.Sprintf("%s=%s", k, v))
}
postData["tags"] = tagList

var post bytes.Buffer
enc := json.NewEncoder(&post)
Expand Down Expand Up @@ -262,6 +276,10 @@ type HandlerConfig struct {

// List of effected Services
Service []string `mapstructure:"service"`

// Alerta timeout.
// Default: 24h
Timeout time.Duration `mapstructure:"timeout"`
}

type handler struct {
Expand All @@ -274,13 +292,15 @@ type handler struct {
environmentTmpl *text.Template
valueTmpl *text.Template
groupTmpl *text.Template
serviceTmpl []*text.Template
}

func (s *Service) DefaultHandlerConfig() HandlerConfig {
return HandlerConfig{
Resource: defaultResource,
Event: defaultEvent,
Group: defaultGroup,
Timeout: defaultTimeout,
}
}

Expand All @@ -306,6 +326,16 @@ func (s *Service) Handler(c HandlerConfig, l *log.Logger) (alert.Handler, error)
if err != nil {
return nil, err
}

var stmpl []*text.Template
for _, service := range c.Service {
tmpl, err := text.New("service").Parse(service)
if err != nil {
return nil, err
}
stmpl = append(stmpl, tmpl)
}

return &handler{
s: s,
c: c,
Expand All @@ -315,6 +345,7 @@ func (s *Service) Handler(c HandlerConfig, l *log.Logger) (alert.Handler, error)
environmentTmpl: etmpl,
groupTmpl: gtmpl,
valueTmpl: vtmpl,
serviceTmpl: stmpl,
}, nil
}

Expand Down Expand Up @@ -382,10 +413,21 @@ func (h *handler) Handle(event alert.Event) {
return
}
value := buf.String()
buf.Reset()

service := h.c.Service
if len(service) == 0 {
var service []string
if len(h.serviceTmpl) == 0 {
service = []string{td.Name}
} else {
for _, tmpl := range h.serviceTmpl {
err = tmpl.Execute(&buf, td)
if err != nil {
h.logger.Printf("E! failed to evaluate Alerta Service template: %v", err)
return
}
service = append(service, buf.String())
buf.Reset()
}
}

var severity string
Expand Down Expand Up @@ -415,6 +457,8 @@ func (h *handler) Handle(event alert.Event) {
event.State.Message,
h.c.Origin,
service,
h.c.Timeout,
event.Data.Tags,
event.Data.Result,
); err != nil {
h.logger.Printf("E! failed to send event to Alerta: %v", err)
Expand Down

0 comments on commit d73ed51

Please sign in to comment.