Skip to content

Commit

Permalink
add concept of external vs internal handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Mar 15, 2017
1 parent 92df21d commit e8e4e50
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 18 deletions.
8 changes: 6 additions & 2 deletions alert/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (s *Topics) EventState(topic, event string) (EventState, bool) {
return t.EventState(event)
}

// Collect collects an event and handles the event.
func (s *Topics) Collect(event Event) error {
s.mu.RLock()
topic := s.topics[event.Topic]
Expand All @@ -102,7 +103,7 @@ func (s *Topics) Collect(event Event) error {
s.mu.Unlock()
}

return topic.handleEvent(event)
return topic.collect(event)
}

func (s *Topics) DeleteTopic(topic string) {
Expand Down Expand Up @@ -338,14 +339,17 @@ func (t *Topic) close() {
vars.DeleteStatistic(t.statsKey)
}

func (t *Topic) handleEvent(event Event) error {
func (t *Topic) collect(event Event) error {
prev, ok := t.updateEvent(event.State)
if ok {
event.previousState = prev
}

t.collected.Add(1)
return t.handleEvent(event)
}

func (t *Topic) handleEvent(event Event) error {
t.mu.RLock()
defer t.mu.RUnlock()

Expand Down
1 change: 1 addition & 0 deletions alert/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Event struct {
Topic string
State EventState
Data EventData
NoExternal bool
previousState EventState
}

Expand Down
5 changes: 5 additions & 0 deletions services/alert/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,15 @@ func (h *aggregateHandler) run() {
ticker := time.NewTicker(h.interval)
defer ticker.Stop()
var events []alert.Event
// Keep track if this batch of events should be external.
external := false
for {
select {
case <-h.closing:
return
case e := <-h.events:
events = append(events, e)
external = external || !e.NoExternal
case <-ticker.C:
if len(events) == 0 {
continue
Expand All @@ -281,6 +284,7 @@ func (h *aggregateHandler) run() {
ID: "aggregate",
Message: fmt.Sprintf("Received %d events in the last %v.", len(events), h.interval),
},
NoExternal: !external,
}
for i, e := range events {
agg.Topic = e.Topic
Expand All @@ -298,6 +302,7 @@ func (h *aggregateHandler) run() {
agg.State.Details = strings.Join(details, "\n")
h.next.Handle(agg)
events = events[0:0]
external = false
}
}
}
Expand Down
50 changes: 34 additions & 16 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle
if err != nil {
return nil, err
}
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "exec":
c := ExecHandlerConfig{
Commander: s.Commander,
Expand All @@ -996,15 +996,15 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle
return
}
h := NewExecHandler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "hipchat":
c := hipchat.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := s.HipChatService.Handler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "log":
c := DefaultLogHandlerConfig()
err = decodeOptions(spec.Options, &c)
Expand All @@ -1015,39 +1015,39 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle
if err != nil {
return nil, err
}
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "opsgenie":
c := opsgenie.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := s.OpsGenieService.Handler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "pagerduty":
c := pagerduty.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := s.PagerDutyService.Handler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "pushover":
c := pushover.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := s.PushoverService.Handler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "post":
c := PostHandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := NewPostHandler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "publish":
c := PublishHandlerConfig{
topics: s.topics,
Expand All @@ -1060,23 +1060,23 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle
ha = newPassThroughHandler(h)
case "sensu":
h := s.SensuService.Handler(s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "slack":
c := slack.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := s.SlackService.Handler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "smtp":
c := smtp.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := s.SMTPService.Handler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "snmptrap":
c := snmptrap.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
Expand All @@ -1087,39 +1087,39 @@ func (s *Service) createHandlerActionFromSpec(spec HandlerActionSpec) (ha handle
if err != nil {
return nil, err
}
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "stateChangesOnly":
c := StateChangesOnlyHandlerConfig{
topics: s.topics,
}
ha = NewStateChangesOnlyHandler(c, s.logger)
case "talk":
h := s.TalkService.Handler(s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "tcp":
c := TCPHandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := NewTCPHandler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "telegram":
c := telegram.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := s.TelegramService.Handler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
case "victorops":
c := victorops.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return
}
h := s.VictorOpsService.Handler(c, s.logger)
ha = newPassThroughHandler(h)
ha = newPassThroughHandler(newExternalHandler(h))
default:
err = fmt.Errorf("unsupported action kind %q", spec.Kind)
}
Expand Down Expand Up @@ -1153,3 +1153,21 @@ func (h *passThroughHandler) Close() {
type noopHandler struct{}

func (h noopHandler) Handle(event alert.Event) {}

// ExternalHandler wraps an existing handler that calls out to external services.
// The events are checked for the NoExternal flag before being passed onto the external handler.
type externalHandler struct {
h alert.Handler
}

func (h *externalHandler) Handle(event alert.Event) {
if !event.NoExternal {
h.h.Handle(event)
}
}

func newExternalHandler(h alert.Handler) *externalHandler {
return &externalHandler{
h: h,
}
}

0 comments on commit e8e4e50

Please sign in to comment.