Skip to content

Commit

Permalink
add sync.Pool, reduce gc cost
Browse files Browse the repository at this point in the history
  • Loading branch information
rfyiamcool committed Sep 29, 2019
1 parent f65716f commit db1c5e4
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 36 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ tw.Start()
tw.Stop()
```

safe ticker

```
tw, _ := NewTimeWheel(1 * time.Second, 360, TickSafeMode())
```

use sync.Pool

```
tw, _ := NewTimeWheel(1 * time.Second, 360, SetSyncPool(true))
```

add delay task

```
Expand Down
35 changes: 19 additions & 16 deletions example/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ func main() {
count := 500000
queue := make(chan bool, count)

start := time.Now()
for index := 0; index < count; index++ {
tw.Add(time.Duration(1*time.Second), func() {
queue <- true
})
}
fmt.Println("add timer cost: ", time.Since(start))

start = time.Now()
incr := 0
for {
if incr == count {
fmt.Println("recv sig cost: ", time.Since(start))
return
// loop 3
for index := 0; index < 3; index++ {
start := time.Now()
for index := 0; index < count; index++ {
tw.Add(time.Duration(1*time.Second), func() {
queue <- true
})
}
fmt.Println("add timer cost: ", time.Since(start))

<-queue
incr++
start = time.Now()
incr := 0
for {
if incr == count {
fmt.Println("recv sig cost: ", time.Since(start))
break
}

<-queue
incr++
}
}
}
34 changes: 34 additions & 0 deletions task_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package timewheel

import (
"sync"
)

var incr = 0

var (
defaultTaskPool = newTaskPool()
)

type taskPool struct {
bp *sync.Pool
}

func newTaskPool() *taskPool {
return &taskPool{
bp: &sync.Pool{
New: func() interface{} {
return &Task{}
},
},
}
}

func (pool *taskPool) get() *Task {
return pool.bp.Get().(*Task)
}

func (pool *taskPool) put(obj *Task) {
obj.Reset()
pool.bp.Put(obj)
}
59 changes: 42 additions & 17 deletions timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ type Task struct {
// circleNum int
}

// for sync.Pool
func (t *Task) Reset() {
t.round = 0
t.callback = nil

t.async = false
t.stop = false
t.circle = false
}

type optionCall func(*TimeWheel) error

func TickSafeMode() optionCall {
Expand All @@ -43,6 +53,13 @@ func TickSafeMode() optionCall {
}
}

func SetSyncPool(state bool) optionCall {
return func(o *TimeWheel) error {
o.syncPool = state
return nil
}
}

type TimeWheel struct {
randomID int64

Expand All @@ -59,10 +76,11 @@ type TimeWheel struct {
onceStart sync.Once

addC chan *Task
removeC chan taskID
removeC chan *Task
stopC chan struct{}

exited bool
exited bool
syncPool bool
}

// NewTimeWheel create new time wheel
Expand All @@ -87,7 +105,7 @@ func NewTimeWheel(tick time.Duration, bucketsNum int, options ...optionCall) (*T

// signal
addC: make(chan *Task, 1024*5),
removeC: make(chan taskID, 1024*2),
removeC: make(chan *Task, 1024*2),
stopC: make(chan struct{}),
}

Expand Down Expand Up @@ -159,8 +177,12 @@ func (tw *TimeWheel) Stop() {
}

func (tw *TimeWheel) collectTask(task *Task) {
delete(tw.buckets[tw.currentIndex], task.id)
delete(tw.bucketIndexes, task.id)
delete(tw.buckets[tw.currentIndex], task.id)

if tw.syncPool {
defaultTaskPool.put(task)
}
}

func (tw *TimeWheel) handleTick() {
Expand Down Expand Up @@ -217,13 +239,20 @@ func (tw *TimeWheel) addAny(delay time.Duration, callback func(), circle, async
}

id := tw.genUniqueID()
task := &Task{
delay: delay,
id: id,
callback: callback,
circle: circle,
async: async, // refer to src/runtime/time.go

var task *Task
if tw.syncPool {
task = defaultTaskPool.get()
} else {
task = new(Task)
}

task.delay = delay
task.id = id
task.callback = callback
task.circle = circle
task.async = async // refer to src/runtime/time.go

tw.addC <- task
return task
}
Expand Down Expand Up @@ -265,16 +294,12 @@ func (tw *TimeWheel) calculateIndex(delay time.Duration) (index int) {
}

func (tw *TimeWheel) Remove(task *Task) error {
tw.removeC <- task.id
tw.removeC <- task
return nil
}

func (tw *TimeWheel) remove(id taskID) {
if index, ok := tw.bucketIndexes[id]; ok {
delete(tw.bucketIndexes, id)
delete(tw.buckets[index], id)
}
return
func (tw *TimeWheel) remove(task *Task) {
tw.collectTask(task)
}

func (tw *TimeWheel) NewTimer(delay time.Duration) *Timer {
Expand Down
10 changes: 7 additions & 3 deletions timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestCalcPos(t *testing.T) {
}

func TestAddFunc(t *testing.T) {
tw, _ := NewTimeWheel(100*time.Millisecond, 5, TickSafeMode())
tw, _ := NewTimeWheel(100*time.Millisecond, 5, TickSafeMode(), SetSyncPool(true))
tw.Start()
defer tw.Stop()

Expand Down Expand Up @@ -269,10 +269,14 @@ func TestRemove(t *testing.T) {
defer tw.Stop()

queue := make(chan bool, 0)
task := tw.Add(time.Millisecond*200, func() {
task := tw.Add(time.Millisecond*500, func() {
queue <- true
})
tw.Remove(task)

// remove action after add action
time.AfterFunc(time.Millisecond*10, func() {
tw.Remove(task)
})

exitTimer := time.NewTimer(1 * time.Second)
select {
Expand Down

0 comments on commit db1c5e4

Please sign in to comment.