From e8e4e5061a82f007ae3e7dc22ca4d400e9d921a0 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 17 Feb 2017 15:15:24 -0700 Subject: [PATCH] add concept of external vs internal handlers --- alert/topics.go | 8 ++++-- alert/types.go | 1 + services/alert/handlers.go | 5 ++++ services/alert/service.go | 50 ++++++++++++++++++++++++++------------ 4 files changed, 46 insertions(+), 18 deletions(-) diff --git a/alert/topics.go b/alert/topics.go index 597cbf075..d172a5656 100644 --- a/alert/topics.go +++ b/alert/topics.go @@ -85,6 +85,7 @@ func (s *Topics) EventState(topic, event string) (EventState, bool) { return t.EventState(event) } +// Collect collects an event and handles the event. func (s *Topics) Collect(event Event) error { s.mu.RLock() topic := s.topics[event.Topic] @@ -102,7 +103,7 @@ func (s *Topics) Collect(event Event) error { s.mu.Unlock() } - return topic.handleEvent(event) + return topic.collect(event) } func (s *Topics) DeleteTopic(topic string) { @@ -338,14 +339,17 @@ func (t *Topic) close() { vars.DeleteStatistic(t.statsKey) } -func (t *Topic) handleEvent(event Event) error { +func (t *Topic) collect(event Event) error { prev, ok := t.updateEvent(event.State) if ok { event.previousState = prev } t.collected.Add(1) + return t.handleEvent(event) +} +func (t *Topic) handleEvent(event Event) error { t.mu.RLock() defer t.mu.RUnlock() diff --git a/alert/types.go b/alert/types.go index 7ebd0e21e..b3a02f19c 100644 --- a/alert/types.go +++ b/alert/types.go @@ -13,6 +13,7 @@ type Event struct { Topic string State EventState Data EventData + NoExternal bool previousState EventState } diff --git a/services/alert/handlers.go b/services/alert/handlers.go index 8e744de7a..691684dab 100644 --- a/services/alert/handlers.go +++ b/services/alert/handlers.go @@ -265,12 +265,15 @@ func (h *aggregateHandler) run() { ticker := time.NewTicker(h.interval) defer ticker.Stop() var events []alert.Event + // Keep track if this batch of events should be external. + external := false for { select { case <-h.closing: return case e := <-h.events: events = append(events, e) + external = external || !e.NoExternal case <-ticker.C: if len(events) == 0 { continue @@ -281,6 +284,7 @@ func (h *aggregateHandler) run() { ID: "aggregate", Message: fmt.Sprintf("Received %d events in the last %v.", len(events), h.interval), }, + NoExternal: !external, } for i, e := range events { agg.Topic = e.Topic @@ -298,6 +302,7 @@ func (h *aggregateHandler) run() { agg.State.Details = strings.Join(details, "\n") h.next.Handle(agg) events = events[0:0] + external = false } } } diff --git a/services/alert/service.go b/services/alert/service.go index 95ac017ff..acc7a966a 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -986,7 +986,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle if err != nil { return nil, err } - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "exec": c := ExecHandlerConfig{ Commander: s.Commander, @@ -996,7 +996,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := NewExecHandler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "hipchat": c := hipchat.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1004,7 +1004,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := s.HipChatService.Handler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "log": c := DefaultLogHandlerConfig() err = decodeOptions(spec.Options, &c) @@ -1015,7 +1015,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle if err != nil { return nil, err } - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "opsgenie": c := opsgenie.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1023,7 +1023,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := s.OpsGenieService.Handler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "pagerduty": c := pagerduty.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1031,7 +1031,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := s.PagerDutyService.Handler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "pushover": c := pushover.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1039,7 +1039,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := s.PushoverService.Handler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "post": c := PostHandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1047,7 +1047,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := NewPostHandler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "publish": c := PublishHandlerConfig{ topics: s.topics, @@ -1060,7 +1060,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle ha = newPassThroughHandler(h) case "sensu": h := s.SensuService.Handler(s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "slack": c := slack.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1068,7 +1068,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := s.SlackService.Handler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "smtp": c := smtp.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1076,7 +1076,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := s.SMTPService.Handler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "snmptrap": c := snmptrap.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1087,7 +1087,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle if err != nil { return nil, err } - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "stateChangesOnly": c := StateChangesOnlyHandlerConfig{ topics: s.topics, @@ -1095,7 +1095,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle ha = NewStateChangesOnlyHandler(c, s.logger) case "talk": h := s.TalkService.Handler(s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "tcp": c := TCPHandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1103,7 +1103,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := NewTCPHandler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "telegram": c := telegram.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1111,7 +1111,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := s.TelegramService.Handler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) case "victorops": c := victorops.HandlerConfig{} err = decodeOptions(spec.Options, &c) @@ -1119,7 +1119,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle return } h := s.VictorOpsService.Handler(c, s.logger) - ha = newPassThroughHandler(h) + ha = newPassThroughHandler(newExternalHandler(h)) default: err = fmt.Errorf("unsupported action kind %q", spec.Kind) } @@ -1153,3 +1153,21 @@ func (h *passThroughHandler) Close() { type noopHandler struct{} func (h noopHandler) Handle(event alert.Event) {} + +// ExternalHandler wraps an existing handler that calls out to external services. +// The events are checked for the NoExternal flag before being passed onto the external handler. +type externalHandler struct { + h alert.Handler +} + +func (h *externalHandler) Handle(event alert.Event) { + if !event.NoExternal { + h.h.Handle(event) + } +} + +func newExternalHandler(h alert.Handler) *externalHandler { + return &externalHandler{ + h: h, + } +}