diff --git a/alert.go b/alert.go index 2339ef651..a66bd1209 100644 --- a/alert.go +++ b/alert.go @@ -167,6 +167,11 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err n.IsStateChangesOnly = true } + for _, alerta := range n.AlertaHandlers { + alerta := alerta + an.handlers = append(an.handlers, func(ad *AlertData) { an.handleAlerta(alerta, ad) }) + } + for _, og := range n.OpsGenieHandlers { og := og an.handlers = append(an.handlers, func(ad *AlertData) { an.handleOpsGenie(og, ad) }) @@ -594,6 +599,52 @@ func (a *AlertNode) handleHipChat(hipchat *pipeline.HipChatHandler, ad *AlertDat } } +func (a *AlertNode) handleAlerta(alerta *pipeline.AlertaHandler, ad *AlertData) { + if a.et.tm.AlertaService == nil { + a.logger.Println("E! failed to send Alerta message. Alerta is not enabled") + return + } + + var severity string + var status string + + switch ad.Level { + case OKAlert: + severity = "ok" + status = "closed" + case InfoAlert: + severity = "informational" + status = "open" + case WarnAlert: + severity = "warning" + status = "open" + case CritAlert: + severity = "critical" + status = "open" + default: + severity = "unknown" + status = "unknown" + } + + err := a.et.tm.AlertaService.Alert( + alerta.Token, + alerta.Resource, + alerta.Event, + alerta.Environment, + severity, + status, + alerta.Group, + alerta.Value, + ad.Message, + alerta.Origin, + ad.Data, + ) + if err != nil { + a.logger.Println("E! failed to send alert data to Alerta:", err) + return + } +} + func (a *AlertNode) handleOpsGenie(og *pipeline.OpsGenieHandler, ad *AlertData) { if a.et.tm.OpsGenieService == nil { a.logger.Println("E! failed to send OpsGenie alert. OpsGenie is not enabled") diff --git a/cmd/kapacitord/run/config.go b/cmd/kapacitord/run/config.go index a13edefc3..0ac72fd58 100644 --- a/cmd/kapacitord/run/config.go +++ b/cmd/kapacitord/run/config.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/influxdata/kapacitor/services/alerta" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" "github.com/influxdata/kapacitor/services/influxdb" @@ -48,6 +49,7 @@ type Config struct { PagerDuty pagerduty.Config `toml:"pagerduty"` Slack slack.Config `toml:"slack"` HipChat hipchat.Config `toml:"hipchat"` + Alerta alerta.Config `toml:"alerta"` Reporting reporting.Config `toml:"reporting"` Stats stats.Config `toml:"stats"` @@ -74,6 +76,7 @@ func NewConfig() *Config { c.PagerDuty = pagerduty.NewConfig() c.Slack = slack.NewConfig() c.HipChat = hipchat.NewConfig() + c.Alerta = alerta.NewConfig() c.Reporting = reporting.NewConfig() c.Stats = stats.NewConfig() diff --git a/cmd/kapacitord/run/server.go b/cmd/kapacitord/run/server.go index e213f9b5b..9f7c7ac76 100644 --- a/cmd/kapacitord/run/server.go +++ b/cmd/kapacitord/run/server.go @@ -14,6 +14,7 @@ import ( "time" "github.com/influxdata/kapacitor" + "github.com/influxdata/kapacitor/services/alerta" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" "github.com/influxdata/kapacitor/services/influxdb" @@ -132,6 +133,7 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (* s.appendVictorOpsService(c.VictorOps) s.appendPagerDutyService(c.PagerDuty) s.appendHipChatService(c.HipChat) + s.appendAlertaService(c.Alerta) s.appendSlackService(c.Slack) // Append InfluxDB services @@ -265,6 +267,16 @@ func (s *Server) appendHipChatService(c hipchat.Config) { } } +func (s *Server) appendAlertaService(c alerta.Config) { + if c.Enabled { + l := s.LogService.NewLogger("[alerta] ", log.LstdFlags) + srv := alerta.NewService(c, l) + s.TaskMaster.AlertaService = srv + + s.Services = append(s.Services, srv) + } +} + func (s *Server) appendCollectdService(c collectd.Config) { if !c.Enabled { return diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index e49705269..0765a5cb0 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -167,6 +167,18 @@ data_dir = "/var/lib/kapacitor" # without explicitly marking them in the TICKscript. global = false +[alerta] + # Configure Alerta. + enabled = false + # The Alerta URL. + url = "" + # Default authentication token. + token = "" + # Default environment. + environment = "" + # Default origin. + origin = "kapacitor" + [reporting] # Send anonymous usage statistics # every 12 hours to Enterprise. diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index c8a54ee2d..9e93d7396 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/kapacitor" "github.com/influxdata/kapacitor/clock" + "github.com/influxdata/kapacitor/services/alerta" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" "github.com/influxdata/kapacitor/services/opsgenie" @@ -1362,6 +1363,101 @@ stream } } +func TestStream_AlertAlerta(t *testing.T) { + requestCount := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ + type postData struct { + Resource string `json:"resource"` + Event string `json:"event"` + Environment string `json:"environment"` + Text string `json:"text"` + Origin string `json:"origin"` + } + pd := postData{} + dec := json.NewDecoder(r.Body) + dec.Decode(&pd) + + if requestCount == 1 { + if exp := "/alert?api-key=testtoken1234567"; r.URL.String() != exp { + t.Errorf("unexpected url got %s exp %s", r.URL.String(), exp) + } + if exp := "production"; pd.Environment != exp { + t.Errorf("unexpected environment got %s exp %s", pd.Environment, exp) + } + if exp := "Kapacitor"; pd.Origin != exp { + t.Errorf("unexpected origin got %s exp %s", pd.Origin, exp) + } + } else { + if exp := "/alert?api-key=anothertesttoken"; r.URL.String() != exp { + t.Errorf("unexpected url got %s exp %s", r.URL.String(), exp) + } + if exp := "development"; pd.Environment != exp { + t.Errorf("unexpected environment got %s exp %s", pd.Environment, exp) + } + if exp := "override"; pd.Origin != exp { + t.Errorf("unexpected origin got %s exp %s", pd.Origin, exp) + } + } + if exp := "serverA"; pd.Resource != exp { + t.Errorf("unexpected resource got %s exp %s", pd.Resource, exp) + } + if exp := "CPU Idle"; pd.Event != exp { + t.Errorf("unexpected event got %s exp %s", pd.Event, exp) + } + if exp := "kapacitor/cpu/serverA is CRITICAL"; pd.Text != exp { + t.Errorf("unexpected text got %s exp %s", pd.Text, exp) + } + })) + defer ts.Close() + + var script = ` +stream + .from().measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + .window() + .period(10s) + .every(10s) + .mapReduce(influxql.count('idle')) + .alert() + .id('{{ index .Tags "host" }}') + .message('kapacitor/{{ .Name }}/{{ index .Tags "host" }} is {{ .Level }}') + .info(lambda: "count" > 6.0) + .warn(lambda: "count" > 7.0) + .crit(lambda: "count" > 8.0) + .alerta() + .token('testtoken1234567') + .resource('serverA') + .event('CPU Idle') + .environment('production') + .alerta() + .token('anothertesttoken') + .resource('serverA') + .event('CPU Idle') + .environment('development') + .origin('override') +` + + clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script) + defer tm.Close() + + c := alerta.NewConfig() + c.URL = ts.URL + c.Origin = "Kapacitor" + sl := alerta.NewService(c, logService.NewLogger("[test_alerta] ", log.LstdFlags)) + tm.AlertaService = sl + + err := fastForwardTask(clock, et, replayErr, tm, 13*time.Second) + if err != nil { + t.Error(err) + } + + if requestCount != 2 { + t.Errorf("unexpected requestCount got %d exp 2", requestCount) + } +} + func TestStream_AlertOpsGenie(t *testing.T) { requestCount := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/pipeline/alert.go b/pipeline/alert.go index 8bb5453a9..3678d92da 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -29,6 +29,7 @@ const defaultMessageTmpl = "{{ .ID }} is {{ .Level }}" // * email -- Send and email with alert data. // * exec -- Execute a command passing alert data over STDIN. // * HipChat -- Post alert message to HipChat room. +// * Alerta -- Post alert message to Alerta. // * Slack -- Post alert message to Slack channel. // * OpsGenie -- Send alert to OpsGenie. // * VictorOps -- Send alert to VictorOps. @@ -195,6 +196,10 @@ type AlertNode struct { // tick:ignore HipChatHandlers []*HipChatHandler + // Send alert to Alerta. + // tick:ignore + AlertaHandlers []*AlertaHandler + // Send alert to OpsGenie // tick:ignore OpsGenieHandlers []*OpsGenieHandler @@ -560,6 +565,81 @@ type HipChatHandler struct { Token string } +// Send the alert to Alerta. +// +// Example: +// [alerta] +// enabled = true +// url = "https://alerta.yourdomain" +// token = "9hiWoDOZ9IbmHsOTeST123ABciWTIqXQVFDo63h9" +// environment = "Production" +// origin = "Kapacitor" +// +// In order to not post a message every alert interval +// use AlertNode.StateChangesOnly so that only events +// where the alert changed state are posted to the room. +// +// Example: +// stream... +// .alert() +// .alerta() +// .resource('Hostname or service') +// .event('Something went wrong') +// +// Send alerts to Alerta. Alerta requires a resource and event description. +// +// Example: +// stream... +// .alert() +// .alerta() +// .resource('Hostname or service') +// .event('Something went wrong') +// .environment('Development') +// .status('Open') +// .group('Dev. Servers') +// +// Send alerts to Alerta. Alerta accepts detailed alert information. +// tick:property +func (a *AlertNode) Alerta() *AlertaHandler { + alerta := &AlertaHandler{ + AlertNode: a, + } + a.AlertaHandlers = append(a.AlertaHandlers, alerta) + return alerta +} + +// tick:embedded:AlertNode.Alerta +type AlertaHandler struct { + *AlertNode + + // Alerta authentication token. + // If empty uses the token from the configuration. + Token string + + // Alerta resource. + // This is a required field. + Resource string + + // Alerta event. + // This is a required field. + Event string + + // Alerta environment. + // If empty uses the environment from the configuration. + Environment string + + // Alerta group. + Group string + + // Alerta value. + Value string + + // Alerta origin. + // If empty uses the origin from the configuration. + Origin string + +} + // Send the alert to Slack. // To allow Kapacitor to post to Slack, // go to the URL https://slack.com/services/new/incoming-webhook diff --git a/services/alerta/config.go b/services/alerta/config.go new file mode 100644 index 000000000..344892a7a --- /dev/null +++ b/services/alerta/config.go @@ -0,0 +1,18 @@ +package alerta + +type Config struct { + // Whether Alerta integration is enabled. + Enabled bool `toml:"enabled"` + // The Alerta URL. + URL string `toml:"url"` + // The authentication token for this notification, can be overridden per alert. + Token string `toml:"token"` + // The environment in which to raise the alert. + Environment string `toml:"environment"` + // The origin of the alert. + Origin string `toml:"origin"` +} + +func NewConfig() Config { + return Config{} +} diff --git a/services/alerta/service.go b/services/alerta/service.go new file mode 100644 index 000000000..142262ed7 --- /dev/null +++ b/services/alerta/service.go @@ -0,0 +1,95 @@ +package alerta + +import ( + "bytes" + "encoding/json" + "errors" + "log" + "net/http" + "net/url" +) + +type Service struct { + url string + token string + environment string + origin string + logger *log.Logger +} + +func NewService(c Config, l *log.Logger) *Service { + return &Service{ + url: c.URL, + token: c.Token, + environment: c.Environment, + origin: c.Origin, + logger: l, + } +} + +func (s *Service) Open() error { + return nil +} + +func (s *Service) Close() error { + return nil +} + +func (s *Service) Alert(token, resource, event, environment, severity, status, group, value, message, origin string, data interface{}) error { + if resource == "" || event == "" { + return errors.New("Resource and Event are required to send an alert") + } + + if token == "" { + token = s.token + } + + if environment == "" { + environment = s.environment + } + + if origin == "" { + origin = s.origin + } + + var Url *url.URL + Url, err := url.Parse(s.url + "/alert?api-key=" + token) + if err != nil { + return err + } + + postData := make(map[string]interface{}) + postData["resource"] = resource + postData["event"] = event + postData["environment"] = environment + postData["severity"] = severity + postData["status"] = status + postData["group"] = group + postData["value"] = value + postData["text"] = message + postData["origin"] = origin + postData["data"] = data + + var post bytes.Buffer + enc := json.NewEncoder(&post) + err = enc.Encode(postData) + if err != nil { + return err + } + + resp, err := http.Post(Url.String(), "application/json", &post) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + type response struct { + Error string `json:"error"` + } + r := &response{Error: "failed to understand Alerta response"} + dec := json.NewDecoder(resp.Body) + dec.Decode(r) + return errors.New(r.Error) + } + return nil +} diff --git a/task_master.go b/task_master.go index 3313e2081..9e64958fb 100644 --- a/task_master.go +++ b/task_master.go @@ -55,6 +55,9 @@ type TaskMaster struct { Global() bool Alert(room, token, message string, level AlertLevel) error } + AlertaService interface { + Alert(token, resource, event, environment, severity, status, group, value, message, origin string, data interface{}) error + } LogService LogService // Incoming streams