Skip to content

Commit

Permalink
Added alert.io handler
Browse files Browse the repository at this point in the history
Added tests for alerta
Fixed formatting issues
Added additional test, with override
Hardcoded mappings between Kapacitor and Alerta.io severities and status
  • Loading branch information
md14454 committed Jan 12, 2016
1 parent 2e2a4eb commit 6ae3ba1
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 0 deletions.
51 changes: 51 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err
n.IsStateChangesOnly = true
}

for _, alerta := range n.AlertaHandlers {
alerta := alerta
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleAlerta(alerta, ad) })
}

for _, og := range n.OpsGenieHandlers {
og := og
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleOpsGenie(og, ad) })
Expand Down Expand Up @@ -594,6 +599,52 @@ func (a *AlertNode) handleHipChat(hipchat *pipeline.HipChatHandler, ad *AlertDat
}
}

func (a *AlertNode) handleAlerta(alerta *pipeline.AlertaHandler, ad *AlertData) {
if a.et.tm.AlertaService == nil {
a.logger.Println("E! failed to send Alerta message. Alerta is not enabled")
return
}

var severity string
var status string

switch ad.Level {
case OKAlert:
severity = "ok"
status = "closed"
case InfoAlert:
severity = "informational"
status = "open"
case WarnAlert:
severity = "warning"
status = "open"
case CritAlert:
severity = "critical"
status = "open"
default:
severity = "unknown"
status = "unknown"
}

err := a.et.tm.AlertaService.Alert(
alerta.Token,
alerta.Resource,
alerta.Event,
alerta.Environment,
severity,
status,
alerta.Group,
alerta.Value,
ad.Message,
alerta.Origin,
ad.Data,
)
if err != nil {
a.logger.Println("E! failed to send alert data to Alerta:", err)
return
}
}

func (a *AlertNode) handleOpsGenie(og *pipeline.OpsGenieHandler, ad *AlertData) {
if a.et.tm.OpsGenieService == nil {
a.logger.Println("E! failed to send OpsGenie alert. OpsGenie is not enabled")
Expand Down
3 changes: 3 additions & 0 deletions cmd/kapacitord/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/influxdb"
Expand Down Expand Up @@ -48,6 +49,7 @@ type Config struct {
PagerDuty pagerduty.Config `toml:"pagerduty"`
Slack slack.Config `toml:"slack"`
HipChat hipchat.Config `toml:"hipchat"`
Alerta alerta.Config `toml:"alerta"`
Reporting reporting.Config `toml:"reporting"`
Stats stats.Config `toml:"stats"`

Expand All @@ -74,6 +76,7 @@ func NewConfig() *Config {
c.PagerDuty = pagerduty.NewConfig()
c.Slack = slack.NewConfig()
c.HipChat = hipchat.NewConfig()
c.Alerta = alerta.NewConfig()
c.Reporting = reporting.NewConfig()
c.Stats = stats.NewConfig()

Expand Down
12 changes: 12 additions & 0 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/influxdb"
Expand Down Expand Up @@ -132,6 +133,7 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
s.appendVictorOpsService(c.VictorOps)
s.appendPagerDutyService(c.PagerDuty)
s.appendHipChatService(c.HipChat)
s.appendAlertaService(c.Alerta)
s.appendSlackService(c.Slack)

// Append InfluxDB services
Expand Down Expand Up @@ -265,6 +267,16 @@ func (s *Server) appendHipChatService(c hipchat.Config) {
}
}

func (s *Server) appendAlertaService(c alerta.Config) {
if c.Enabled {
l := s.LogService.NewLogger("[alerta] ", log.LstdFlags)
srv := alerta.NewService(c, l)
s.TaskMaster.AlertaService = srv

s.Services = append(s.Services, srv)
}
}

func (s *Server) appendCollectdService(c collectd.Config) {
if !c.Enabled {
return
Expand Down
12 changes: 12 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ data_dir = "/var/lib/kapacitor"
# without explicitly marking them in the TICKscript.
global = false

[alerta]
# Configure Alerta.
enabled = false
# The Alerta URL.
url = ""
# Default authentication token.
token = ""
# Default environment.
environment = ""
# Default origin.
origin = "kapacitor"

[reporting]
# Send anonymous usage statistics
# every 12 hours to Enterprise.
Expand Down
96 changes: 96 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/clock"
"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/opsgenie"
Expand Down Expand Up @@ -1362,6 +1363,101 @@ stream
}
}

func TestStream_AlertAlerta(t *testing.T) {
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
type postData struct {
Resource string `json:"resource"`
Event string `json:"event"`
Environment string `json:"environment"`
Text string `json:"text"`
Origin string `json:"origin"`
}
pd := postData{}
dec := json.NewDecoder(r.Body)
dec.Decode(&pd)

if requestCount == 1 {
if exp := "/alert?api-key=testtoken1234567"; r.URL.String() != exp {
t.Errorf("unexpected url got %s exp %s", r.URL.String(), exp)
}
if exp := "production"; pd.Environment != exp {
t.Errorf("unexpected environment got %s exp %s", pd.Environment, exp)
}
if exp := "Kapacitor"; pd.Origin != exp {
t.Errorf("unexpected origin got %s exp %s", pd.Origin, exp)
}
} else {
if exp := "/alert?api-key=anothertesttoken"; r.URL.String() != exp {
t.Errorf("unexpected url got %s exp %s", r.URL.String(), exp)
}
if exp := "development"; pd.Environment != exp {
t.Errorf("unexpected environment got %s exp %s", pd.Environment, exp)
}
if exp := "override"; pd.Origin != exp {
t.Errorf("unexpected origin got %s exp %s", pd.Origin, exp)
}
}
if exp := "serverA"; pd.Resource != exp {
t.Errorf("unexpected resource got %s exp %s", pd.Resource, exp)
}
if exp := "CPU Idle"; pd.Event != exp {
t.Errorf("unexpected event got %s exp %s", pd.Event, exp)
}
if exp := "kapacitor/cpu/serverA is CRITICAL"; pd.Text != exp {
t.Errorf("unexpected text got %s exp %s", pd.Text, exp)
}
}))
defer ts.Close()

var script = `
stream
.from().measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
.window()
.period(10s)
.every(10s)
.mapReduce(influxql.count('idle'))
.alert()
.id('{{ index .Tags "host" }}')
.message('kapacitor/{{ .Name }}/{{ index .Tags "host" }} is {{ .Level }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.alerta()
.token('testtoken1234567')
.resource('serverA')
.event('CPU Idle')
.environment('production')
.alerta()
.token('anothertesttoken')
.resource('serverA')
.event('CPU Idle')
.environment('development')
.origin('override')
`

clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script)
defer tm.Close()

