Skip to content

Commit

Permalink
fix(edge): delete group stats whena group is deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 9, 2019
1 parent 5d7471c commit 7723b21
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- [#2167](https://github.com/influxdata/kapacitor/pull/2167): Use default transport consistently.
- [#2144](https://github.com/influxdata/kapacitor/issues/2144): Fix deadlock in barrier node when delete is used.
- [#2186](https://github.com/influxdata/kapacitor/pull/2186): Make RPM create files with correct ownership on install.
- [#2189](https://github.com/influxdata/kapacitor/pull/2189): Delete group stats when a group is deleted

## v1.5.2 [2018-12-12]

Expand Down
21 changes: 18 additions & 3 deletions edge/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ func (e *statsEdge) incEmitted(group models.GroupID, infoF func() GroupInfo, cou
e.mu.Unlock()
}

// deleteGroup removes a group's stats
func (e *statsEdge) deleteGroup(group models.GroupID) {
// Manually unlock below as defer was too much of a performance hit
e.mu.Lock()
delete(e.groupStats, group)
e.mu.Unlock()
}

type batchStatsEdge struct {
statsEdge

Expand Down Expand Up @@ -159,6 +167,8 @@ func (e *batchStatsEdge) Collect(m Message) error {
e.collected.Add(1)
begin := b.Begin()
e.incCollected(begin.GroupID(), begin.GroupInfo, int64(len(b.Points())))
case DeleteGroupMessage:
e.deleteGroup(b.GroupID())
default:
// Do not count other messages
// TODO(nathanielc): How should we count other messages?
Expand Down Expand Up @@ -207,10 +217,15 @@ func (e *streamStatsEdge) Collect(m Message) error {
if err := e.edge.Collect(m); err != nil {
return err
}
if m.Type() == Point {
switch m := m.(type) {
case PointMessage:
e.collected.Add(1)
p := m.(GroupInfoer)
e.incCollected(p.GroupID(), p.GroupInfo, 1)
e.incCollected(m.GroupID(), m.GroupInfo, 1)
case DeleteGroupMessage:
e.deleteGroup(m.GroupID())
default:
// Do not count other messages
// TODO(nathanielc): How should we count other messages?
}
return nil
}
Expand Down

0 comments on commit 7723b21

Please sign in to comment.