From 165d255f02558121cfbe6b3c27897613e28d79e1 Mon Sep 17 00:00:00 2001 From: Min Pae Date: Mon, 30 Oct 2017 15:55:34 -0700 Subject: [PATCH] implement changes from feedback - changing stopC to interface{} channel - removing extraneous idle>0 check from emitBarrier --- barrier.go | 10 ++++------ integrations/streamer_test.go | 4 ++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/barrier.go b/barrier.go index a58b24b77..33f9327fe 100644 --- a/barrier.go +++ b/barrier.go @@ -89,7 +89,7 @@ type idleBarrier struct { timer *time.Timer wg sync.WaitGroup outs []edge.StatsEdge - stopC chan bool + stopC chan interface{} } func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs []edge.StatsEdge) *idleBarrier { @@ -101,7 +101,7 @@ func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs timer: time.NewTimer(idle), wg: sync.WaitGroup{}, outs: outs, - stopC: make(chan bool, 1), + stopC: make(chan interface{}, 1), } r.Init() @@ -117,7 +117,7 @@ func (n *idleBarrier) Init() { } func (n *idleBarrier) Stop() { - n.stopC <- true + close(n.stopC) n.timer.Stop() n.wg.Wait() } @@ -158,9 +158,7 @@ func (n *idleBarrier) Point(m edge.PointMessage) (edge.Message, error) { } func (n *idleBarrier) resetTimer() { - if n.idle > 0 { - n.timer.Reset(n.idle) - } + n.timer.Reset(n.idle) } func (n *idleBarrier) emitBarrier() error { diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 938ea906d..fac2d22bd 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -1513,7 +1513,7 @@ stream } }() - // TODO: may have to update if groupedconsumer adds default group or some support for when there are no messages + // groupedconsumers do not run any logic until at least one message is seen data := make([]edge.PointMessage, len(testValues)) for i, testValue := range testValues { data[i] = edge.NewPointMessage( @@ -1924,7 +1924,7 @@ stream } }() - // TODO: may have to update if groupedconsumer adds default group or some support for when there are no messages + // groupedconsumers do not run any logic until at least one message is seen data := make([]edge.PointMessage, len(testValues)) for i, testValue := range testValues { data[i] = edge.NewPointMessage(