diff --git a/CHANGELOG.md b/CHANGELOG.md index 28ba67376..29cd99914 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,11 @@ stream .channel('#alerts') ``` + +With this change, alert nodes will have an anonymous topic created for them. +This topic is managed like all other topics, preserving state etc. across restarts. +As a result, existing alert nodes will now remember the state of alerts after restarts and after disabling/enabling a task. + ### Features - [#327](https://github.com/influxdata/kapacitor/issues/327): You can now window based on count in addition to time. @@ -59,6 +64,7 @@ stream - [#1095](https://github.com/influxdata/kapacitor/pull/1095): Add new alert API, with support for configuring handlers and topics. - [#929](https://github.com/influxdata/kapacitor/pull/929): Add SNMP trap service for alerting - [#1110](https://github.com/influxdata/kapacitor/pull/1110): Add new query property for aligning group by intervals to start times. +- [#744](https://github.com/influxdata/kapacitor/issues/744): Preserve alert state across restarts and disable/enable actions. 
### Bugfixes diff --git a/alert.go b/alert.go index 33b487c7c..3b874c613 100644 --- a/alert.go +++ b/alert.go @@ -344,11 +344,6 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * an.handlers = append(an.handlers, h) } - // Register Handlers on topic - for _, h := range an.handlers { - et.tm.AlertService.RegisterHandler([]string{an.anonTopic}, h) - } - // Parse level expressions an.levels = make([]stateful.Expression, alert.Critical+1) an.scopePools = make([]stateful.ScopePool, alert.Critical+1) @@ -425,6 +420,19 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * } func (a *AlertNode) runAlert([]byte) error { + // Register delete hook + if a.hasAnonTopic() { + a.et.tm.registerDeleteHookForTask(a.et.Task.ID, deleteAlertHook(a.anonTopic)) + + // Register Handlers on topic + for _, h := range a.handlers { + a.et.tm.AlertService.RegisterHandler([]string{a.anonTopic}, h) + } + // Restore anonTopic + a.et.tm.AlertService.RestoreTopic(a.anonTopic) + } + + // Setup stats a.alertsTriggered = &expvar.Int{} a.statMap.Set(statsAlertsTriggered, a.alertsTriggered) @@ -455,14 +463,8 @@ func (a *AlertNode) runAlert([]byte) error { if state, ok := a.states[p.Group]; ok { currentLevel = state.currentLevel() } else { - // Check for pre-existing level on topic. - // Anon Topics do not preserve state as they are deleted when a task stops, - // so we only check the explict topic. 
- if a.topic != "" { - if state, ok := a.et.tm.AlertService.EventState(a.topic, id); ok { - currentLevel = state.Level - } - } + // Check for previous state + currentLevel = a.restoreEventState(id) if currentLevel != alert.OK { // Update the state with the restored state a.updateState(p.Time, currentLevel, p.Group) @@ -551,17 +553,8 @@ func (a *AlertNode) runAlert([]byte) error { if state, ok := a.states[b.Group]; ok { currentLevel = state.currentLevel() } else { - // Check for pre-existing level on topics - if len(a.handlers) > 0 { - if state, ok := a.et.tm.AlertService.EventState(a.anonTopic, id); ok { - currentLevel = state.Level - } - } - if a.topic != "" { - if state, ok := a.et.tm.AlertService.EventState(a.topic, id); ok { - currentLevel = state.Level - } - } + // Check for previous state + currentLevel = a.restoreEventState(id) if currentLevel != alert.OK { // Update the state with the restored state a.updateState(b.TMax, currentLevel, b.Group) @@ -668,11 +661,64 @@ func (a *AlertNode) runAlert([]byte) error { a.timer.Stop() } } - // Delete the anonymous topic, which will also deregister its handlers - a.et.tm.AlertService.DeleteTopic(a.anonTopic) + // Close the anonymous topic. 
+ a.et.tm.AlertService.CloseTopic(a.anonTopic) + // Deregister Handlers on topic + for _, h := range a.handlers { + a.et.tm.AlertService.DeregisterHandler([]string{a.anonTopic}, h) + } return nil } +func deleteAlertHook(anonTopic string) deleteHook { + return func(tm *TaskMaster) { + tm.AlertService.DeleteTopic(anonTopic) + } +} + +func (a *AlertNode) hasAnonTopic() bool { + return len(a.handlers) > 0 +} +func (a *AlertNode) hasTopic() bool { + return a.topic != "" +} + +func (a *AlertNode) restoreEventState(id string) alert.Level { + var topicState, anonTopicState alert.EventState + var anonFound, topicFound bool + // Check for previous state on anonTopic + if a.hasAnonTopic() { + if state, ok := a.et.tm.AlertService.EventState(a.anonTopic, id); ok { + anonTopicState = state + anonFound = true + } + } + // Check for previous state on topic. + if a.hasTopic() { + if state, ok := a.et.tm.AlertService.EventState(a.topic, id); ok { + topicState = state + topicFound = true + } + } + if topicState.Level != anonTopicState.Level { + if anonFound && topicFound { + // Anon topic takes precedence, copy its state onto the topic + if err := a.et.tm.AlertService.UpdateEvent(a.topic, anonTopicState); err != nil { + a.logger.Printf("E! failed to update topic %q event state for event %q: %v", a.topic, id, err) + } + } else if topicFound && a.hasAnonTopic() { + // Only the topic has state, copy it onto the anon topic + if err := a.et.tm.AlertService.UpdateEvent(a.anonTopic, topicState); err != nil { + a.logger.Printf("E! failed to update topic %q event state for event %q: %v", a.anonTopic, id, err) + } + } // else nothing was found, nothing to do + } + if anonFound { + return anonTopicState.Level + } + return topicState.Level +} + func (a *AlertNode) handleEvent(event alert.Event) { a.alertsTriggered.Add(1) switch event.State.Level { @@ -688,7 +734,7 @@ func (a *AlertNode) handleEvent(event alert.Event) { a.logger.Printf("D! 
%v alert triggered id:%s msg:%s data:%v", event.State.Level, event.State.ID, event.State.Message, event.Data.Result.Series[0]) // If we have anon handlers, emit event to the anonTopic - if len(a.handlers) > 0 { + if a.hasAnonTopic() { event.Topic = a.anonTopic err := a.et.tm.AlertService.Collect(event) if err != nil { @@ -698,7 +744,7 @@ func (a *AlertNode) handleEvent(event alert.Event) { } // If we have a user define topic, emit event to the topic. - if a.topic != "" { + if a.hasTopic() { event.Topic = a.topic err := a.et.tm.AlertService.Collect(event) if err != nil { diff --git a/alert/topics.go b/alert/topics.go index 13203674a..369a876ee 100644 --- a/alert/topics.go +++ b/alert/topics.go @@ -61,6 +61,17 @@ func (s *Topics) RestoreTopic(id string, eventStates map[string]EventState) { t.restoreEventStates(eventStates) } +func (s *Topics) UpdateEvent(id string, event EventState) { + s.mu.Lock() + defer s.mu.Unlock() + t, ok := s.topics[id] + if !ok { + t = newTopic(id) + s.topics[id] = t + } + t.updateEvent(event) +} + func (s *Topics) EventState(topic, event string) (EventState, bool) { s.mu.RLock() t, ok := s.topics[topic] @@ -126,7 +137,9 @@ func (s *Topics) DeregisterHandler(topics []string, h Handler) { defer s.mu.Unlock() for _, topic := range topics { - s.topics[topic].removeHandler(h) + if t := s.topics[topic]; t != nil { + t.removeHandler(h) + } } } @@ -135,7 +148,9 @@ func (s *Topics) ReplaceHandler(oldTopics, newTopics []string, oldH, newH Handle defer s.mu.Unlock() for _, topic := range oldTopics { - s.topics[topic].removeHandler(oldH) + if t := s.topics[topic]; t != nil { + t.removeHandler(oldH) + } } for _, topic := range newTopics { diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index 5db993fd1..84fe24ca9 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -61,6 +61,7 @@ Commands: show Display detailed information about a task. show-template Display detailed information about a template. 
show-handler Display detailed information about an alert handler. + show-topic Display detailed information about an alert topic. level Sets the logging level on the kapacitord server. stats Display various stats about Kapacitor. version Displays the Kapacitor version info. @@ -176,6 +177,9 @@ func main() { case "show-handler": commandArgs = args commandF = doShowHandler + case "show-topic": + commandArgs = args + commandF = doShowTopic case "level": commandArgs = args commandF = doLevel @@ -272,6 +276,8 @@ func doHelp(args []string) error { showTemplateUsage() case "show-handler": showHandlerUsage() + case "show-topic": + showTopicUsage() case "level": levelUsage() case "help": @@ -1458,6 +1464,53 @@ func doShowHandler(args []string) error { return nil } +// Show Topic + +func showTopicUsage() { + var u = `Usage: kapacitor show-topic [topic ID] + + Show details about a specific topic. +` + fmt.Fprintln(os.Stderr, u) +} + +type topicEvents []client.TopicEvent + +func (t topicEvents) Len() int { return len(t) } +func (t topicEvents) Less(i int, j int) bool { return t[i].State.Time.Before(t[j].State.Time) } +func (t topicEvents) Swap(i int, j int) { t[i], t[j] = t[j], t[i] } + +func doShowTopic(args []string) error { + if len(args) != 1 { + fmt.Fprintln(os.Stderr, "Must specify one topic ID") + showTopicUsage() + os.Exit(2) + } + + te, err := cli.ListTopicEvents(cli.TopicEventsLink(args[0]), nil) + if err != nil { + return err + } + maxEvent := 5 // len("Event") + maxMessage := 7 // len("Message") + for _, e := range te.Events { + if l := len(e.ID); l > maxEvent { + maxEvent = l + } + if l := len(e.State.Message); l > maxMessage { + maxMessage = l + } + } + + sort.Sort(topicEvents(te.Events)) + outFmt := fmt.Sprintf("%%-%ds%%-9s%%-%ds%%-23s\n", maxEvent+1, maxMessage+1) + fmt.Printf(outFmt, "Event", "Level", "Message", "Date") + for _, e := range te.Events { + fmt.Printf(outFmt, e.ID, e.State.Level, e.State.Message, e.State.Time.Local().Format(time.RFC822)) + } + 
return nil +} + // List func listUsage() { diff --git a/server/server_test.go b/server/server_test.go index c06f0d52c..7ec681e13 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -7711,7 +7711,7 @@ stream } } -func TestServer_AlertTopic_PersistedState(t *testing.T) { +func TestServer_AlertAnonTopic(t *testing.T) { // Setup test TCP server ts, err := alerttest.NewTCPServer() if err != nil { @@ -7725,30 +7725,20 @@ func TestServer_AlertTopic_PersistedState(t *testing.T) { cli := Client(s) defer s.Close() - if _, err := cli.CreateHandler(client.HandlerOptions{ - ID: "testAlertHandler", - Topics: []string{"test"}, - Actions: []client.HandlerAction{{ - Kind: "tcp", - Options: map[string]interface{}{"address": ts.Addr}, - }}, - }); err != nil { - t.Fatal(err) - } - tick := ` stream |from() .measurement('alert') |alert() - .topic('test') .id('id') .message('message') .details('details') - .warn(lambda: TRUE) + .warn(lambda: "value" <= 1.0) + .crit(lambda: "value" > 1.0) + .tcp('` + ts.Addr + `') ` - if _, err := cli.CreateTask(client.CreateTaskOptions{ + task, err := cli.CreateTask(client.CreateTaskOptions{ ID: "testAlertHandlers", Type: client.StreamTask, DBRPs: []client.DBRP{{ @@ -7757,10 +7747,12 @@ stream }}, TICKscript: tick, Status: client.Enabled, - }); err != nil { + }) + if err != nil { t.Fatal(err) } + // Write warning point point := "alert value=1 0000000000" v := url.Values{} v.Add("precision", "s") @@ -7769,12 +7761,13 @@ stream // Restart the server s.Restart() - l := cli.TopicEventsLink("test") + topic := "main:testAlertHandlers:alert2" + l := cli.TopicEventsLink(topic) expTopicEvents := client.TopicEvents{ Link: l, - Topic: "test", + Topic: topic, Events: []client.TopicEvent{{ - Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1preview/alerts/topics/test/events/id"}, + Link: client.Link{Relation: client.Self, Href: fmt.Sprintf("/kapacitor/v1preview/alerts/topics/%s/events/id", topic)}, ID: "id", State: client.EventState{ Message: 
"message", @@ -7791,34 +7784,186 @@ stream t.Fatal(err) } if !reflect.DeepEqual(te, expTopicEvents) { - t.Errorf("unexpected topic events:\ngot\n%+v\nexp\n%+v\n", te, expTopicEvents) + t.Errorf("unexpected topic events for anonymous topic:\ngot\n%+v\nexp\n%+v\n", te, expTopicEvents) } event, err := cli.TopicEvent(expTopicEvents.Events[0].Link) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(event, expTopicEvents.Events[0]) { - t.Errorf("unexpected topic event:\ngot\n%+v\nexp\n%+v\n", event, expTopicEvents.Events[0]) + t.Errorf("unexpected topic event for anonymous topic:\ngot\n%+v\nexp\n%+v\n", event, expTopicEvents.Events[0]) } - te, err = cli.ListTopicEvents(l, &client.ListTopicEventsOptions{ - MinLevel: "CRITICAL", + // Disable task + task, err = cli.UpdateTask(task.Link, client.UpdateTaskOptions{ + Status: client.Disabled, }) if err != nil { t.Fatal(err) } - expTopicEvents.Events = expTopicEvents.Events[0:0] - if !reflect.DeepEqual(te, expTopicEvents) { - t.Errorf("unexpected topic events with minLevel:\ngot\n%+v\nexp\n%+v\n", te, expTopicEvents) + + if _, err := cli.ListTopicEvents(l, nil); err == nil { + t.Fatal("expected error listing anonymous topic for disabled task") + } else if got, exp := err.Error(), fmt.Sprintf("topic %q does not exist", topic); got != exp { + t.Errorf("unexpected error message for nonexistent anonymous topic: got %q exp %q", got, exp) } - l = cli.TopicLink("test") - if err := cli.DeleteTopic(l); err != nil { + // Enable task + task, err = cli.UpdateTask(task.Link, client.UpdateTaskOptions{ + Status: client.Enabled, + }) + if err != nil { t.Fatal(err) } te, err = cli.ListTopicEvents(l, nil) - if err == nil { - t.Fatal("expected error for deleted topic") + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(te, expTopicEvents) { + t.Errorf("unexpected topic events for anonymous topic after re-enable:\ngot\n%+v\nexp\n%+v\n", te, expTopicEvents) + } + + // Restart the server, again and ensure that the anonymous topic state is 
restored + s.Restart() + te, err = cli.ListTopicEvents(l, nil) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(te, expTopicEvents) { + t.Errorf("unexpected topic events for anonymous topic after re-enable and restart:\ngot\n%+v\nexp\n%+v\n", te, expTopicEvents) + } + + // Delete task + if err := cli.DeleteTask(task.Link); err != nil { + t.Fatal(err) + } + + if _, err := cli.ListTopicEvents(l, nil); err == nil { + t.Fatal("expected error listing anonymous topic for deleted task") + } else if got, exp := err.Error(), fmt.Sprintf("topic %q does not exist", topic); got != exp { + t.Errorf("unexpected error message for nonexistent anonymous topic: got %q exp %q", got, exp) + } +} + +func TestServer_AlertTopic_PersistedState(t *testing.T) { + // Setup test TCP server + ts, err := alerttest.NewTCPServer() + if err != nil { + t.Fatal(err) + } + defer ts.Close() + + tmpDir := MustTempDir() + defer os.RemoveAll(tmpDir) + tmpPath := path.Join(tmpDir, "alert.log") + + // Create default config + c := NewConfig() + s := OpenServer(c) + cli := Client(s) + defer s.Close() + + if _, err := cli.CreateHandler(client.HandlerOptions{ + ID: "testAlertHandler", + Topics: []string{"test"}, + Actions: []client.HandlerAction{{ + Kind: "tcp", + Options: map[string]interface{}{"address": ts.Addr}, + }}, + }); err != nil { + t.Fatal(err) + } + + tick := ` +stream + |from() + .measurement('alert') + |alert() + .topic('test') + .id('id') + .message('message') + .details('details') + .warn(lambda: TRUE) + .log('` + tmpPath + `') +` + + if _, err := cli.CreateTask(client.CreateTaskOptions{ + ID: "testAlertHandlers", + Type: client.StreamTask, + DBRPs: []client.DBRP{{ + Database: "mydb", + RetentionPolicy: "myrp", + }}, + TICKscript: tick, + Status: client.Enabled, + }); err != nil { + t.Fatal(err) + } + + point := "alert value=1 0000000000" + v := url.Values{} + v.Add("precision", "s") + s.MustWrite("mydb", "myrp", point, v) + + // Restart the server + s.Restart() + + topics := 
[]string{ + "test", + "main:testAlertHandlers:alert2", + } + for _, topic := range topics { + l := cli.TopicEventsLink(topic) + expTopicEvents := client.TopicEvents{ + Link: l, + Topic: topic, + Events: []client.TopicEvent{{ + Link: client.Link{Relation: client.Self, Href: fmt.Sprintf("/kapacitor/v1preview/alerts/topics/%s/events/id", topic)}, + ID: "id", + State: client.EventState{ + Message: "message", + Details: "details", + Time: time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), + Duration: 0, + Level: "WARNING", + }, + }}, + } + + te, err := cli.ListTopicEvents(l, nil) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(te, expTopicEvents) { + t.Errorf("unexpected topic events for topic %q:\ngot\n%+v\nexp\n%+v\n", topic, te, expTopicEvents) + } + event, err := cli.TopicEvent(expTopicEvents.Events[0].Link) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(event, expTopicEvents.Events[0]) { + t.Errorf("unexpected topic event for topic %q:\ngot\n%+v\nexp\n%+v\n", topic, event, expTopicEvents.Events[0]) + } + + te, err = cli.ListTopicEvents(l, &client.ListTopicEventsOptions{ + MinLevel: "CRITICAL", + }) + if err != nil { + t.Fatal(err) + } + expTopicEvents.Events = expTopicEvents.Events[0:0] + if !reflect.DeepEqual(te, expTopicEvents) { + t.Errorf("unexpected topic events with minLevel for topic %q:\ngot\n%+v\nexp\n%+v\n", topic, te, expTopicEvents) + } + + l = cli.TopicLink(topic) + if err := cli.DeleteTopic(l); err != nil { + t.Fatal(err) + } + te, err = cli.ListTopicEvents(l, nil) + if err == nil { + t.Fatalf("expected error for deleted topic %q", topic) + } } } diff --git a/services/alert/dao.go b/services/alert/dao.go index f6ebe67ce..8ba9f4ebf 100644 --- a/services/alert/dao.go +++ b/services/alert/dao.go @@ -77,6 +77,15 @@ func (h HandlerSpec) Validate() error { return nil } +func (h HandlerSpec) HasTopic(topic string) bool { + for _, t := range h.Topics { + if t == topic { + return true + } + } + return false +} + // HandlerActionSpec 
defines an action an handler can take. type HandlerActionSpec struct { Kind string `json:"kind"` diff --git a/services/alert/service.go b/services/alert/service.go index ec29da0c8..b9f80b78c 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -74,6 +74,8 @@ type Service struct { handlers map[string]handler + closedTopics map[string]bool + topics *alert.Topics routes []httpd.Route @@ -127,9 +129,10 @@ type Service struct { func NewService(c Config, l *log.Logger) *Service { s := &Service{ - handlers: make(map[string]handler), - topics: alert.NewTopics(l), - logger: l, + handlers: make(map[string]handler), + closedTopics: make(map[string]bool), + topics: alert.NewTopics(l), + logger: l, } return s } @@ -488,14 +491,7 @@ func (s *Service) handleTopicEvent(t *alert.Topic, w http.ResponseWriter, r *htt func (s *Service) handleListTopicHandlers(t *alert.Topic, w http.ResponseWriter, r *http.Request) { var handlers []client.Handler for _, h := range s.handlers { - match := false - for _, topic := range h.Spec.Topics { - if topic == t.ID() { - match = true - break - } - } - if match { + if h.Spec.HasTopic(t.ID()) { handlers = append(handlers, s.convertHandlerSpec(h.Spec)) } } @@ -664,28 +660,104 @@ func (s *Service) EventState(topic, event string) (alert.EventState, bool) { } func (s *Service) Collect(event alert.Event) error { + s.mu.RLock() + closed := s.closedTopics[event.Topic] + s.mu.RUnlock() + if closed { + // Restore topic + if err := s.restoreClosedTopic(event.Topic); err != nil { + return err + } + } + err := s.topics.Collect(event) if err != nil { return err } - t, ok := s.topics.Topic(event.Topic) + return s.persistTopicState(event.Topic) +} + +func (s *Service) persistTopicState(topic string) error { + t, ok := s.topics.Topic(topic) if !ok { - // Topic was deleted, nothing to do + // Topic was deleted since event was collected, nothing to do. 
return nil } ts := TopicState{ - Topic: event.Topic, + Topic: topic, EventStates: s.convertEventStatesFromAlert(t.EventStates(alert.OK)), } return s.topicsDAO.Put(ts) } +func (s *Service) restoreClosedTopic(topic string) error { + s.mu.Lock() + defer s.mu.Unlock() + if !s.closedTopics[topic] { + // Topic already restored + return nil + } + if err := s.restoreTopic(topic); err != nil { + return err + } + // Topic no longer closed + delete(s.closedTopics, topic) + return nil +} + +// restoreTopic restores a topic's state from the storage and registers any handlers. +// Caller must have lock to call. +func (s *Service) restoreTopic(topic string) error { + // Restore events state from storage + ts, err := s.topicsDAO.Get(topic) + if err != nil && err != ErrNoTopicStateExists { + return err + } else if err != ErrNoTopicStateExists { + s.topics.RestoreTopic(topic, s.convertEventStatesToAlert(ts.EventStates)) + } // else nothing to restore + + // Re-Register all handlers + topics := []string{topic} + for _, h := range s.handlers { + if h.Spec.HasTopic(topic) { + s.topics.RegisterHandler(topics, h.Handler) + } + } + return nil +} + +func (s *Service) RestoreTopic(topic string) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.restoreTopic(topic) +} + +func (s *Service) CloseTopic(topic string) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Save the final topic state first, persistTopicState is a no-op once the topic is deleted + err := s.persistTopicState(topic) + // Delete running topic + s.topics.DeleteTopic(topic) + s.closedTopics[topic] = true + return err +} + func (s *Service) DeleteTopic(topic string) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.closedTopics, topic) s.topics.DeleteTopic(topic) return s.topicsDAO.Delete(topic) } +func (s *Service) UpdateEvent(topic string, event alert.EventState) error { + s.topics.UpdateEvent(topic, event) + return s.persistTopicState(topic) +} + func (s *Service) RegisterHandler(topics []string, h alert.Handler) { s.topics.RegisterHandler(topics, h) } diff --git 
a/services/task_store/service.go b/services/task_store/service.go index b54ba7118..94e776406 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -1317,7 +1317,7 @@ func (ts *Service) deleteTask(id string) error { kapacitor.NumTasksVar.Add(-1) if task.Status == Enabled { kapacitor.NumEnabledTasksVar.Add(-1) - ts.stopTask(id) + ts.TaskMasterLookup.Main().DeleteTask(id) } return ts.tasks.Delete(id) } diff --git a/task_master.go b/task_master.go index 8f1cdad7b..f9bee600b 100644 --- a/task_master.go +++ b/task_master.go @@ -49,6 +49,8 @@ type UDFService interface { var ErrTaskMasterClosed = errors.New("TaskMaster is closed") var ErrTaskMasterOpen = errors.New("TaskMaster is open") +type deleteHook func(*TaskMaster) + // An execution framework for a set of tasks. type TaskMaster struct { // Unique id for this task master instance @@ -70,9 +72,12 @@ type TaskMaster struct { AlertService interface { EventState(topic, event string) (alert.EventState, bool) + UpdateEvent(topic string, event alert.EventState) error Collect(event alert.Event) error RegisterHandler(topics []string, h alert.Handler) DeregisterHandler(topics []string, h alert.Handler) + RestoreTopic(topic string) error + CloseTopic(topic string) error DeleteTopic(topic string) error } InfluxDBService interface { @@ -157,6 +162,9 @@ type TaskMaster struct { // Executing tasks tasks map[string]*ExecutingTask + // DeleteHooks for tasks + deleteHooks map[string][]deleteHook + logger *log.Logger closed bool @@ -180,6 +188,7 @@ func NewTaskMaster(id string, l LogService) *TaskMaster { taskToForkKeys: make(map[string][]forkKey), batches: make(map[string][]BatchCollector), tasks: make(map[string]*ExecutingTask), + deleteHooks: make(map[string][]deleteHook), LogService: l, logger: l.NewLogger(fmt.Sprintf("[task_master:%s] ", id), log.LstdFlags), closed: true, @@ -446,17 +455,30 @@ func (tm *TaskMaster) StopTask(id string) error { return tm.stopTask(id) } +func (tm *TaskMaster) 
DeleteTask(id string) error { + tm.mu.Lock() + defer tm.mu.Unlock() + if err := tm.stopTask(id); err != nil { + return err + } + tm.deleteTask(id) + return nil +} + // internal stopTask function. The caller must have acquired // the lock in order to call this function func (tm *TaskMaster) stopTask(id string) (err error) { if et, ok := tm.tasks[id]; ok { + delete(tm.tasks, id) + switch et.Task.Type { case StreamTask: tm.delFork(id) case BatchTask: delete(tm.batches, id) } + err = et.stop() if err != nil { tm.logger.Println("E! Stopped task:", id, err) @@ -467,6 +489,21 @@ func (tm *TaskMaster) stopTask(id string) (err error) { return } +// internal deleteTask function. It runs and then forgets the task's delete hooks. +// The caller must have acquired the lock in order to call this function +func (tm *TaskMaster) deleteTask(id string) { + for _, hook := range tm.deleteHooks[id] { + hook(tm) + } + delete(tm.deleteHooks, id) +} + +func (tm *TaskMaster) registerDeleteHookForTask(id string, hook deleteHook) { + tm.mu.Lock() + defer tm.mu.Unlock() + tm.deleteHooks[id] = append(tm.deleteHooks[id], hook) +} + func (tm *TaskMaster) IsExecuting(id string) bool { tm.mu.RLock() defer tm.mu.RUnlock() diff --git a/usr/share/bash-completion/completions/kapacitor b/usr/share/bash-completion/completions/kapacitor index 993181968..d7dc2bbca 100644 --- a/usr/share/bash-completion/completions/kapacitor +++ b/usr/share/bash-completion/completions/kapacitor @@ -165,6 +165,9 @@ _kapacitor() show-handler) words=$(_kapacitor_list handlers "$cur") ;; + show-topic) + words=$(_kapacitor_list topics "$cur") + ;; level) words='debug info warn error' ;; @@ -179,7 +182,7 @@ _kapacitor() ;; *) words='record define define-template define-handler replay replay-live enable disable \ - reload delete list show show-template show-handler level stats version vars service-tests help' + reload delete list show show-template show-handler show-topic level stats version vars service-tests help' ;; esac if [ -z "$COMPREPLY" ]