c := alerta.NewConfig()
c.URL = ts.URL
c.Origin = "Kapacitor"
sl := alerta.NewService(c, logService.NewLogger("[test_alerta] ", log.LstdFlags))
tm.AlertaService = sl

err := fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
if err != nil {
t.Error(err)
}

if requestCount != 2 {
t.Errorf("unexpected requestCount got %d exp 2", requestCount)
}
}

func TestStream_AlertOpsGenie(t *testing.T) {
requestCount := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
80 changes: 80 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const defaultMessageTmpl = "{{ .ID }} is {{ .Level }}"
// * email -- Send and email with alert data.
// * exec -- Execute a command passing alert data over STDIN.
// * HipChat -- Post alert message to HipChat room.
// * Alerta -- Post alert message to Alerta.
// * Slack -- Post alert message to Slack channel.
// * OpsGenie -- Send alert to OpsGenie.
// * VictorOps -- Send alert to VictorOps.
Expand Down Expand Up @@ -195,6 +196,10 @@ type AlertNode struct {
// tick:ignore
HipChatHandlers []*HipChatHandler

// Send alert to Alerta.
// tick:ignore
AlertaHandlers []*AlertaHandler

// Send alert to OpsGenie
// tick:ignore
OpsGenieHandlers []*OpsGenieHandler
Expand Down Expand Up @@ -560,6 +565,81 @@ type HipChatHandler struct {
Token string
}

// Send the alert to Alerta.
//
// Example:
// [alerta]
// enabled = true
// url = "https://alerta.yourdomain"
// token = "9hiWoDOZ9IbmHsOTeST123ABciWTIqXQVFDo63h9"
// environment = "Production"
// origin = "Kapacitor"
//
// In order to not post a message every alert interval
// use AlertNode.StateChangesOnly so that only events
// where the alert changed state are posted to the room.
//
// Example:
// stream...
// .alert()
// .alerta()
// .resource('Hostname or service')
// .event('Something went wrong')
//
// Send alerts to Alerta. Alerta requires a resource and event description.
//
// Example:
// stream...
// .alert()
// .alerta()
// .resource('Hostname or service')
// .event('Something went wrong')
// .environment('Development')
// .status('Open')
// .group('Dev. Servers')
//
// Send alerts to Alerta. Alerta accepts detailed alert information.
// tick:property
func (a *AlertNode) Alerta() *AlertaHandler {
alerta := &AlertaHandler{
AlertNode: a,
}
a.AlertaHandlers = append(a.AlertaHandlers, alerta)
return alerta
}

// tick:embedded:AlertNode.Alerta
type AlertaHandler struct {
*AlertNode

// Alerta authentication token.
// If empty uses the token from the configuration.
Token string

// Alerta resource.
// This is a required field.
Resource string

// Alerta event.
// This is a required field.
Event string

// Alerta environment.
// If empty uses the environment from the configuration.
Environment string

// Alerta group.
Group string

// Alerta value.
Value string

// Alerta origin.
// If empty uses the origin from the configuration.
Origin string

}

// Send the alert to Slack.
// To allow Kapacitor to post to Slack,
// go to the URL https://slack.com/services/new/incoming-webhook
Expand Down
18 changes: 18 additions & 0 deletions services/alerta/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package alerta

type Config struct {
// Whether Alerta integration is enabled.
Enabled bool `toml:"enabled"`
// The Alerta URL.
URL string `toml:"url"`
// The authentication token for this notification, can be overridden per alert.
Token string `toml:"token"`
// The environment in which to raise the alert.
Environment string `toml:"environment"`
// The origin of the alert.
Origin string `toml:"origin"`
}

func NewConfig() Config {
return Config{}
}
Loading

0 comments on commit 6ae3ba1

Please sign in to comment.