Skip to content

Commit

Permalink
implement changes from feedback
Browse files Browse the repository at this point in the history
- changing stopC to interface{} channel
- removing extraneous idle>0 check from emitBarrier
  • Loading branch information
sputnik13 committed Nov 28, 2017
1 parent 8fef276 commit 165d255
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 8 deletions.
10 changes: 4 additions & 6 deletions barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type idleBarrier struct {
timer *time.Timer
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan bool
stopC chan interface{}
}

func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs []edge.StatsEdge) *idleBarrier {
Expand All @@ -101,7 +101,7 @@ func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs
timer: time.NewTimer(idle),
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan bool, 1),
stopC: make(chan interface{}, 1),
}

r.Init()
Expand All @@ -117,7 +117,7 @@ func (n *idleBarrier) Init() {
}

func (n *idleBarrier) Stop() {
n.stopC <- true
close(n.stopC)
n.timer.Stop()
n.wg.Wait()
}
Expand Down Expand Up @@ -158,9 +158,7 @@ func (n *idleBarrier) Point(m edge.PointMessage) (edge.Message, error) {
}

func (n *idleBarrier) resetTimer() {
if n.idle > 0 {
n.timer.Reset(n.idle)
}
n.timer.Reset(n.idle)
}

func (n *idleBarrier) emitBarrier() error {
Expand Down
4 changes: 2 additions & 2 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,7 +1513,7 @@ stream
}
}()

// TODO: may have to update if groupedconsumer adds default group or some support for when there are no messages
// groupedconsumers do not run any logic until at least one message is seen
data := make([]edge.PointMessage, len(testValues))
for i, testValue := range testValues {
data[i] = edge.NewPointMessage(
Expand Down Expand Up @@ -1924,7 +1924,7 @@ stream
}
}()

// TODO: may have to update if groupedconsumer adds default group or some support for when there are no messages
// groupedconsumers do not run any logic until at least one message is seen
data := make([]edge.PointMessage, len(testValues))
for i, testValue := range testValues {
data[i] = edge.NewPointMessage(
Expand Down

0 comments on commit 165d255

Please sign in to comment.