Skip to content

Commit

Permalink
Merged pull request influxdata#1120 from influxdata/nc-restore-anon-t…
Browse files Browse the repository at this point in the history
…opics

Restore anonymous topics
  • Loading branch information
nathanielc committed Jan 11, 2017
2 parents 950cfe1 + ba96a38 commit a4175e4
Show file tree
Hide file tree
Showing 10 changed files with 462 additions and 76 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ stream
.channel('#alerts')
```


With this change alert nodes will have an anonymous topic created for them.
This topic is managed like all other topics preserving state etc. across restarts.
As a result, existing alert nodes will now remember the state of alerts after restarts and after disabling/enabling a task.

### Features

- [#327](https://github.com/influxdata/kapacitor/issues/327): You can now window based on count in addition to time.
Expand All @@ -59,6 +64,7 @@ stream
- [#1095](https://github.com/influxdata/kapacitor/pull/1095): Add new alert API, with support for configuring handlers and topics.
- [#929](https://github.com/influxdata/kapacitor/pull/929): Add SNMP trap service for alerting
- [#1110](https://github.com/influxdata/kapacitor/pull/1110): Add new query property for aligning group by intervals to start times.
- [#744](https://github.com/influxdata/kapacitor/issues/744): Preserve alert state across restarts and disable/enable actions.


### Bugfixes
Expand Down
102 changes: 74 additions & 28 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.handlers = append(an.handlers, h)
}

// Register Handlers on topic
for _, h := range an.handlers {
et.tm.AlertService.RegisterHandler([]string{an.anonTopic}, h)
}

// Parse level expressions
an.levels = make([]stateful.Expression, alert.Critical+1)
an.scopePools = make([]stateful.ScopePool, alert.Critical+1)
Expand Down Expand Up @@ -425,6 +420,19 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
}

func (a *AlertNode) runAlert([]byte) error {
// Register delete hook
if a.hasAnonTopic() {
a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic))

// Register Handlers on topic
for _, h := range a.handlers {
a.et.tm.AlertService.RegisterHandler([]string{a.anonTopic}, h)
}
// Restore anonTopic
a.et.tm.AlertService.RestoreTopic(a.anonTopic)
}

// Setup stats
a.alertsTriggered = &expvar.Int{}
a.statMap.Set(statsAlertsTriggered, a.alertsTriggered)

Expand Down Expand Up @@ -455,14 +463,8 @@ func (a *AlertNode) runAlert([]byte) error {
if state, ok := a.states[p.Group]; ok {
currentLevel = state.currentLevel()
} else {
// Check for pre-existing level on topic.
// Anon Topics do not preserve state as they are deleted when a task stops,
// so we only check the explict topic.
if a.topic != "" {
if state, ok := a.et.tm.AlertService.EventState(a.topic, id); ok {
currentLevel = state.Level
}
}
// Check for previous state
currentLevel = a.restoreEventState(id)
if currentLevel != alert.OK {
// Update the state with the restored state
a.updateState(p.Time, currentLevel, p.Group)
Expand Down Expand Up @@ -551,17 +553,8 @@ func (a *AlertNode) runAlert([]byte) error {
if state, ok := a.states[b.Group]; ok {
currentLevel = state.currentLevel()
} else {
// Check for pre-existing level on topics
if len(a.handlers) > 0 {
if state, ok := a.et.tm.AlertService.EventState(a.anonTopic, id); ok {
currentLevel = state.Level
}
}
if a.topic != "" {
if state, ok := a.et.tm.AlertService.EventState(a.topic, id); ok {
currentLevel = state.Level
}
}
// Check for previous state
currentLevel = a.restoreEventState(id)
if currentLevel != alert.OK {
// Update the state with the restored state
a.updateState(b.TMax, currentLevel, b.Group)
Expand Down Expand Up @@ -668,11 +661,64 @@ func (a *AlertNode) runAlert([]byte) error {
a.timer.Stop()
}
}
// Delete the anonymous topic, which will also deregister its handlers
a.et.tm.AlertService.DeleteTopic(a.anonTopic)
// Close the anonymous topic.
a.et.tm.AlertService.CloseTopic(a.anonTopic)
// Deregister Handlers on topic
for _, h := range a.handlers {
a.et.tm.AlertService.DeregisterHandler([]string{a.anonTopic}, h)
}
return nil
}

// deleteAlertHook builds a deleteHook bound to anonTopic. When the hook is
// invoked it asks the given TaskMaster's AlertService to delete that topic.
func deleteAlertHook(anonTopic string) deleteHook {
	hook := func(master *TaskMaster) {
		master.AlertService.DeleteTopic(anonTopic)
	}
	return hook
}

// hasAnonTopic reports whether the node has at least one handler, which is
// the condition under which the anonymous topic is used.
func (a *AlertNode) hasAnonTopic() bool {
	handlerCount := len(a.handlers)
	return handlerCount != 0
}
// hasTopic reports whether the node has a non-empty, user-specified topic.
func (a *AlertNode) hasTopic() bool {
	if a.topic == "" {
		return false
	}
	return true
}

// restoreEventState looks up the previously recorded state of the event id on
// the node's topics and returns the alert level to resume from.
//
// The anonymous topic is consulted only when the node has handlers, and the
// named topic only when one is configured. If the levels found differ, the
// topics are reconciled:
//   - state found on both: the anonymous topic's state is written to the
//     named topic (the anonymous topic takes precedence);
//   - state found only on the named topic while an anonymous topic is in use:
//     the named topic's state is written to the anonymous topic.
//
// Failures to write the reconciled state are logged and otherwise ignored.
// The returned level is the anonymous topic's when it had state, else the
// named topic's; when neither had state this is the Level of a zero-valued
// alert.EventState.
func (a *AlertNode) restoreEventState(id string) alert.Level {
	var topicState, anonTopicState alert.EventState
	var anonFound, topicFound bool
	// Check for previous state on anonTopic.
	if a.hasAnonTopic() {
		if state, ok := a.et.tm.AlertService.EventState(a.anonTopic, id); ok {
			anonTopicState = state
			anonFound = true
		}
	}
	// Check for previous state on topic.
	if a.hasTopic() {
		if state, ok := a.et.tm.AlertService.EventState(a.topic, id); ok {
			topicState = state
			topicFound = true
		}
	}
	if topicState.Level != anonTopicState.Level {
		if anonFound && topicFound {
			// Anon topic takes precedence: push its state onto the named topic.
			if err := a.et.tm.AlertService.UpdateEvent(a.topic, anonTopicState); err != nil {
				a.logger.Printf("E! failed to update topic %q event state for event %q: %v", a.topic, id, err)
			}
		} else if topicFound && a.hasAnonTopic() {
			// Only the named topic had state: copy it onto the anon topic.
			if err := a.et.tm.AlertService.UpdateEvent(a.anonTopic, topicState); err != nil {
				// Report the topic that was actually being updated.
				a.logger.Printf("E! failed to update topic %q event state for event %q: %v", a.anonTopic, id, err)
			}
		} // else nothing was found, nothing to do
	}
	if anonFound {
		return anonTopicState.Level
	}
	return topicState.Level
}

func (a *AlertNode) handleEvent(event alert.Event) {
a.alertsTriggered.Add(1)
switch event.State.Level {
Expand All @@ -688,7 +734,7 @@ func (a *AlertNode) handleEvent(event alert.Event) {
a.logger.Printf("D! %v alert triggered id:%s msg:%s data:%v", event.State.Level, event.State.ID, event.State.Message, event.Data.Result.Series[0])

// If we have anon handlers, emit event to the anonTopic
if len(a.handlers) > 0 {
if a.hasAnonTopic() {
event.Topic = a.anonTopic
err := a.et.tm.AlertService.Collect(event)
if err != nil {
Expand All @@ -698,7 +744,7 @@ func (a *AlertNode) handleEvent(event alert.Event) {
}

// If we have a user define topic, emit event to the topic.
if a.topic != "" {
if a.hasTopic() {
event.Topic = a.topic
err := a.et.tm.AlertService.Collect(event)
if err != nil {
Expand Down
19 changes: 17 additions & 2 deletions alert/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ func (s *Topics) RestoreTopic(id string, eventStates map[string]EventState) {
t.restoreEventStates(eventStates)
}

// UpdateEvent hands event to the topic registered under id. The lookup runs
// while holding the write lock; if no topic is registered under id, one is
// created with newTopic and stored before the event is passed to it.
func (s *Topics) UpdateEvent(id string, event EventState) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Create the topic on first use so the update always has a target.
	if _, known := s.topics[id]; !known {
		s.topics[id] = newTopic(id)
	}
	s.topics[id].updateEvent(event)
}

func (s *Topics) EventState(topic, event string) (EventState, bool) {
s.mu.RLock()
t, ok := s.topics[topic]
Expand Down Expand Up @@ -126,7 +137,9 @@ func (s *Topics) DeregisterHandler(topics []string, h Handler) {
defer s.mu.Unlock()

for _, topic := range topics {
s.topics[topic].removeHandler(h)
if t := s.topics[topic]; t != nil {
t.removeHandler(h)
}
}
}

Expand All @@ -135,7 +148,9 @@ func (s *Topics) ReplaceHandler(oldTopics, newTopics []string, oldH, newH Handle
defer s.mu.Unlock()

for _, topic := range oldTopics {
s.topics[topic].removeHandler(oldH)
if t := s.topics[topic]; t != nil {
t.removeHandler(oldH)
}
}

for _, topic := range newTopics {
Expand Down
53 changes: 53 additions & 0 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Commands:
show Display detailed information about a task.
show-template Display detailed information about a template.
show-handler Display detailed information about an alert handler.
show-topic Display detailed information about an alert topic.
level Sets the logging level on the kapacitord server.
stats Display various stats about Kapacitor.
version Displays the Kapacitor version info.
Expand Down Expand Up @@ -176,6 +177,9 @@ func main() {
case "show-handler":
commandArgs = args
commandF = doShowHandler
case "show-topic":
commandArgs = args
commandF = doShowTopic
case "level":
commandArgs = args
commandF = doLevel
Expand Down Expand Up @@ -272,6 +276,8 @@ func doHelp(args []string) error {
showTemplateUsage()
case "show-handler":
showHandlerUsage()
case "show-topic":
showTopicUsage()
case "level":
levelUsage()
case "help":
Expand Down Expand Up @@ -1458,6 +1464,53 @@ func doShowHandler(args []string) error {
return nil
}

// Show Topic

// showTopicUsage prints the usage text for the show-topic command to stderr.
func showTopicUsage() {
	var u = `Usage: kapacitor show-topic [topic ID]
