Skip to content

Commit

Permalink
using channel to signal timer reset
Browse files Browse the repository at this point in the history
  • Loading branch information
sputnik13 committed Nov 28, 2017
1 parent 04aafb7 commit b511631
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,26 @@ type idleBarrier struct {
name string
group edge.GroupInfo

idle time.Duration
lastT atomic.Value
timer *time.Timer
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan struct{}
idle time.Duration
lastT atomic.Value
timer *time.Timer
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan struct{}
resetTimerC chan struct{}
}

func newIdleBarrier(name string, group edge.GroupInfo, idle time.Duration, outs []edge.StatsEdge) *idleBarrier {
r := &idleBarrier{
name: name,
group: group,
idle: idle,
lastT: atomic.Value{},
timer: time.NewTimer(idle),
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan struct{}, 1),
name: name,
group: group,
idle: idle,
lastT: atomic.Value{},
timer: time.NewTimer(idle),
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan struct{}),
resetTimerC: make(chan struct{}),
}

r.Init()
Expand Down Expand Up @@ -158,8 +160,7 @@ func (n *idleBarrier) Point(m edge.PointMessage) (edge.Message, error) {
}

func (n *idleBarrier) resetTimer() {
n.timer.Stop()
n.timer.Reset(n.idle)
n.resetTimerC <- struct{}{}
}

func (n *idleBarrier) emitBarrier() error {
Expand All @@ -172,9 +173,14 @@ func (n *idleBarrier) idleHandler() {
defer n.wg.Done()
for {
select {
case <-n.resetTimerC:
if !n.timer.Stop() {
<-n.timer.C
}
n.timer.Reset(n.idle)
case <-n.timer.C:
n.emitBarrier()
n.resetTimer()
n.timer.Reset(n.idle)
case <-n.stopC:
return
}
Expand All @@ -189,7 +195,7 @@ type periodicBarrier struct {
ticker *time.Ticker
wg sync.WaitGroup
outs []edge.StatsEdge
stopC chan bool
stopC chan struct{}
}

func newPeriodicBarrier(name string, group edge.GroupInfo, period time.Duration, outs []edge.StatsEdge) *periodicBarrier {
Expand All @@ -200,7 +206,7 @@ func newPeriodicBarrier(name string, group edge.GroupInfo, period time.Duration,
ticker: time.NewTicker(period),
wg: sync.WaitGroup{},
outs: outs,
stopC: make(chan bool, 1),
stopC: make(chan struct{}),
}

r.Init()
Expand All @@ -216,7 +222,7 @@ func (n *periodicBarrier) Init() {
}

func (n *periodicBarrier) Stop() {
n.stopC <- true
close(n.stopC)
n.ticker.Stop()
n.wg.Wait()
}
Expand Down

0 comments on commit b511631

Please sign in to comment.