Skip to content

Commit

Permalink
feat: merge pr
Browse files Browse the repository at this point in the history
  • Loading branch information
userpro committed Oct 27, 2022
1 parent a9aaf85 commit 555c370
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
18 changes: 17 additions & 1 deletion timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (tw *TimeWheel) Start() {
tw.onceStart.Do(
func() {
tw.ticker = time.NewTicker(tw.tick)
tw.exited = false
go tw.schduler()
go tw.tickGenerator()
},
Expand Down Expand Up @@ -237,7 +238,7 @@ func (tw *TimeWheel) AddCron(delay time.Duration, callback func()) *Task {
}

func (tw *TimeWheel) addAny(delay time.Duration, callback func(), circle, async bool) *Task {
if delay <= 0 {
if delay < tw.tick {
delay = tw.tick
}

Expand Down Expand Up @@ -465,6 +466,21 @@ type Ticker struct {
Ctx context.Context
}

func (t *Ticker) Reset(delay time.Duration) {
// first stop old task
t.task.stop = true

// make new task
t.task = t.tw.addAny(
delay,
func() {
notfiyChannel(t.C)
},
modeIsCircle,
modeNotAsync,
)
}

func (t *Ticker) Stop() {
t.task.stop = true
t.cancel()
Expand Down
65 changes: 65 additions & 0 deletions timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,32 @@ func TestTickerSecond(t *testing.T) {
assert.Greater(t, incr, 100)
}

func TestTickerSecondLess(t *testing.T) {
tw, err := NewTimeWheel(10*time.Millisecond, 10000)
assert.Nil(t, err)

tw.Start()
defer tw.Stop()

var (
timeout = time.After(1100 * time.Millisecond)
ticker = tw.NewTicker(9 * time.Millisecond)
incr int
)

for run := true; run; {
select {
case <-timeout:
run = false

case <-ticker.C:
incr++
}
}

assert.Greater(t, incr, 100)
}

func TestBatchTicker(t *testing.T) {
tw, err := NewTimeWheel(100*time.Millisecond, 60)
assert.Nil(t, err)
Expand Down Expand Up @@ -337,6 +363,45 @@ func TestTimerReset(t *testing.T) {
}
}

func TestTickerReset(t *testing.T) {
tw, _ := NewTimeWheel(100*time.Millisecond, 50)
tw.Start()
defer tw.Stop()

ticker := tw.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for index := 1; index < 6; index++ {
now := time.Now()
<-ticker.C

checkTimeCost(t, now, time.Now(), 80, 220)
fmt.Println(time.Since(now).String())
}

fmt.Println()
ticker.Reset(300 * time.Millisecond)

for index := 1; index < 6; index++ {
now := time.Now()
<-ticker.C

checkTimeCost(t, now, time.Now(), 280, 420)
fmt.Println(time.Since(now).String())
}

fmt.Println()
ticker.Reset(200 * time.Millisecond)

for index := 1; index < 6; index++ {
now := time.Now()
<-ticker.C

checkTimeCost(t, now, time.Now(), 180, 320)
fmt.Println(time.Since(now).String())
}
}

func TestRemove(t *testing.T) {
tw, _ := NewTimeWheel(100*time.Millisecond, 5)
tw.Start()
Expand Down
2 changes: 1 addition & 1 deletion timewheel_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewTimeWheelPool(size int, tick time.Duration, bucketsNum int, options ...o
size: int64(size),
}

for index := 0; index < bucketsNum; index++ {
for index := 0; index < size; index++ {
tw, err := NewTimeWheel(tick, bucketsNum, options...)
if err != nil {
return twp, err
Expand Down

0 comments on commit 555c370

Please sign in to comment.