Show details about a specific topic.
`
	fmt.Fprintln(os.Stderr, u)
}

// topicEvents implements sort.Interface over a slice of topic events,
// ordering them by the time of their state, earliest first.
type topicEvents []client.TopicEvent

func (t topicEvents) Len() int               { return len(t) }
func (t topicEvents) Less(i int, j int) bool { return t[i].State.Time.Before(t[j].State.Time) }
func (t topicEvents) Swap(i int, j int)      { t[i], t[j] = t[j], t[i] }

// doShowTopic implements the show-topic command. It expects exactly one
// argument, the topic ID, fetches that topic's events through the client and
// prints them as a table (Event, Level, Message, Date) sorted by event time.
//
// With any other number of arguments it prints an error and the show-topic
// usage to stderr and exits the process with status 2. An error from listing
// the topic's events is returned to the caller.
func doShowTopic(args []string) error {
	if len(args) != 1 {
		fmt.Fprintln(os.Stderr, "Must specify one topic ID")
		// Show this command's usage (not show-handler's).
		showTopicUsage()
		os.Exit(2)
	}

	te, err := cli.ListTopicEvents(cli.TopicEventsLink(args[0]), nil)
	if err != nil {
		return err
	}

	// Column widths start at the header widths and grow to fit the data.
	maxEvent := 5   // len("Event")
	maxMessage := 7 // len("Message")
	for _, e := range te.Events {
		if l := len(e.ID); l > maxEvent {
			maxEvent = l
		}
		if l := len(e.State.Message); l > maxMessage {
			maxMessage = l
		}
	}

	sort.Sort(topicEvents(te.Events))
	// Build the row format with the computed widths; Level and Date are fixed.
	outFmt := fmt.Sprintf("%%-%ds%%-9s%%-%ds%%-23s\n", maxEvent+1, maxMessage+1)
	fmt.Printf(outFmt, "Event", "Level", "Message", "Date")
	for _, e := range te.Events {
		fmt.Printf(outFmt, e.ID, e.State.Level, e.State.Message, e.State.Time.Local().Format(time.RFC822))
	}
	return nil
}

// List

func listUsage() {
Expand Down
Loading

0 comments on commit a4175e4

Please sign in to comment.