Skip to content

Commit

Permalink
count number of events collected per topic
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 12, 2017
1 parent 7c56d8f commit ff8688d
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 19 deletions.
46 changes: 40 additions & 6 deletions alert/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path"
"sort"
"sync"
"sync/atomic"
)

const (
Expand Down Expand Up @@ -163,16 +164,19 @@ func (s *Topics) ReplaceHandler(oldTopics, newTopics []string, oldH, newH Handle

// TopicStatus returns the max alert level for each topic matching 'pattern', not returning
// any topics with max alert levels less severe than 'minLevel'
func (s *Topics) TopicStatus(pattern string, minLevel Level) map[string]Level {
func (s *Topics) TopicStatus(pattern string, minLevel Level) map[string]TopicStatus {
s.mu.RLock()
res := make(map[string]Level, len(s.topics))
res := make(map[string]TopicStatus, len(s.topics))
for _, topic := range s.topics {
if !match(pattern, topic.ID()) {
continue
}
level := topic.MaxLevel()
if level >= minLevel {
res[topic.ID()] = level
res[topic.ID()] = TopicStatus{
Level: level,
Collected: topic.Collected(),
}
}
}
s.mu.RUnlock()
Expand Down Expand Up @@ -216,6 +220,8 @@ type Topic struct {
events map[string]*EventState
sorted []*EventState

collected int64

handlers []*bufHandler
}

Expand All @@ -229,6 +235,12 @@ func (t *Topic) ID() string {
return t.id
}

func (t *Topic) Status() TopicStatus {
return TopicStatus{
Level: t.MaxLevel(),
Collected: t.Collected(),
}
}
func (t *Topic) MaxLevel() Level {
level := OK
t.mu.RLock()
Expand Down Expand Up @@ -315,7 +327,20 @@ func (t *Topic) close() {
}

func (t *Topic) handleEvent(event Event) error {
t.updateEvent(event.State)
prev, ok := t.updateEvent(event.State)
if ok {
event.previousState = prev
}

// Count collected event
for {
old := atomic.LoadInt64(&t.collected)
new := old + 1
if atomic.CompareAndSwapInt64(&t.collected, old, new) {
break
}
}

t.mu.RLock()
defer t.mu.RUnlock()

Expand All @@ -330,12 +355,17 @@ func (t *Topic) handleEvent(event Event) error {
if len(errs) != 0 {
return errs
}

return nil
}

func (t *Topic) Collected() int64 {
return atomic.LoadInt64(&t.collected)
}

// updateEvent will store the latest state for the given ID.
func (t *Topic) updateEvent(state EventState) {
var needSort bool
func (t *Topic) updateEvent(state EventState) (EventState, bool) {
var hasPrev, needSort bool
t.mu.Lock()
defer t.mu.Unlock()
cur := t.events[state.ID]
Expand All @@ -344,14 +374,18 @@ func (t *Topic) updateEvent(state EventState) {
cur = new(EventState)
t.events[state.ID] = cur
t.sorted = append(t.sorted, cur)
} else {
hasPrev = true
}
needSort = needSort || cur.Level != state.Level

prev := *cur
*cur = state

if needSort {
sort.Sort(sortedStates(t.sorted))
}
return prev, hasPrev
}

type sortedStates []*EventState
Expand Down
16 changes: 13 additions & 3 deletions alert/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ import (
)

type Event struct {
Topic string
State EventState
Data EventData
Topic string
State EventState
Data EventData
previousState EventState
}

func (e Event) PreviousState() EventState {
return e.previousState
}

func (e Event) TemplateData() TemplateData {
Expand Down Expand Up @@ -143,3 +148,8 @@ func ParseLevel(s string) (l Level, err error) {
err = l.UnmarshalText([]byte(strings.ToUpper(s)))
return
}

type TopicStatus struct {
Level Level
Collected int64
}
18 changes: 18 additions & 0 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,7 @@ type Topic struct {
Link Link `json:"link"`
ID string `json:"id"`
Level string `json:"level"`
Collected int64 `json:"collected"`
EventsLink Link `json:"events-link"`
HandlersLink Link `json:"handlers-link"`
}
Expand Down Expand Up @@ -1794,6 +1795,23 @@ func (c *Client) ListTopics(opt *ListTopicsOptions) (Topics, error) {
return topics, nil
}

func (c *Client) Topic(link Link) (Topic, error) {
var t Topic
if link.Href == "" {
return t, fmt.Errorf("invalid link %v", link)
}
u := *c.url
u.Path = link.Href

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return t, err
}

_, err = c.Do(req, &t, http.StatusOK)
return t, err
}

func (c *Client) DeleteTopic(link Link) error {
if link.Href == "" {
return fmt.Errorf("invalid link %v", link)
Expand Down
15 changes: 12 additions & 3 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,10 @@ func doShowTopic(args []string) error {
os.Exit(2)
}

topic, err := cli.Topic(cli.TopicLink(args[0]))
if err != nil {
return err
}
te, err := cli.ListTopicEvents(cli.TopicEventsLink(args[0]), nil)
if err != nil {
return err
Expand All @@ -1503,7 +1507,12 @@ func doShowTopic(args []string) error {
}

sort.Sort(topicEvents(te.Events))

outFmt := fmt.Sprintf("%%-%ds%%-9s%%-%ds%%-23s\n", maxEvent+1, maxMessage+1)
fmt.Println("ID:", topic.ID)
fmt.Println("Level:", topic.Level)
fmt.Println("Collected:", topic.Collected)
fmt.Println("Events:")
fmt.Printf(outFmt, "Event", "Level", "Message", "Date")
for _, e := range te.Events {
fmt.Printf(outFmt, e.ID, e.State.Level, e.State.Message, e.State.Time.Local().Format(time.RFC822))
Expand Down Expand Up @@ -1783,10 +1792,10 @@ func doList(args []string) error {
}
}
}
outFmt := fmt.Sprintf("%%-%dv%%-%dv\n", maxID+1, maxLevel+1)
fmt.Fprintf(os.Stdout, outFmt, "ID", "Level")
outFmt := fmt.Sprintf("%%-%dv%%-%dv%%10v\n", maxID+1, maxLevel+1)
fmt.Fprintf(os.Stdout, outFmt, "ID", "Level", "Collected")
for _, t := range allTopics {
fmt.Fprintf(os.Stdout, outFmt, t.ID, t.Level)
fmt.Fprintf(os.Stdout, outFmt, t.ID, t.Level, t.Collected)
}
default:
return fmt.Errorf("cannot list '%s' did you mean 'tasks', 'recordings', 'replays', 'topics', 'handlers' or 'service-tests'?", kind)
Expand Down
3 changes: 1 addition & 2 deletions services/alert/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,7 @@ func NewStateChangesOnlyHandler(c StateChangesOnlyHandlerConfig, l *log.Logger)
}

func (h *stateChangesOnlyHandler) Handle(event alert.Event) {
state, ok := h.topics.EventState(event.Topic, event.State.ID)
if !ok || (ok && state.Level != event.State.Level) {
if event.State.Level != event.PreviousState().Level {
h.next.Handle(event)
}
}
Expand Down
11 changes: 6 additions & 5 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,18 +367,19 @@ func (s *Service) topicHandlerLink(topic, handler string) client.Link {
return client.Link{Relation: client.Self, Href: path.Join(topicsBasePath, topic, topicHandlersPath, handler)}
}

func (s *Service) createClientTopic(topic string, level alert.Level) client.Topic {
func (s *Service) createClientTopic(topic string, status alert.TopicStatus) client.Topic {
return client.Topic{
ID: topic,
Link: s.topicLink(topic),
Level: level.String(),
Level: status.Level.String(),
Collected: status.Collected,
EventsLink: s.topicEventsLink(topic, eventsRelation),
HandlersLink: s.topicHandlersLink(topic, handlersRelation),
}
}

func (s *Service) handleTopic(t *alert.Topic, w http.ResponseWriter, r *http.Request) {
topic := s.createClientTopic(t.ID(), t.MaxLevel())
topic := s.createClientTopic(t.ID(), t.Status())

w.WriteHeader(http.StatusOK)
w.Write(httpd.MarshalJSON(topic, true))
Expand Down Expand Up @@ -878,8 +879,8 @@ func (s sortedTopics) Swap(i int, j int) { s[i], s[j] = s[j], s[i] }
func (s *Service) TopicStatus(pattern string, minLevel alert.Level) []client.Topic {
statuses := s.topics.TopicStatus(pattern, minLevel)
topics := make([]client.Topic, 0, len(statuses))
for topic, level := range statuses {
topics = append(topics, s.createClientTopic(topic, level))
for topic, status := range statuses {
topics = append(topics, s.createClientTopic(topic, status))
}
sort.Sort(sortedTopics(topics))
return topics
Expand Down

0 comments on commit ff8688d

Please sign in to comment.