英文 | 🇨🇳中文
ants
是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
- 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
- 优雅处理 panic,防止程序崩溃
- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
- 非阻塞机制
- 1.8.x
- 1.9.x
- 1.10.x
- 1.11.x
- 1.12.x
- 1.13.x
go get -u github.com/panjf2000/ants
go get -u github.com/panjf2000/ants/v2
写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 ants
,可以实例化一个 goroutine 池,复用 goroutine ,节省资源,提升性能:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
runTimes := 1000
// Use the common pool.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Submit tasks one by one.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
}
package main
import (
"io/ioutil"
"net/http"
"github.com/panjf2000/ants/v2"
)
type Request struct {
Param []byte
Result chan []byte
}
func main() {
pool, _ := ants.NewPoolWithFunc(100000, func(payload interface{}) {
request, ok := payload.(*Request)
if !ok {
return
}
reverseParam := func(s []byte) []byte {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
return s
}(request.Param)
request.Result <- reverseParam
})
defer pool.Release()
http.HandleFunc("/reverse", func(w http.ResponseWriter, r *http.Request) {
param, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "request error", http.StatusInternalServerError)
}
defer r.Body.Close()
request := &Request{Param: param, Result: make(chan []byte)}
// Throttle the requests traffic with ants pool. This process is asynchronous and
// you can receive a result from the channel defined outside.
if err := pool.Invoke(request); err != nil {
http.Error(w, "throttle limit error", http.StatusInternalServerError)
}
w.Write(<-request.Result)
})
http.ListenAndServe(":8080", nil)
}
// Option represents the optional function.
type Option func(opts *Options)
// Options contains all options which will be applied when instantiating a ants pool.
type Options struct {
// ExpiryDuration sets the expired time (second) of every worker.
ExpiryDuration time.Duration
// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
PreAlloc bool
// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int
// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
}
// WithOptions accepts the whole options config.
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
// WithExpiryDuration sets up the interval time of cleaning up goroutines.
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
// WithPreAlloc indicates whether it should malloc for workers.
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
// WithNonblocking indicates that pool will return nil when there is no available workers.
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
// WithPanicHandler sets up panic handler.
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
通过在调用NewPool
/NewPoolWithFunc
之时使用各种 optional function,可以设置ants.Options
中各个配置项的值,然后用它来定制化 goroutine pool.
ants
支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 NewPool
方法可以实例化一个新的带有指定容量的 Pool ,如下:
// Set 10000 the size of goroutine pool
p, _ := ants.NewPool(10000)
提交任务通过调用 ants.Submit(func())
方法:
ants.Submit(func(){})
需要动态调整 goroutine 池容量可以通过调用Tune(int)
:
pool.Tune(1000) // Tune its capacity to 1000
pool.Tune(100000) // Tune its capacity to 100000
该方法是线程安全的。
ants
允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高 goroutine 池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配。
// ants will pre-malloc the whole capacity of pool when you invoke this function
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
pool.Release()
// 只要调用 Reboot() 方法,就可以重新激活一个之前已经被销毁掉的池,并且投入使用。
pool.Reboot()
ants
并不保证提交的任务被执行的顺序,执行的顺序也不是和提交的顺序保持一致,因为在 ants
是并发地处理所有提交的任务,提交的任务会被分派到正在并发运行的 workers 上去,因此那些任务将会被并发且无序地被执行。
-
BenchmarkGoroutine-4 代表原生 goroutine
-
BenchmarkPoolGroutine-4 代表使用 goroutine 池
ants
这里为了模拟大规模 goroutine 的场景,两次测试的并发次数分别是 100w 和 1000w,前两个测试分别是执行 100w 个并发任务不使用 Pool 和使用了ants
的 Goroutine Pool 的性能,后两个则是 1000w 个任务下的表现,可以直观的看出在执行速度和内存使用上,ants
的 Pool 都占有明显的优势。100w 的任务量,使用ants
,执行速度与原生 goroutine 相当甚至略快,但只实际使用了不到 5w 个 goroutine 完成了全部任务,且内存消耗仅为原生并发的 40%;而当任务量达到 1000w,优势则更加明显了:用了 70w 左右的 goroutine 完成全部任务,执行速度比原生 goroutine 提高了 100%,且内存消耗依旧保持在不使用 Pool 的 40% 左右。
因为PoolWithFunc
这个 Pool 只绑定一个任务函数,也即所有任务都是运行同一个函数,所以相较于Pool
对原生 goroutine 在执行速度和内存消耗的优势更大,上面的结果可以看出,执行速度可以达到原生 goroutine 的 300%,而内存消耗的优势已经达到了两位数的差距,原生 goroutine 的内存消耗达到了ants
的35倍且原生 goroutine 的每次执行的内存分配次数也达到了ants
45倍,1000w 的任务量,ants
的初始分配容量是 5w,因此它完成了所有的任务依旧只使用了 5w 个 goroutine!事实上,ants
的 Goroutine Pool 的容量是可以自定义的,也就是说使用者可以根据不同场景对这个参数进行调优直至达到最高性能。
从该 demo 测试吞吐性能对比可以看出,使用ants
的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。
请在提 PR 之前仔细阅读 Contributing Guidelines,感谢那些为 ants
贡献过代码的开发者!
ants
的源码允许用户在遵循 MIT 开源证书 规则的前提下使用。
- Goroutine 并发调度模型深度解析之手撸一个高性能 goroutine 池
- Visually Understanding Worker Pool
- The Case For A Go Worker Pool
- Go Concurrency - GoRoutines, Worker Pools and Throttling Made Simple
欢迎在这里添加你的案例~~
ants
项目一直以来都是在 JetBrains 公司旗下的 GoLand 集成开发环境中进行开发,基于 free JetBrains Open Source license(s) 正版免费授权,在此表达我的谢意。
如果有意向,可以通过每个月定量的少许捐赠来支持这个项目。
每月定量捐赠 10 刀即可成为本项目的赞助者,届时您的 logo 或者 link 可以展示在本项目的 README 上。