Skip to content

Commit

Permalink
Merge pull request influxdata#2192 from influxdata/fix/delete-group-s…
Browse files Browse the repository at this point in the history
…tats-emit

fix(edge): delete group stats on emit not collect
  • Loading branch information
nathanielc authored Apr 15, 2019
2 parents 919c3f3 + 377dde3 commit 9ba7bc6
Showing 1 changed file with 9 additions and 10 deletions.
19 changes: 9 additions & 10 deletions edge/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,8 @@ func (e *batchStatsEdge) Collect(m Message) error {
e.collected.Add(1)
begin := b.Begin()
e.incCollected(begin.GroupID(), begin.GroupInfo, int64(len(b.Points())))
case DeleteGroupMessage:
e.deleteGroup(b.GroupID())
default:
// Do not count other messages
// TODO(nathanielc): How should we count other messages?
}
return nil
}
Expand All @@ -197,9 +194,10 @@ func (e *batchStatsEdge) Emit() (m Message, ok bool) {
e.emitted.Add(1)
begin := b.Begin()
e.incEmitted(begin.GroupID(), begin.GroupInfo, int64(len(b.Points())))
case DeleteGroupMessage:
e.deleteGroup(b.GroupID())
default:
// Do not count other messages
// TODO(nathanielc): How should we count other messages?
}
}
return
Expand All @@ -221,21 +219,22 @@ func (e *streamStatsEdge) Collect(m Message) error {
case PointMessage:
e.collected.Add(1)
e.incCollected(m.GroupID(), m.GroupInfo, 1)
case DeleteGroupMessage:
e.deleteGroup(m.GroupID())
default:
// Do not count other messages
// TODO(nathanielc): How should we count other messages?
}
return nil
}

func (e *streamStatsEdge) Emit() (m Message, ok bool) {
m, ok = e.edge.Emit()
if ok && m.Type() == Point {
switch m := m.(type) {
case PointMessage:
e.emitted.Add(1)
p := m.(GroupInfoer)
e.incEmitted(p.GroupID(), p.GroupInfo, 1)
e.incEmitted(m.GroupID(), m.GroupInfo, 1)
case DeleteGroupMessage:
e.deleteGroup(m.GroupID())
default:
// Do not count other messages
}
return
}
Expand Down

0 comments on commit 9ba7bc6

Please sign in to comment.