diff --git a/go.mod b/go.mod
index 273092c1..e22d62c4 100644
--- a/go.mod
+++ b/go.mod
@@ -31,7 +31,7 @@ require (
github.com/nlopes/slack v0.6.0
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0
- github.com/panjf2000/ants/v2 v2.10.0
+ github.com/panjf2000/ants/v2 v2.11.0
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.7.0
github.com/rabbitmq/amqp091-go v1.10.0
diff --git a/go.sum b/go.sum
index 2061ee03..c474ff3a 100644
--- a/go.sum
+++ b/go.sum
@@ -266,8 +266,8 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk=
github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
-github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1p+8=
-github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
+github.com/panjf2000/ants/v2 v2.11.0 h1:sHrqEwTBQTQ2w6PMvbMfvBtVUuhsaYPzUmAYDLYmJPg=
+github.com/panjf2000/ants/v2 v2.11.0/go.mod h1:V9HhTupTWxcaRmIglJvGwvzqXUTnIZW9uO6q4hAfApw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -310,14 +310,12 @@ github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf h1:pvbZ0lM0XWPBqUKqFU8cma
github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf/go.mod h1:RJID2RhlZKId02nZ62WenDCkgHFerpIOmW0iT7GKmXM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
-github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
@@ -393,7 +391,6 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/vendor/github.com/panjf2000/ants/v2/README.md b/vendor/github.com/panjf2000/ants/v2/README.md
index bdf7487b..fc86e129 100644
--- a/vendor/github.com/panjf2000/ants/v2/README.md
+++ b/vendor/github.com/panjf2000/ants/v2/README.md
@@ -22,10 +23,11 @@ Library `ants` implements a goroutine pool with fixed capacity, managing and rec
- Managing and recycling a massive number of goroutines automatically
- Purging overdue goroutines periodically
-- Abundant APIs: submitting tasks, getting the number of running goroutines, tuning the capacity of the pool dynamically, releasing the pool, rebooting the pool
+- Abundant APIs: submitting tasks, getting the number of running goroutines, tuning the capacity of the pool dynamically, releasing the pool, rebooting the pool, etc.
- Handle panic gracefully to prevent programs from crash
-- Efficient in memory usage and it even achieves [higher performance](#-performance-summary) than unlimited goroutines in Golang
+- Efficient in memory usage and it may even achieve ***higher performance*** than unlimited goroutines in Go
- Nonblocking mechanism
+- Preallocated memory (ring buffer, optional)
## 💡 How `ants` works
@@ -60,205 +62,30 @@ go get -u github.com/panjf2000/ants/v2
```
## 🛠 How to use
-Just imagine that your program starts a massive number of goroutines, resulting in a huge consumption of memory. To mitigate that kind of situation, all you need to do is to import `ants` package and submit all your tasks to a default pool with fixed capacity, activated when package `ants` is imported:
+Check out [the examples](https://pkg.go.dev/github.com/panjf2000/ants/v2#pkg-examples) for basic usage.
-``` go
-package main
+### Functional options for pool
-import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
+`ants.Options` contains all optional configurations of the ants pool, which allows you to customize the goroutine pool by invoking option functions to set up each configuration in the `NewPool`/`NewPoolWithFunc`/`NewPoolWithFuncGeneric` methods.
- "github.com/panjf2000/ants/v2"
-)
+Check out [ants.Options](https://pkg.go.dev/github.com/panjf2000/ants/v2#Options) and [ants.Option](https://pkg.go.dev/github.com/panjf2000/ants/v2#Option) for more details.
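As a quick illustration (not part of the upstream README), here is a minimal sketch of passing a few of these options to `NewPool`. The option names match the vendored `options.go` in this update; the concrete values, logger, and panic handler are only examples.

```go
package main

import (
	"log"
	"os"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// Build a pool with a handful of the documented options; values are illustrative.
	p, err := ants.NewPool(
		1000,
		ants.WithExpiryDuration(10*time.Second), // how often the scavenger purges idle workers
		ants.WithNonblocking(true),              // Submit returns ErrPoolOverload instead of blocking
		ants.WithPanicHandler(func(v any) { log.Printf("task panicked: %v", v) }),
		ants.WithLogger(log.New(os.Stderr, "[pool] ", log.LstdFlags)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Release()

	var wg sync.WaitGroup
	wg.Add(1)
	_ = p.Submit(func() {
		defer wg.Done()
		log.Println("hello from the pool")
	})
	wg.Wait()
}
```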
-var sum int32
+### Customize pool capacity
-func myFunc(i interface{}) {
- n := i.(int32)
- atomic.AddInt32(&sum, n)
- fmt.Printf("run with %d\n", n)
-}
-
-func demoFunc() {
- time.Sleep(10 * time.Millisecond)
- fmt.Println("Hello World!")
-}
-
-func main() {
- defer ants.Release()
-
- runTimes := 1000
-
- // Use the common pool.
- var wg sync.WaitGroup
- syncCalculateSum := func() {
- demoFunc()
- wg.Done()
- }
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- _ = ants.Submit(syncCalculateSum)
- }
- wg.Wait()
- fmt.Printf("running goroutines: %d\n", ants.Running())
- fmt.Printf("finish all tasks.\n")
-
- // Use the pool with a function,
- // set 10 to the capacity of goroutine pool and 1 second for expired duration.
- p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
- myFunc(i)
- wg.Done()
- })
- defer p.Release()
- // Submit tasks one by one.
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- _ = p.Invoke(int32(i))
- }
- wg.Wait()
- fmt.Printf("running goroutines: %d\n", p.Running())
- fmt.Printf("finish all tasks, result is %d\n", sum)
- if sum != 499500 {
- panic("the final result is wrong!!!")
- }
-
- // Use the MultiPool and set the capacity of the 10 goroutine pools to unlimited.
- // If you use -1 as the pool size parameter, the size will be unlimited.
- // There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks.
- mp, _ := ants.NewMultiPool(10, -1, ants.RoundRobin)
- defer mp.ReleaseTimeout(5 * time.Second)
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- _ = mp.Submit(syncCalculateSum)
- }
- wg.Wait()
- fmt.Printf("running goroutines: %d\n", mp.Running())
- fmt.Printf("finish all tasks.\n")
-
- // Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
- mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
- myFunc(i)
- wg.Done()
- }, ants.LeastTasks)
- defer mpf.ReleaseTimeout(5 * time.Second)
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- _ = mpf.Invoke(int32(i))
- }
- wg.Wait()
- fmt.Printf("running goroutines: %d\n", mpf.Running())
- fmt.Printf("finish all tasks, result is %d\n", sum)
- if sum != 499500*2 {
- panic("the final result is wrong!!!")
- }
-}
-```
-
-### Functional options for ants pool
-
-```go
-// Option represents the optional function.
-type Option func(opts *Options)
-
-// Options contains all options which will be applied when instantiating a ants pool.
-type Options struct {
- // ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
- // the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
- // used for more than `ExpiryDuration`.
- ExpiryDuration time.Duration
-
- // PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
- PreAlloc bool
-
- // Max number of goroutine blocking on pool.Submit.
- // 0 (default value) means no such limit.
- MaxBlockingTasks int
-
- // When Nonblocking is true, Pool.Submit will never be blocked.
- // ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
- // When Nonblocking is true, MaxBlockingTasks is inoperative.
- Nonblocking bool
-
- // PanicHandler is used to handle panics from each worker goroutine.
- // if nil, panics will be thrown out again from worker goroutines.
- PanicHandler func(interface{})
-
- // Logger is the customized logger for logging info, if it is not set,
- // default standard logger from log package is used.
- Logger Logger
-}
-
-// WithOptions accepts the whole options config.
-func WithOptions(options Options) Option {
- return func(opts *Options) {
- *opts = options
- }
-}
-
-// WithExpiryDuration sets up the interval time of cleaning up goroutines.
-func WithExpiryDuration(expiryDuration time.Duration) Option {
- return func(opts *Options) {
- opts.ExpiryDuration = expiryDuration
- }
-}
-
-// WithPreAlloc indicates whether it should malloc for workers.
-func WithPreAlloc(preAlloc bool) Option {
- return func(opts *Options) {
- opts.PreAlloc = preAlloc
- }
-}
-
-// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
-func WithMaxBlockingTasks(maxBlockingTasks int) Option {
- return func(opts *Options) {
- opts.MaxBlockingTasks = maxBlockingTasks
- }
-}
-
-// WithNonblocking indicates that pool will return nil when there is no available workers.
-func WithNonblocking(nonblocking bool) Option {
- return func(opts *Options) {
- opts.Nonblocking = nonblocking
- }
-}
-
-// WithPanicHandler sets up panic handler.
-func WithPanicHandler(panicHandler func(interface{})) Option {
- return func(opts *Options) {
- opts.PanicHandler = panicHandler
- }
-}
-
-// WithLogger sets up a customized logger.
-func WithLogger(logger Logger) Option {
- return func(opts *Options) {
- opts.Logger = logger
- }
-}
-```
-
-`ants.Options`contains all optional configurations of the ants pool, which allows you to customize the goroutine pool by invoking option functions to set up each configuration in `NewPool`/`NewPoolWithFunc`method.
-
-### Customize limited pool
-
-`ants` also supports customizing the capacity of the pool. You can invoke the `NewPool` method to instantiate a pool with a given capacity, as follows:
+`ants` supports customizing the capacity of the pool. You can call the `NewPool` method to instantiate a `Pool` with a given capacity, as follows:
``` go
p, _ := ants.NewPool(10000)
```
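The long inline example removed above also demonstrated the fixed-capacity, single-function variant. A condensed, hedged sketch of that pattern against this vendored version (`NewPoolWithFunc` plus `Invoke`):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var sum int32
	var wg sync.WaitGroup

	// Every worker in this pool runs the same function; Invoke only passes the argument.
	p, err := ants.NewPoolWithFunc(10, func(arg any) {
		defer wg.Done()
		atomic.AddInt32(&sum, arg.(int32))
	})
	if err != nil {
		panic(err)
	}
	defer p.Release()

	for i := int32(0); i < 100; i++ {
		wg.Add(1)
		_ = p.Invoke(i)
	}
	wg.Wait()
	fmt.Println("sum:", sum) // 0+1+...+99 = 4950
}
```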
### Submit tasks
-Tasks can be submitted by calling `ants.Submit(func())`
+Tasks can be submitted by calling `ants.Submit`
```go
ants.Submit(func(){})
```
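For context, `ants.Submit` feeds a package-level default pool that is created when the package is imported (see `defaultAntsPool` in the vendored `ants.go`). A minimal sketch of submitting tasks and waiting for them:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	defer ants.Release() // release the package-level default pool

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		_ = ants.Submit(func() {
			defer wg.Done()
			fmt.Println("task executed by the default pool")
		})
	}
	wg.Wait()
	fmt.Println("workers still running:", ants.Running())
}
```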
-### Tune pool capacity in runtime
-You can tune the capacity of `ants` pool in runtime with `Tune(int)`:
+### Tune pool capacity at runtime
+You can tune the capacity of `ants` pool at runtime with `ants.Tune`:
``` go
pool.Tune(1000) // Tune its capacity to 1000
@@ -272,20 +99,26 @@ Don't worry about the contention problems in this case, the method here is threa
`ants` allows you to pre-allocate the memory of the goroutine queue in the pool, which may get a performance enhancement under some special certain circumstances such as the scenario that requires a pool with ultra-large capacity, meanwhile, each task in goroutine lasts for a long time, in this case, pre-mallocing will reduce a lot of memory allocation in goroutine queue.
```go
-// ants will pre-malloc the whole capacity of pool when you invoke this method
+// ants will pre-malloc the whole capacity of pool when calling ants.NewPool.
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
```
-### Release Pool
+### Release pool
```go
pool.Release()
```
-### Reboot Pool
+or
```go
-// A pool that has been released can be still used once you invoke the Reboot().
+pool.ReleaseTimeout(time.Second * 3)
+```
+
+### Reboot pool
+
+```go
+// A pool that has been released can still be used after calling Reboot().
pool.Reboot()
```
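A short sketch of the release/reboot cycle. Per the doc comment on `Reboot` in the vendored `ants.go`, `ReleaseTimeout` is preferable to `Release` before rebooting so that all workers have actually exited; the timeout values here are illustrative.

```go
package main

import (
	"log"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	p, err := ants.NewPool(100)
	if err != nil {
		log.Fatal(err)
	}

	_ = p.Submit(func() { time.Sleep(100 * time.Millisecond) })

	// Close the pool and wait (up to 3s) for in-flight workers to finish.
	if err := p.ReleaseTimeout(3 * time.Second); err != nil {
		log.Printf("release: %v", err)
	}

	// A released pool can be put back into service.
	p.Reboot()
	_ = p.Submit(func() { log.Println("running again after Reboot") })
	_ = p.ReleaseTimeout(3 * time.Second)
}
```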
@@ -314,20 +147,20 @@ The source code in `ants` is available under the [MIT License](/LICENSE).
## 🖥 Use cases
-### business companies
+### business corporations
-The following companies/organizations use `ants` in production.
+Trusted by the following corporations/organizations.
[The remaining hunks in this file only rework the HTML logo table under "Use cases"; the image markup is not reproduced here.]
diff --git a/vendor/github.com/panjf2000/ants/v2/README_ZH.md b/vendor/github.com/panjf2000/ants/v2/README_ZH.md
index fa041cba..e0c323c1 100644
--- a/vendor/github.com/panjf2000/ants/v2/README_ZH.md
+++ b/vendor/github.com/panjf2000/ants/v2/README_ZH.md
@@ -16,16 +17,17 @@
## 📖 简介
-`ants`是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
+`ants` 是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
## 🚀 功能:
- 自动调度海量的 goroutines,复用 goroutines
- 定期清理过期的 goroutines,进一步节省资源
-- 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
+- 提供了大量实用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool 等
- 优雅处理 panic,防止程序崩溃
-- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有[更高的性能](#-性能小结)
+- 资源复用,极大节省内存使用量;在大规模批量并发任务场景下甚至可能比 Go 语言的无限制 goroutine 并发具有***更高的性能***
- 非阻塞机制
+- 预分配内存 (环形队列,可选)
## 💡 `ants` 是如何运行的
@@ -60,192 +62,17 @@ go get -u github.com/panjf2000/ants/v2
```
## 🛠 使用
-写 go 并发程序的时候如果程序会启动大量的 goroutine ,势必会消耗大量的系统资源(内存,CPU),通过使用 `ants`,可以实例化一个 goroutine 池,复用 goroutine ,节省资源,提升性能:
-
-``` go
-package main
-
-import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/panjf2000/ants/v2"
-)
-
-var sum int32
-
-func myFunc(i interface{}) {
- n := i.(int32)
- atomic.AddInt32(&sum, n)
- fmt.Printf("run with %d\n", n)
-}
-
-func demoFunc() {
- time.Sleep(10 * time.Millisecond)
- fmt.Println("Hello World!")
-}
-
-func main() {
- defer ants.Release()
-
- runTimes := 1000
-
- // Use the common pool.
- var wg sync.WaitGroup
- syncCalculateSum := func() {
- demoFunc()
- wg.Done()
- }
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- _ = ants.Submit(syncCalculateSum)
- }
- wg.Wait()
- fmt.Printf("running goroutines: %d\n", ants.Running())
- fmt.Printf("finish all tasks.\n")
-
- // Use the pool with a function,
- // set 10 to the capacity of goroutine pool and 1 second for expired duration.
- p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
- myFunc(i)
- wg.Done()
- })
- defer p.Release()
- // Submit tasks one by one.
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- _ = p.Invoke(int32(i))
- }
- wg.Wait()
- fmt.Printf("running goroutines: %d\n", p.Running())
- fmt.Printf("finish all tasks, result is %d\n", sum)
- if sum != 499500 {
- panic("the final result is wrong!!!")
- }
-
- // Use the MultiPool and set the capacity of the 10 goroutine pools to unlimited.
- // If you use -1 as the pool size parameter, the size will be unlimited.
- // There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks.
- mp, _ := ants.NewMultiPool(10, -1, ants.RoundRobin)
- defer mp.ReleaseTimeout(5 * time.Second)
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- _ = mp.Submit(syncCalculateSum)
- }
- wg.Wait()
- fmt.Printf("running goroutines: %d\n", mp.Running())
- fmt.Printf("finish all tasks.\n")
-
- // Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
- mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
- myFunc(i)
- wg.Done()
- }, ants.LeastTasks)
- defer mpf.ReleaseTimeout(5 * time.Second)
- for i := 0; i < runTimes; i++ {
- wg.Add(1)
- _ = mpf.Invoke(int32(i))
- }
- wg.Wait()
- fmt.Printf("running goroutines: %d\n", mpf.Running())
- fmt.Printf("finish all tasks, result is %d\n", sum)
- if sum != 499500*2 {
- panic("the final result is wrong!!!")
- }
-}
-```
+基本的使用请查看[示例](https://pkg.go.dev/github.com/panjf2000/ants/v2#pkg-examples).
### Pool 配置
-```go
-// Option represents the optional function.
-type Option func(opts *Options)
+通过在调用 `NewPool`/`NewPoolWithFunc`/`NewPoolWithFuncGeneric` 之时使用各种 optional function,可以设置 `ants.Options` 中各个配置项的值,然后用它来定制化 goroutine pool。
-// Options contains all options which will be applied when instantiating a ants pool.
-type Options struct {
- // ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
- // the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
- // used for more than `ExpiryDuration`.
- ExpiryDuration time.Duration
-
- // PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
- PreAlloc bool
-
- // Max number of goroutine blocking on pool.Submit.
- // 0 (default value) means no such limit.
- MaxBlockingTasks int
-
- // When Nonblocking is true, Pool.Submit will never be blocked.
- // ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
- // When Nonblocking is true, MaxBlockingTasks is inoperative.
- Nonblocking bool
-
- // PanicHandler is used to handle panics from each worker goroutine.
- // if nil, panics will be thrown out again from worker goroutines.
- PanicHandler func(interface{})
-
- // Logger is the customized logger for logging info, if it is not set,
- // default standard logger from log package is used.
- Logger Logger
-}
-
-// WithOptions accepts the whole options config.
-func WithOptions(options Options) Option {
- return func(opts *Options) {
- *opts = options
- }
-}
-
-// WithExpiryDuration sets up the interval time of cleaning up goroutines.
-func WithExpiryDuration(expiryDuration time.Duration) Option {
- return func(opts *Options) {
- opts.ExpiryDuration = expiryDuration
- }
-}
-
-// WithPreAlloc indicates whether it should malloc for workers.
-func WithPreAlloc(preAlloc bool) Option {
- return func(opts *Options) {
- opts.PreAlloc = preAlloc
- }
-}
-
-// WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
-func WithMaxBlockingTasks(maxBlockingTasks int) Option {
- return func(opts *Options) {
- opts.MaxBlockingTasks = maxBlockingTasks
- }
-}
-
-// WithNonblocking indicates that pool will return nil when there is no available workers.
-func WithNonblocking(nonblocking bool) Option {
- return func(opts *Options) {
- opts.Nonblocking = nonblocking
- }
-}
-
-// WithPanicHandler sets up panic handler.
-func WithPanicHandler(panicHandler func(interface{})) Option {
- return func(opts *Options) {
- opts.PanicHandler = panicHandler
- }
-}
-
-// WithLogger sets up a customized logger.
-func WithLogger(logger Logger) Option {
- return func(opts *Options) {
- opts.Logger = logger
- }
-}
-```
-
-通过在调用`NewPool`/`NewPoolWithFunc`之时使用各种 optional function,可以设置`ants.Options`中各个配置项的值,然后用它来定制化 goroutine pool.
+更多细节请查看 [ants.Options](https://pkg.go.dev/github.com/panjf2000/ants/v2#Options) 和 [ants.Option](https://pkg.go.dev/github.com/panjf2000/ants/v2#Option)
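A hedged sketch of the generic constructor referenced above: `NewPoolWithFuncGeneric` ships in this vendored v2.11.0, and the example assumes it exposes the same `Invoke`/`Release` surface as the non-generic pools.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var total int64
	var wg sync.WaitGroup

	// Generic variant: the task argument is strongly typed, no type assertion needed.
	p, err := ants.NewPoolWithFuncGeneric(10, func(n int64) {
		defer wg.Done()
		atomic.AddInt64(&total, n)
	})
	if err != nil {
		panic(err)
	}
	defer p.Release()

	for i := int64(1); i <= 100; i++ {
		wg.Add(1)
		_ = p.Invoke(i)
	}
	wg.Wait()
	fmt.Println("total:", total) // 1+2+...+100 = 5050
}
```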
-### 自定义池
-`ants`支持实例化使用者自己的一个 Pool ,指定具体的池容量;通过调用 `NewPool` 方法可以实例化一个新的带有指定容量的 Pool ,如下:
+### 自定义 pool 容量
+`ants` 支持实例化使用者自己的一个 Pool,指定具体的 pool 容量;通过调用 `NewPool` 方法可以实例化一个新的带有指定容量的 `Pool`,如下:
``` go
p, _ := ants.NewPool(10000)
@@ -253,13 +80,13 @@ p, _ := ants.NewPool(10000)
### 任务提交
-提交任务通过调用 `ants.Submit(func())`方法:
+提交任务通过调用 `ants.Submit` 方法:
```go
ants.Submit(func(){})
```
### 动态调整 goroutine 池容量
-需要动态调整 goroutine 池容量可以通过调用`Tune(int)`:
+需要动态调整 pool 容量可以通过调用 `ants.Tune`:
``` go
pool.Tune(1000) // Tune its capacity to 1000
@@ -270,10 +97,10 @@ pool.Tune(100000) // Tune its capacity to 100000
### 预先分配 goroutine 队列内存
-`ants`允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高 goroutine 池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配。
+`ants` 支持预先为 pool 分配容量的内存, 这个功能可以在某些特定的场景下提高 goroutine 池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配。
```go
-// ants will pre-malloc the whole capacity of pool when you invoke this function
+// 提前分配的 pool 容量的内存空间
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
```
@@ -283,6 +110,12 @@ p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
pool.Release()
```
+或者
+
+```go
+pool.ReleaseTimeout(time.Second * 3)
+```
+
### 重启 Pool
```go
[The remaining hunks in this file only rework the HTML logo/image-table markup at the end of the document; it is not reproduced here.]
diff --git a/vendor/github.com/panjf2000/ants/v2/ants.go b/vendor/github.com/panjf2000/ants/v2/ants.go index 4b61ba2b..eae6a149 100644 --- a/vendor/github.com/panjf2000/ants/v2/ants.go +++ b/vendor/github.com/panjf2000/ants/v2/ants.go @@ -20,15 +20,27 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. +// Package ants implements an efficient and reliable goroutine pool for Go. +// +// With ants, Go applications are able to limit the number of active goroutines, +// recycle goroutines efficiently, and reduce the memory footprint significantly. +// Package ants is extremely useful in the scenarios where a massive number of +// goroutines are created and destroyed frequently, such as highly-concurrent +// batch processing systems, HTTP servers, services of asynchronous tasks, etc. package ants import ( + "context" "errors" "log" "math" "os" "runtime" + "sync" + "sync/atomic" "time" + + syncx "github.com/panjf2000/ants/v2/pkg/sync" ) const ( @@ -72,6 +84,9 @@ var ( // ErrInvalidLoadBalancingStrategy will be returned when trying to create a MultiPool with an invalid load-balancing strategy. ErrInvalidLoadBalancingStrategy = errors.New("invalid load-balancing strategy") + // ErrInvalidMultiPoolSize will be returned when trying to create a MultiPool with an invalid size. + ErrInvalidMultiPoolSize = errors.New("invalid size for multiple pool") + // workerChanCap determines whether the channel of a worker should be a buffered channel // to get the best performance. Inspired by fasthttp at // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139 @@ -88,22 +103,12 @@ var ( return 1 }() - // log.Lmsgprefix is not available in go1.13, just make an identical value for it. - logLmsgprefix = 64 - defaultLogger = Logger(log.New(os.Stderr, "[ants]: ", log.LstdFlags|logLmsgprefix|log.Lmicroseconds)) + defaultLogger = Logger(log.New(os.Stderr, "[ants]: ", log.LstdFlags|log.Lmsgprefix|log.Lmicroseconds)) // Init an instance pool when importing ants. defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) ) -const nowTimeUpdateInterval = 500 * time.Millisecond - -// Logger is used for logging formatted messages. -type Logger interface { - // Printf must have the same semantics as log.Printf. - Printf(format string, args ...interface{}) -} - // Submit submits a task to pool. func Submit(task func()) error { return defaultAntsPool.Submit(task) @@ -138,3 +143,383 @@ func ReleaseTimeout(timeout time.Duration) error { func Reboot() { defaultAntsPool.Reboot() } + +// Logger is used for logging formatted messages. +type Logger interface { + // Printf must have the same semantics as log.Printf. + Printf(format string, args ...any) +} + +// poolCommon contains all common fields for other sophisticated pools. +type poolCommon struct { + // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to + // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool + // which submits a new task to the same pool. + capacity int32 + + // running is the number of the currently running goroutines. + running int32 + + // lock for protecting the worker queue. + lock sync.Locker + + // workers is a slice that store the available workers. + workers workerQueue + + // state is used to notice the pool to closed itself. + state int32 + + // cond for waiting to get an idle worker. + cond *sync.Cond + + // done is used to indicate that all workers are done. 
+ allDone chan struct{} + // once is used to make sure the pool is closed just once. + once *sync.Once + + // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. + workerCache sync.Pool + + // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock + waiting int32 + + purgeDone int32 + purgeCtx context.Context + stopPurge context.CancelFunc + + ticktockDone int32 + ticktockCtx context.Context + stopTicktock context.CancelFunc + + now atomic.Value + + options *Options +} + +func newPool(size int, options ...Option) (*poolCommon, error) { + if size <= 0 { + size = -1 + } + + opts := loadOptions(options...) + + if !opts.DisablePurge { + if expiry := opts.ExpiryDuration; expiry < 0 { + return nil, ErrInvalidPoolExpiry + } else if expiry == 0 { + opts.ExpiryDuration = DefaultCleanIntervalTime + } + } + + if opts.Logger == nil { + opts.Logger = defaultLogger + } + + p := &poolCommon{ + capacity: int32(size), + allDone: make(chan struct{}), + lock: syncx.NewSpinLock(), + once: &sync.Once{}, + options: opts, + } + if p.options.PreAlloc { + if size == -1 { + return nil, ErrInvalidPreAllocSize + } + p.workers = newWorkerQueue(queueTypeLoopQueue, size) + } else { + p.workers = newWorkerQueue(queueTypeStack, 0) + } + + p.cond = sync.NewCond(p.lock) + + p.goPurge() + p.goTicktock() + + return p, nil +} + +// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. +func (p *poolCommon) purgeStaleWorkers() { + ticker := time.NewTicker(p.options.ExpiryDuration) + + defer func() { + ticker.Stop() + atomic.StoreInt32(&p.purgeDone, 1) + }() + + purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() + for { + select { + case <-purgeCtx.Done(): + return + case <-ticker.C: + } + + if p.IsClosed() { + break + } + + var isDormant bool + p.lock.Lock() + staleWorkers := p.workers.refresh(p.options.ExpiryDuration) + n := p.Running() + isDormant = n == 0 || n == len(staleWorkers) + p.lock.Unlock() + + // Clean up the stale workers. + for i := range staleWorkers { + staleWorkers[i].finish() + staleWorkers[i] = nil + } + + // There might be a situation where all workers have been cleaned up (no worker is running), + // while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers. + if isDormant && p.Waiting() > 0 { + p.cond.Broadcast() + } + } +} + +const nowTimeUpdateInterval = 500 * time.Millisecond + +// ticktock is a goroutine that updates the current time in the pool regularly. +func (p *poolCommon) ticktock() { + ticker := time.NewTicker(nowTimeUpdateInterval) + defer func() { + ticker.Stop() + atomic.StoreInt32(&p.ticktockDone, 1) + }() + + ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot() + for { + select { + case <-ticktockCtx.Done(): + return + case <-ticker.C: + } + + if p.IsClosed() { + break + } + + p.now.Store(time.Now()) + } +} + +func (p *poolCommon) goPurge() { + if p.options.DisablePurge { + return + } + + // Start a goroutine to clean up expired workers periodically. + p.purgeCtx, p.stopPurge = context.WithCancel(context.Background()) + go p.purgeStaleWorkers() +} + +func (p *poolCommon) goTicktock() { + p.now.Store(time.Now()) + p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background()) + go p.ticktock() +} + +func (p *poolCommon) nowTime() time.Time { + return p.now.Load().(time.Time) +} + +// Running returns the number of workers currently running. 
+func (p *poolCommon) Running() int { + return int(atomic.LoadInt32(&p.running)) +} + +// Free returns the number of available workers, -1 indicates this pool is unlimited. +func (p *poolCommon) Free() int { + c := p.Cap() + if c < 0 { + return -1 + } + return c - p.Running() +} + +// Waiting returns the number of tasks waiting to be executed. +func (p *poolCommon) Waiting() int { + return int(atomic.LoadInt32(&p.waiting)) +} + +// Cap returns the capacity of this pool. +func (p *poolCommon) Cap() int { + return int(atomic.LoadInt32(&p.capacity)) +} + +// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. +func (p *poolCommon) Tune(size int) { + capacity := p.Cap() + if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { + return + } + atomic.StoreInt32(&p.capacity, int32(size)) + if size > capacity { + if size-capacity == 1 { + p.cond.Signal() + return + } + p.cond.Broadcast() + } +} + +// IsClosed indicates whether the pool is closed. +func (p *poolCommon) IsClosed() bool { + return atomic.LoadInt32(&p.state) == CLOSED +} + +// Release closes this pool and releases the worker queue. +func (p *poolCommon) Release() { + if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { + return + } + + if p.stopPurge != nil { + p.stopPurge() + p.stopPurge = nil + } + if p.stopTicktock != nil { + p.stopTicktock() + p.stopTicktock = nil + } + + p.lock.Lock() + p.workers.reset() + p.lock.Unlock() + + // There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent + // those callers blocking infinitely. + p.cond.Broadcast() +} + +// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. +func (p *poolCommon) ReleaseTimeout(timeout time.Duration) error { + if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { + return ErrPoolClosed + } + + p.Release() + + var purgeCh <-chan struct{} + if !p.options.DisablePurge { + purgeCh = p.purgeCtx.Done() + } else { + purgeCh = p.allDone + } + + if p.Running() == 0 { + p.once.Do(func() { + close(p.allDone) + }) + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case <-timer.C: + return ErrTimeout + case <-p.allDone: + <-purgeCh + <-p.ticktockCtx.Done() + if p.Running() == 0 && + (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && + atomic.LoadInt32(&p.ticktockDone) == 1 { + return nil + } + } + } +} + +// Reboot reboots a closed pool, it does nothing if the pool is not closed. +// If you intend to reboot a closed pool, use ReleaseTimeout() instead of +// Release() to ensure that all workers are stopped and resource are released +// before rebooting, otherwise you may run into data race. +func (p *poolCommon) Reboot() { + if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { + atomic.StoreInt32(&p.purgeDone, 0) + p.goPurge() + atomic.StoreInt32(&p.ticktockDone, 0) + p.goTicktock() + p.allDone = make(chan struct{}) + p.once = &sync.Once{} + } +} + +func (p *poolCommon) addRunning(delta int) int { + return int(atomic.AddInt32(&p.running, int32(delta))) +} + +func (p *poolCommon) addWaiting(delta int) { + atomic.AddInt32(&p.waiting, int32(delta)) +} + +// retrieveWorker returns an available worker to run the tasks. +func (p *poolCommon) retrieveWorker() (w worker, err error) { + p.lock.Lock() + +retry: + // First try to fetch the worker from the queue. 
+ if w = p.workers.detach(); w != nil { + p.lock.Unlock() + return + } + + // If the worker queue is empty, and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. + if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { + p.lock.Unlock() + w = p.workerCache.Get().(worker) + w.run() + return + } + + // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. + if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { + p.lock.Unlock() + return nil, ErrPoolOverload + } + + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. + p.addWaiting(1) + p.cond.Wait() // block and wait for an available worker + p.addWaiting(-1) + + if p.IsClosed() { + p.lock.Unlock() + return nil, ErrPoolClosed + } + + goto retry +} + +// revertWorker puts a worker back into free pool, recycling the goroutines. +func (p *poolCommon) revertWorker(worker worker) bool { + if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { + p.cond.Broadcast() + return false + } + + worker.setLastUsedTime(p.nowTime()) + + p.lock.Lock() + // To avoid memory leaks, add a double check in the lock scope. + // Issue: https://github.com/panjf2000/ants/issues/113 + if p.IsClosed() { + p.lock.Unlock() + return false + } + if err := p.workers.insert(worker); err != nil { + p.lock.Unlock() + return false + } + // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. + p.cond.Signal() + p.lock.Unlock() + + return true +} diff --git a/vendor/github.com/panjf2000/ants/v2/multipool.go b/vendor/github.com/panjf2000/ants/v2/multipool.go index 3f78ce2c..342b0383 100644 --- a/vendor/github.com/panjf2000/ants/v2/multipool.go +++ b/vendor/github.com/panjf2000/ants/v2/multipool.go @@ -25,6 +25,7 @@ package ants import ( "errors" "fmt" + "math" "strings" "sync/atomic" "time" @@ -58,6 +59,10 @@ type MultiPool struct { // NewMultiPool instantiates a MultiPool with a size of the pool list and a size // per pool, and the load-balancing strategy. 
func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...Option) (*MultiPool, error) { + if size <= 0 { + return nil, ErrInvalidMultiPoolSize + } + if lbs != RoundRobin && lbs != LeastTasks { return nil, ErrInvalidLoadBalancingStrategy } @@ -69,16 +74,13 @@ func NewMultiPool(size, sizePerPool int, lbs LoadBalancingStrategy, options ...O } pools[i] = pool } - return &MultiPool{pools: pools, lbs: lbs}, nil + return &MultiPool{pools: pools, index: math.MaxUint32, lbs: lbs}, nil } func (mp *MultiPool) next(lbs LoadBalancingStrategy) (idx int) { switch lbs { case RoundRobin: - if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 { - idx = 0 - } - return + return int(atomic.AddUint32(&mp.index, 1) % uint32(len(mp.pools))) case LeastTasks: leastTasks := 1<<31 - 1 for i, pool := range mp.pools { diff --git a/vendor/github.com/panjf2000/ants/v2/multipool_func.go b/vendor/github.com/panjf2000/ants/v2/multipool_func.go index 868c0dea..7b4b6e54 100644 --- a/vendor/github.com/panjf2000/ants/v2/multipool_func.go +++ b/vendor/github.com/panjf2000/ants/v2/multipool_func.go @@ -25,6 +25,7 @@ package ants import ( "errors" "fmt" + "math" "strings" "sync/atomic" "time" @@ -46,7 +47,11 @@ type MultiPoolWithFunc struct { // NewMultiPoolWithFunc instantiates a MultiPoolWithFunc with a size of the pool list and a size // per pool, and the load-balancing strategy. -func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) { +func NewMultiPoolWithFunc(size, sizePerPool int, fn func(any), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFunc, error) { + if size <= 0 { + return nil, ErrInvalidMultiPoolSize + } + if lbs != RoundRobin && lbs != LeastTasks { return nil, ErrInvalidLoadBalancingStrategy } @@ -58,16 +63,13 @@ func NewMultiPoolWithFunc(size, sizePerPool int, fn func(interface{}), lbs LoadB } pools[i] = pool } - return &MultiPoolWithFunc{pools: pools, lbs: lbs}, nil + return &MultiPoolWithFunc{pools: pools, index: math.MaxUint32, lbs: lbs}, nil } func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) { switch lbs { case RoundRobin: - if idx = int((atomic.AddUint32(&mp.index, 1) - 1) % uint32(len(mp.pools))); idx == -1 { - idx = 0 - } - return + return int(atomic.AddUint32(&mp.index, 1) % uint32(len(mp.pools))) case LeastTasks: leastTasks := 1<<31 - 1 for i, pool := range mp.pools { @@ -82,7 +84,7 @@ func (mp *MultiPoolWithFunc) next(lbs LoadBalancingStrategy) (idx int) { } // Invoke submits a task to a pool selected by the load-balancing strategy. 
-func (mp *MultiPoolWithFunc) Invoke(args interface{}) (err error) { +func (mp *MultiPoolWithFunc) Invoke(args any) (err error) { if mp.IsClosed() { return ErrPoolClosed } diff --git a/vendor/github.com/panjf2000/ants/v2/multipool_func_generic.go b/vendor/github.com/panjf2000/ants/v2/multipool_func_generic.go new file mode 100644 index 00000000..f5931e51 --- /dev/null +++ b/vendor/github.com/panjf2000/ants/v2/multipool_func_generic.go @@ -0,0 +1,215 @@ +// MIT License + +// Copyright (c) 2025 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "errors" + "fmt" + "math" + "strings" + "sync/atomic" + "time" + + "golang.org/x/sync/errgroup" +) + +// MultiPoolWithFuncGeneric is the generic version of MultiPoolWithFunc. +type MultiPoolWithFuncGeneric[T any] struct { + pools []*PoolWithFuncGeneric[T] + index uint32 + state int32 + lbs LoadBalancingStrategy +} + +// NewMultiPoolWithFuncGeneric instantiates a MultiPoolWithFunc with a size of the pool list and a size +// per pool, and the load-balancing strategy. +func NewMultiPoolWithFuncGeneric[T any](size, sizePerPool int, fn func(T), lbs LoadBalancingStrategy, options ...Option) (*MultiPoolWithFuncGeneric[T], error) { + if size <= 0 { + return nil, ErrInvalidMultiPoolSize + } + + if lbs != RoundRobin && lbs != LeastTasks { + return nil, ErrInvalidLoadBalancingStrategy + } + pools := make([]*PoolWithFuncGeneric[T], size) + for i := 0; i < size; i++ { + pool, err := NewPoolWithFuncGeneric(sizePerPool, fn, options...) + if err != nil { + return nil, err + } + pools[i] = pool + } + return &MultiPoolWithFuncGeneric[T]{pools: pools, index: math.MaxUint32, lbs: lbs}, nil +} + +func (mp *MultiPoolWithFuncGeneric[T]) next(lbs LoadBalancingStrategy) (idx int) { + switch lbs { + case RoundRobin: + return int(atomic.AddUint32(&mp.index, 1) % uint32(len(mp.pools))) + case LeastTasks: + leastTasks := 1<<31 - 1 + for i, pool := range mp.pools { + if n := pool.Running(); n < leastTasks { + leastTasks = n + idx = i + } + } + return + } + return -1 +} + +// Invoke submits a task to a pool selected by the load-balancing strategy. 
+func (mp *MultiPoolWithFuncGeneric[T]) Invoke(args T) (err error) { + if mp.IsClosed() { + return ErrPoolClosed + } + + if err = mp.pools[mp.next(mp.lbs)].Invoke(args); err == nil { + return + } + if err == ErrPoolOverload && mp.lbs == RoundRobin { + return mp.pools[mp.next(LeastTasks)].Invoke(args) + } + return +} + +// Running returns the number of the currently running workers across all pools. +func (mp *MultiPoolWithFuncGeneric[T]) Running() (n int) { + for _, pool := range mp.pools { + n += pool.Running() + } + return +} + +// RunningByIndex returns the number of the currently running workers in the specific pool. +func (mp *MultiPoolWithFuncGeneric[T]) RunningByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Running(), nil +} + +// Free returns the number of available workers across all pools. +func (mp *MultiPoolWithFuncGeneric[T]) Free() (n int) { + for _, pool := range mp.pools { + n += pool.Free() + } + return +} + +// FreeByIndex returns the number of available workers in the specific pool. +func (mp *MultiPoolWithFuncGeneric[T]) FreeByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Free(), nil +} + +// Waiting returns the number of the currently waiting tasks across all pools. +func (mp *MultiPoolWithFuncGeneric[T]) Waiting() (n int) { + for _, pool := range mp.pools { + n += pool.Waiting() + } + return +} + +// WaitingByIndex returns the number of the currently waiting tasks in the specific pool. +func (mp *MultiPoolWithFuncGeneric[T]) WaitingByIndex(idx int) (int, error) { + if idx < 0 || idx >= len(mp.pools) { + return -1, ErrInvalidPoolIndex + } + return mp.pools[idx].Waiting(), nil +} + +// Cap returns the capacity of this multi-pool. +func (mp *MultiPoolWithFuncGeneric[T]) Cap() (n int) { + for _, pool := range mp.pools { + n += pool.Cap() + } + return +} + +// Tune resizes each pool in multi-pool. +// +// Note that this method doesn't resize the overall +// capacity of multi-pool. +func (mp *MultiPoolWithFuncGeneric[T]) Tune(size int) { + for _, pool := range mp.pools { + pool.Tune(size) + } +} + +// IsClosed indicates whether the multi-pool is closed. +func (mp *MultiPoolWithFuncGeneric[T]) IsClosed() bool { + return atomic.LoadInt32(&mp.state) == CLOSED +} + +// ReleaseTimeout closes the multi-pool with a timeout, +// it waits all pools to be closed before timing out. +func (mp *MultiPoolWithFuncGeneric[T]) ReleaseTimeout(timeout time.Duration) error { + if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) { + return ErrPoolClosed + } + + errCh := make(chan error, len(mp.pools)) + var wg errgroup.Group + for i, pool := range mp.pools { + func(p *PoolWithFuncGeneric[T], idx int) { + wg.Go(func() error { + err := p.ReleaseTimeout(timeout) + if err != nil { + err = fmt.Errorf("pool %d: %v", idx, err) + } + errCh <- err + return err + }) + }(pool, i) + } + + _ = wg.Wait() + + var errStr strings.Builder + for i := 0; i < len(mp.pools); i++ { + if err := <-errCh; err != nil { + errStr.WriteString(err.Error()) + errStr.WriteString(" | ") + } + } + + if errStr.Len() == 0 { + return nil + } + + return errors.New(strings.TrimSuffix(errStr.String(), " | ")) +} + +// Reboot reboots a released multi-pool. 
+func (mp *MultiPoolWithFuncGeneric[T]) Reboot() { + if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) { + atomic.StoreUint32(&mp.index, 0) + for _, pool := range mp.pools { + pool.Reboot() + } + } +} diff --git a/vendor/github.com/panjf2000/ants/v2/options.go b/vendor/github.com/panjf2000/ants/v2/options.go index 90d1ad51..182cd5af 100644 --- a/vendor/github.com/panjf2000/ants/v2/options.go +++ b/vendor/github.com/panjf2000/ants/v2/options.go @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2018. Andy Pan. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package ants import "time" @@ -34,7 +56,7 @@ type Options struct { // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. - PanicHandler func(interface{}) + PanicHandler func(any) // Logger is the customized logger for logging info, if it is not set, // default standard logger from log package is used. @@ -80,7 +102,7 @@ func WithNonblocking(nonblocking bool) Option { } // WithPanicHandler sets up panic handler. -func WithPanicHandler(panicHandler func(interface{})) Option { +func WithPanicHandler(panicHandler func(any)) Option { return func(opts *Options) { opts.PanicHandler = panicHandler } diff --git a/vendor/github.com/panjf2000/ants/v2/internal/sync/spinlock.go b/vendor/github.com/panjf2000/ants/v2/pkg/sync/spinlock.go similarity index 100% rename from vendor/github.com/panjf2000/ants/v2/internal/sync/spinlock.go rename to vendor/github.com/panjf2000/ants/v2/pkg/sync/spinlock.go diff --git a/vendor/github.com/panjf2000/ants/v2/pkg/sync/sync.go b/vendor/github.com/panjf2000/ants/v2/pkg/sync/sync.go new file mode 100644 index 00000000..d66192f0 --- /dev/null +++ b/vendor/github.com/panjf2000/ants/v2/pkg/sync/sync.go @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2025. Andy Pan. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +// Package sync provides some handy implementations for synchronization access. +// At the moment, there is only an implementation of spin-lock. +package sync diff --git a/vendor/github.com/panjf2000/ants/v2/pool.go b/vendor/github.com/panjf2000/ants/v2/pool.go index 33e46ed9..b1dfa991 100644 --- a/vendor/github.com/panjf2000/ants/v2/pool.go +++ b/vendor/github.com/panjf2000/ants/v2/pool.go @@ -22,206 +22,13 @@ package ants -import ( - "context" - "sync" - "sync/atomic" - "time" - - syncx "github.com/panjf2000/ants/v2/internal/sync" -) - -type poolCommon struct { - // capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to - // avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool - // which submits a new task to the same pool. - capacity int32 - - // running is the number of the currently running goroutines. - running int32 - - // lock for protecting the worker queue. - lock sync.Locker - - // workers is a slice that store the available workers. - workers workerQueue - - // state is used to notice the pool to closed itself. - state int32 - - // cond for waiting to get an idle worker. - cond *sync.Cond - - // done is used to indicate that all workers are done. - allDone chan struct{} - // once is used to make sure the pool is closed just once. - once *sync.Once - - // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. - workerCache sync.Pool - - // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock - waiting int32 - - purgeDone int32 - purgeCtx context.Context - stopPurge context.CancelFunc - - ticktockDone int32 - ticktockCtx context.Context - stopTicktock context.CancelFunc - - now atomic.Value - - options *Options -} - -// Pool accepts the tasks and process them concurrently, -// it limits the total of goroutines to a given number by recycling goroutines. +// Pool is a goroutine pool that limits and recycles a mass of goroutines. +// The pool capacity can be fixed or unlimited. type Pool struct { - poolCommon + *poolCommon } -// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. -func (p *Pool) purgeStaleWorkers() { - ticker := time.NewTicker(p.options.ExpiryDuration) - - defer func() { - ticker.Stop() - atomic.StoreInt32(&p.purgeDone, 1) - }() - - purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() - for { - select { - case <-purgeCtx.Done(): - return - case <-ticker.C: - } - - if p.IsClosed() { - break - } - - var isDormant bool - p.lock.Lock() - staleWorkers := p.workers.refresh(p.options.ExpiryDuration) - n := p.Running() - isDormant = n == 0 || n == len(staleWorkers) - p.lock.Unlock() - - // Notify obsolete workers to stop. - // This notification must be outside the p.lock, since w.task - // may be blocking and may consume a lot of time if many workers - // are located on non-local CPUs. 
- for i := range staleWorkers { - staleWorkers[i].finish() - staleWorkers[i] = nil - } - - // There might be a situation where all workers have been cleaned up (no worker is running), - // while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers. - if isDormant && p.Waiting() > 0 { - p.cond.Broadcast() - } - } -} - -// ticktock is a goroutine that updates the current time in the pool regularly. -func (p *Pool) ticktock() { - ticker := time.NewTicker(nowTimeUpdateInterval) - defer func() { - ticker.Stop() - atomic.StoreInt32(&p.ticktockDone, 1) - }() - - ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot() - for { - select { - case <-ticktockCtx.Done(): - return - case <-ticker.C: - } - - if p.IsClosed() { - break - } - - p.now.Store(time.Now()) - } -} - -func (p *Pool) goPurge() { - if p.options.DisablePurge { - return - } - - // Start a goroutine to clean up expired workers periodically. - p.purgeCtx, p.stopPurge = context.WithCancel(context.Background()) - go p.purgeStaleWorkers() -} - -func (p *Pool) goTicktock() { - p.now.Store(time.Now()) - p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background()) - go p.ticktock() -} - -func (p *Pool) nowTime() time.Time { - return p.now.Load().(time.Time) -} - -// NewPool instantiates a Pool with customized options. -func NewPool(size int, options ...Option) (*Pool, error) { - if size <= 0 { - size = -1 - } - - opts := loadOptions(options...) - - if !opts.DisablePurge { - if expiry := opts.ExpiryDuration; expiry < 0 { - return nil, ErrInvalidPoolExpiry - } else if expiry == 0 { - opts.ExpiryDuration = DefaultCleanIntervalTime - } - } - - if opts.Logger == nil { - opts.Logger = defaultLogger - } - - p := &Pool{poolCommon: poolCommon{ - capacity: int32(size), - allDone: make(chan struct{}), - lock: syncx.NewSpinLock(), - once: &sync.Once{}, - options: opts, - }} - p.workerCache.New = func() interface{} { - return &goWorker{ - pool: p, - task: make(chan func(), workerChanCap), - } - } - if p.options.PreAlloc { - if size == -1 { - return nil, ErrInvalidPreAllocSize - } - p.workers = newWorkerQueue(queueTypeLoopQueue, size) - } else { - p.workers = newWorkerQueue(queueTypeStack, 0) - } - - p.cond = sync.NewCond(p.lock) - - p.goPurge() - p.goTicktock() - - return p, nil -} - -// Submit submits a task to this pool. +// Submit submits a task to the pool. // // Note that you are allowed to call Pool.Submit() from the current Pool.Submit(), // but what calls for special attention is that you will get blocked with the last @@ -239,198 +46,20 @@ func (p *Pool) Submit(task func()) error { return err } -// Running returns the number of workers currently running. -func (p *Pool) Running() int { - return int(atomic.LoadInt32(&p.running)) -} - -// Free returns the number of available workers, -1 indicates this pool is unlimited. -func (p *Pool) Free() int { - c := p.Cap() - if c < 0 { - return -1 - } - return c - p.Running() -} - -// Waiting returns the number of tasks waiting to be executed. -func (p *Pool) Waiting() int { - return int(atomic.LoadInt32(&p.waiting)) -} - -// Cap returns the capacity of this pool. -func (p *Pool) Cap() int { - return int(atomic.LoadInt32(&p.capacity)) -} - -// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. 
-func (p *Pool) Tune(size int) { - capacity := p.Cap() - if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { - return - } - atomic.StoreInt32(&p.capacity, int32(size)) - if size > capacity { - if size-capacity == 1 { - p.cond.Signal() - return - } - p.cond.Broadcast() - } -} - -// IsClosed indicates whether the pool is closed. -func (p *Pool) IsClosed() bool { - return atomic.LoadInt32(&p.state) == CLOSED -} - -// Release closes this pool and releases the worker queue. -func (p *Pool) Release() { - if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { - return +// NewPool instantiates a Pool with customized options. +func NewPool(size int, options ...Option) (*Pool, error) { + pc, err := newPool(size, options...) + if err != nil { + return nil, err } - if p.stopPurge != nil { - p.stopPurge() - p.stopPurge = nil - } - if p.stopTicktock != nil { - p.stopTicktock() - p.stopTicktock = nil - } - - p.lock.Lock() - p.workers.reset() - p.lock.Unlock() - // There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent - // those callers blocking infinitely. - p.cond.Broadcast() -} - -// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. -func (p *Pool) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { - return ErrPoolClosed - } - - p.Release() - - var purgeCh <-chan struct{} - if !p.options.DisablePurge { - purgeCh = p.purgeCtx.Done() - } else { - purgeCh = p.allDone - } - - if p.Running() == 0 { - p.once.Do(func() { - close(p.allDone) - }) - } - - timer := time.NewTimer(timeout) - defer timer.Stop() - for { - select { - case <-timer.C: - return ErrTimeout - case <-p.allDone: - <-purgeCh - <-p.ticktockCtx.Done() - if p.Running() == 0 && - (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && - atomic.LoadInt32(&p.ticktockDone) == 1 { - return nil - } + pool := &Pool{poolCommon: pc} + pool.workerCache.New = func() any { + return &goWorker{ + pool: pool, + task: make(chan func(), workerChanCap), } } -} - -// Reboot reboots a closed pool, it does nothing if the pool is not closed. -// If you intend to reboot a closed pool, use ReleaseTimeout() instead of -// Release() to ensure that all workers are stopped and resource are released -// before rebooting, otherwise you may run into data race. -func (p *Pool) Reboot() { - if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { - atomic.StoreInt32(&p.purgeDone, 0) - p.goPurge() - atomic.StoreInt32(&p.ticktockDone, 0) - p.goTicktock() - p.allDone = make(chan struct{}) - p.once = &sync.Once{} - } -} - -func (p *Pool) addRunning(delta int) int { - return int(atomic.AddInt32(&p.running, int32(delta))) -} - -func (p *Pool) addWaiting(delta int) { - atomic.AddInt32(&p.waiting, int32(delta)) -} - -// retrieveWorker returns an available worker to run the tasks. -func (p *Pool) retrieveWorker() (w worker, err error) { - p.lock.Lock() - -retry: - // First try to fetch the worker from the queue. - if w = p.workers.detach(); w != nil { - p.lock.Unlock() - return - } - - // If the worker queue is empty, and we don't run out of the pool capacity, - // then just spawn a new worker goroutine. - if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { - p.lock.Unlock() - w = p.workerCache.Get().(*goWorker) - w.run() - return - } - - // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. 
- if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { - p.lock.Unlock() - return nil, ErrPoolOverload - } - - // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - p.addWaiting(1) - p.cond.Wait() // block and wait for an available worker - p.addWaiting(-1) - - if p.IsClosed() { - p.lock.Unlock() - return nil, ErrPoolClosed - } - - goto retry -} - -// revertWorker puts a worker back into free pool, recycling the goroutines. -func (p *Pool) revertWorker(worker *goWorker) bool { - if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { - p.cond.Broadcast() - return false - } - - worker.lastUsed = p.nowTime() - - p.lock.Lock() - // To avoid memory leaks, add a double check in the lock scope. - // Issue: https://github.com/panjf2000/ants/issues/113 - if p.IsClosed() { - p.lock.Unlock() - return false - } - if err := p.workers.insert(worker); err != nil { - p.lock.Unlock() - return false - } - // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. - p.cond.Signal() - p.lock.Unlock() - - return true + + return pool, nil } diff --git a/vendor/github.com/panjf2000/ants/v2/pool_func.go b/vendor/github.com/panjf2000/ants/v2/pool_func.go index 140d5fe0..a181b43b 100644 --- a/vendor/github.com/panjf2000/ants/v2/pool_func.go +++ b/vendor/github.com/panjf2000/ants/v2/pool_func.go @@ -22,379 +22,54 @@ package ants -import ( - "context" - "sync" - "sync/atomic" - "time" - - syncx "github.com/panjf2000/ants/v2/internal/sync" -) - -// PoolWithFunc accepts the tasks and process them concurrently, -// it limits the total of goroutines to a given number by recycling goroutines. +// PoolWithFunc is like Pool but accepts a unified function for all goroutines to execute. type PoolWithFunc struct { - poolCommon + *poolCommon - // poolFunc is the function for processing tasks. - poolFunc func(interface{}) + // fn is the unified function for processing tasks. + fn func(any) } -// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. -func (p *PoolWithFunc) purgeStaleWorkers() { - ticker := time.NewTicker(p.options.ExpiryDuration) - defer func() { - ticker.Stop() - atomic.StoreInt32(&p.purgeDone, 1) - }() - - purgeCtx := p.purgeCtx // copy to the local variable to avoid race from Reboot() - for { - select { - case <-purgeCtx.Done(): - return - case <-ticker.C: - } - - if p.IsClosed() { - break - } - - var isDormant bool - p.lock.Lock() - staleWorkers := p.workers.refresh(p.options.ExpiryDuration) - n := p.Running() - isDormant = n == 0 || n == len(staleWorkers) - p.lock.Unlock() - - // Notify obsolete workers to stop. - // This notification must be outside the p.lock, since w.task - // may be blocking and may consume a lot of time if many workers - // are located on non-local CPUs. - for i := range staleWorkers { - staleWorkers[i].finish() - staleWorkers[i] = nil - } - - // There might be a situation where all workers have been cleaned up (no worker is running), - // while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers. - if isDormant && p.Waiting() > 0 { - p.cond.Broadcast() - } - } -} - -// ticktock is a goroutine that updates the current time in the pool regularly. 
-func (p *PoolWithFunc) ticktock() { - ticker := time.NewTicker(nowTimeUpdateInterval) - defer func() { - ticker.Stop() - atomic.StoreInt32(&p.ticktockDone, 1) - }() - - ticktockCtx := p.ticktockCtx // copy to the local variable to avoid race from Reboot() - for { - select { - case <-ticktockCtx.Done(): - return - case <-ticker.C: - } - - if p.IsClosed() { - break - } - - p.now.Store(time.Now()) - } -} - -func (p *PoolWithFunc) goPurge() { - if p.options.DisablePurge { - return - } - - // Start a goroutine to clean up expired workers periodically. - p.purgeCtx, p.stopPurge = context.WithCancel(context.Background()) - go p.purgeStaleWorkers() -} - -func (p *PoolWithFunc) goTicktock() { - p.now.Store(time.Now()) - p.ticktockCtx, p.stopTicktock = context.WithCancel(context.Background()) - go p.ticktock() -} - -func (p *PoolWithFunc) nowTime() time.Time { - return p.now.Load().(time.Time) -} - -// NewPoolWithFunc instantiates a PoolWithFunc with customized options. -func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) { - if size <= 0 { - size = -1 - } - - if pf == nil { - return nil, ErrLackPoolFunc - } - - opts := loadOptions(options...) - - if !opts.DisablePurge { - if expiry := opts.ExpiryDuration; expiry < 0 { - return nil, ErrInvalidPoolExpiry - } else if expiry == 0 { - opts.ExpiryDuration = DefaultCleanIntervalTime - } - } - - if opts.Logger == nil { - opts.Logger = defaultLogger - } - - p := &PoolWithFunc{ - poolCommon: poolCommon{ - capacity: int32(size), - allDone: make(chan struct{}), - lock: syncx.NewSpinLock(), - once: &sync.Once{}, - options: opts, - }, - poolFunc: pf, - } - p.workerCache.New = func() interface{} { - return &goWorkerWithFunc{ - pool: p, - args: make(chan interface{}, workerChanCap), - } - } - if p.options.PreAlloc { - if size == -1 { - return nil, ErrInvalidPreAllocSize - } - p.workers = newWorkerQueue(queueTypeLoopQueue, size) - } else { - p.workers = newWorkerQueue(queueTypeStack, 0) - } - - p.cond = sync.NewCond(p.lock) - - p.goPurge() - p.goTicktock() - - return p, nil -} - -// Invoke submits a task to pool. +// Invoke passes arguments to the pool. // // Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(), // but what calls for special attention is that you will get blocked with the last // Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this, // you should instantiate a PoolWithFunc with ants.WithNonblocking(true). -func (p *PoolWithFunc) Invoke(args interface{}) error { +func (p *PoolWithFunc) Invoke(arg any) error { if p.IsClosed() { return ErrPoolClosed } w, err := p.retrieveWorker() if w != nil { - w.inputParam(args) + w.inputArg(arg) } return err } -// Running returns the number of workers currently running. -func (p *PoolWithFunc) Running() int { - return int(atomic.LoadInt32(&p.running)) -} - -// Free returns the number of available workers, -1 indicates this pool is unlimited. -func (p *PoolWithFunc) Free() int { - c := p.Cap() - if c < 0 { - return -1 - } - return c - p.Running() -} - -// Waiting returns the number of tasks waiting to be executed. -func (p *PoolWithFunc) Waiting() int { - return int(atomic.LoadInt32(&p.waiting)) -} - -// Cap returns the capacity of this pool. -func (p *PoolWithFunc) Cap() int { - return int(atomic.LoadInt32(&p.capacity)) -} - -// Tune changes the capacity of this pool, note that it is noneffective to the infinite or pre-allocation pool. 
-func (p *PoolWithFunc) Tune(size int) { - capacity := p.Cap() - if capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc { - return - } - atomic.StoreInt32(&p.capacity, int32(size)) - if size > capacity { - if size-capacity == 1 { - p.cond.Signal() - return - } - p.cond.Broadcast() - } -} - -// IsClosed indicates whether the pool is closed. -func (p *PoolWithFunc) IsClosed() bool { - return atomic.LoadInt32(&p.state) == CLOSED -} - -// Release closes this pool and releases the worker queue. -func (p *PoolWithFunc) Release() { - if !atomic.CompareAndSwapInt32(&p.state, OPENED, CLOSED) { - return +// NewPoolWithFunc instantiates a PoolWithFunc with customized options. +func NewPoolWithFunc(size int, pf func(any), options ...Option) (*PoolWithFunc, error) { + if pf == nil { + return nil, ErrLackPoolFunc } - if p.stopPurge != nil { - p.stopPurge() - p.stopPurge = nil - } - if p.stopTicktock != nil { - p.stopTicktock() - p.stopTicktock = nil + pc, err := newPool(size, options...) + if err != nil { + return nil, err } - p.lock.Lock() - p.workers.reset() - p.lock.Unlock() - // There might be some callers waiting in retrieveWorker(), so we need to wake them up to prevent - // those callers blocking infinitely. - p.cond.Broadcast() -} - -// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out. -func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { - if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil { - return ErrPoolClosed + pool := &PoolWithFunc{ + poolCommon: pc, + fn: pf, } - p.Release() - - var purgeCh <-chan struct{} - if !p.options.DisablePurge { - purgeCh = p.purgeCtx.Done() - } else { - purgeCh = p.allDone - } - - if p.Running() == 0 { - p.once.Do(func() { - close(p.allDone) - }) - } - - timer := time.NewTimer(timeout) - defer timer.Stop() - for { - select { - case <-timer.C: - return ErrTimeout - case <-p.allDone: - <-purgeCh - <-p.ticktockCtx.Done() - if p.Running() == 0 && - (p.options.DisablePurge || atomic.LoadInt32(&p.purgeDone) == 1) && - atomic.LoadInt32(&p.ticktockDone) == 1 { - return nil - } + pool.workerCache.New = func() any { + return &goWorkerWithFunc{ + pool: pool, + arg: make(chan any, workerChanCap), } } -} - -// Reboot reboots a closed pool, it does nothing if the pool is not closed. -// If you intend to reboot a closed pool, use ReleaseTimeout() instead of -// Release() to ensure that all workers are stopped and resource are released -// before rebooting, otherwise you may run into data race. -func (p *PoolWithFunc) Reboot() { - if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) { - atomic.StoreInt32(&p.purgeDone, 0) - p.goPurge() - atomic.StoreInt32(&p.ticktockDone, 0) - p.goTicktock() - p.allDone = make(chan struct{}) - p.once = &sync.Once{} - } -} - -func (p *PoolWithFunc) addRunning(delta int) int { - return int(atomic.AddInt32(&p.running, int32(delta))) -} - -func (p *PoolWithFunc) addWaiting(delta int) { - atomic.AddInt32(&p.waiting, int32(delta)) -} - -// retrieveWorker returns an available worker to run the tasks. -func (p *PoolWithFunc) retrieveWorker() (w worker, err error) { - p.lock.Lock() - -retry: - // First try to fetch the worker from the queue. - if w = p.workers.detach(); w != nil { - p.lock.Unlock() - return - } - - // If the worker queue is empty, and we don't run out of the pool capacity, - // then just spawn a new worker goroutine. 
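// A usage sketch (not part of the vendored diff) of the PoolWithFunc API
// from this file: one unified function processes every argument passed
// through Invoke. The pool size and expiry duration are illustrative; the
// expiry controls how long idle workers live before the purge goroutine
// above reclaims them.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var (
		wg  sync.WaitGroup
		sum int64
	)

	p, err := ants.NewPoolWithFunc(8, func(arg any) {
		defer wg.Done()
		atomic.AddInt64(&sum, int64(arg.(int)))
	}, ants.WithExpiryDuration(10*time.Second))
	if err != nil {
		panic(err)
	}
	defer p.Release()

	for i := 1; i <= 100; i++ {
		wg.Add(1)
		if err := p.Invoke(i); err != nil {
			wg.Done()
			fmt.Println("invoke failed:", err)
		}
	}
	wg.Wait()
	fmt.Println("sum:", sum) // 5050
}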
- if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { - p.lock.Unlock() - w = p.workerCache.Get().(*goWorkerWithFunc) - w.run() - return - } - - // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. - if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { - p.lock.Unlock() - return nil, ErrPoolOverload - } - - // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - p.addWaiting(1) - p.cond.Wait() // block and wait for an available worker - p.addWaiting(-1) - - if p.IsClosed() { - p.lock.Unlock() - return nil, ErrPoolClosed - } - - goto retry -} - -// revertWorker puts a worker back into free pool, recycling the goroutines. -func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool { - if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() { - p.cond.Broadcast() - return false - } - - worker.lastUsed = p.nowTime() - - p.lock.Lock() - // To avoid memory leaks, add a double check in the lock scope. - // Issue: https://github.com/panjf2000/ants/issues/113 - if p.IsClosed() { - p.lock.Unlock() - return false - } - if err := p.workers.insert(worker); err != nil { - p.lock.Unlock() - return false - } - // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue. - p.cond.Signal() - p.lock.Unlock() - - return true + + return pool, nil } diff --git a/vendor/github.com/panjf2000/ants/v2/pool_func_generic.go b/vendor/github.com/panjf2000/ants/v2/pool_func_generic.go new file mode 100644 index 00000000..06ed3cad --- /dev/null +++ b/vendor/github.com/panjf2000/ants/v2/pool_func_generic.go @@ -0,0 +1,71 @@ +// MIT License + +// Copyright (c) 2025 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +// PoolWithFuncGeneric is the generic version of PoolWithFunc. +type PoolWithFuncGeneric[T any] struct { + *poolCommon + + // fn is the unified function for processing tasks. + fn func(T) +} + +// Invoke passes the argument to the pool to start a new task. +func (p *PoolWithFuncGeneric[T]) Invoke(arg T) error { + if p.IsClosed() { + return ErrPoolClosed + } + + w, err := p.retrieveWorker() + if w != nil { + w.(*goWorkerWithFuncGeneric[T]).arg <- arg + } + return err +} + +// NewPoolWithFuncGeneric instantiates a PoolWithFuncGeneric[T] with customized options. 
+func NewPoolWithFuncGeneric[T any](size int, pf func(T), options ...Option) (*PoolWithFuncGeneric[T], error) { + if pf == nil { + return nil, ErrLackPoolFunc + } + + pc, err := newPool(size, options...) + if err != nil { + return nil, err + } + + pool := &PoolWithFuncGeneric[T]{ + poolCommon: pc, + fn: pf, + } + + pool.workerCache.New = func() any { + return &goWorkerWithFuncGeneric[T]{ + pool: pool, + arg: make(chan T, workerChanCap), + exit: make(chan struct{}, 1), + } + } + + return pool, nil +} diff --git a/vendor/github.com/panjf2000/ants/v2/worker.go b/vendor/github.com/panjf2000/ants/v2/worker.go index 73166f80..03b4bd70 100644 --- a/vendor/github.com/panjf2000/ants/v2/worker.go +++ b/vendor/github.com/panjf2000/ants/v2/worker.go @@ -31,6 +31,8 @@ import ( // it starts a goroutine that accepts tasks and // performs function calls. type goWorker struct { + worker + // pool who owns this worker. pool *Pool @@ -64,11 +66,11 @@ func (w *goWorker) run() { w.pool.cond.Signal() }() - for f := range w.task { - if f == nil { + for fn := range w.task { + if fn == nil { return } - f() + fn() if ok := w.pool.revertWorker(w); !ok { return } @@ -84,10 +86,10 @@ func (w *goWorker) lastUsedTime() time.Time { return w.lastUsed } +func (w *goWorker) setLastUsedTime(t time.Time) { + w.lastUsed = t +} + func (w *goWorker) inputFunc(fn func()) { w.task <- fn } - -func (w *goWorker) inputParam(interface{}) { - panic("unreachable") -} diff --git a/vendor/github.com/panjf2000/ants/v2/worker_func.go b/vendor/github.com/panjf2000/ants/v2/worker_func.go index a25f4f9e..8437e40d 100644 --- a/vendor/github.com/panjf2000/ants/v2/worker_func.go +++ b/vendor/github.com/panjf2000/ants/v2/worker_func.go @@ -31,11 +31,13 @@ import ( // it starts a goroutine that accepts tasks and // performs function calls. type goWorkerWithFunc struct { + worker + // pool who owns this worker. pool *PoolWithFunc - // args is a job should be done. - args chan interface{} + // arg is the argument for the function. + arg chan any // lastUsed will be updated when putting a worker back into queue. 
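// A sketch (not part of the vendored diff) of the generic pool introduced
// in v2.11 (pool_func_generic.go above): NewPoolWithFuncGeneric binds the
// pool to one argument type, so Invoke is type-safe and the task function
// needs no type assertion. The worker count and inputs are illustrative.
package main

import (
	"fmt"
	"sync"

	"github.com/panjf2000/ants/v2"
)

func main() {
	var wg sync.WaitGroup

	// T is inferred as string from the function literal.
	p, err := ants.NewPoolWithFuncGeneric(4, func(name string) {
		defer wg.Done()
		fmt.Println("hello,", name)
	})
	if err != nil {
		panic(err)
	}
	defer p.Release()

	for _, name := range []string{"ants", "pool", "worker"} {
		wg.Add(1)
		if err := p.Invoke(name); err != nil {
			wg.Done()
			fmt.Println("invoke failed:", err)
		}
	}
	wg.Wait()
}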
lastUsed time.Time @@ -64,11 +66,11 @@ func (w *goWorkerWithFunc) run() { w.pool.cond.Signal() }() - for args := range w.args { - if args == nil { + for arg := range w.arg { + if arg == nil { return } - w.pool.poolFunc(args) + w.pool.fn(arg) if ok := w.pool.revertWorker(w); !ok { return } @@ -77,17 +79,17 @@ func (w *goWorkerWithFunc) run() { } func (w *goWorkerWithFunc) finish() { - w.args <- nil + w.arg <- nil } func (w *goWorkerWithFunc) lastUsedTime() time.Time { return w.lastUsed } -func (w *goWorkerWithFunc) inputFunc(func()) { - panic("unreachable") +func (w *goWorkerWithFunc) setLastUsedTime(t time.Time) { + w.lastUsed = t } -func (w *goWorkerWithFunc) inputParam(arg interface{}) { - w.args <- arg +func (w *goWorkerWithFunc) inputArg(arg any) { + w.arg <- arg } diff --git a/vendor/github.com/panjf2000/ants/v2/worker_func_generic.go b/vendor/github.com/panjf2000/ants/v2/worker_func_generic.go new file mode 100644 index 00000000..a76d109c --- /dev/null +++ b/vendor/github.com/panjf2000/ants/v2/worker_func_generic.go @@ -0,0 +1,96 @@ +// MIT License + +// Copyright (c) 2025 Andy Pan + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package ants + +import ( + "runtime/debug" + "time" +) + +// goWorkerWithFunc is the actual executor who runs the tasks, +// it starts a goroutine that accepts tasks and +// performs function calls. +type goWorkerWithFuncGeneric[T any] struct { + worker + + // pool who owns this worker. + pool *PoolWithFuncGeneric[T] + + // arg is a job should be done. + arg chan T + + // exit signals the goroutine to exit. + exit chan struct{} + + // lastUsed will be updated when putting a worker back into queue. + lastUsed time.Time +} + +// run starts a goroutine to repeat the process +// that performs the function calls. +func (w *goWorkerWithFuncGeneric[T]) run() { + w.pool.addRunning(1) + go func() { + defer func() { + if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() { + w.pool.once.Do(func() { + close(w.pool.allDone) + }) + } + w.pool.workerCache.Put(w) + if p := recover(); p != nil { + if ph := w.pool.options.PanicHandler; ph != nil { + ph(p) + } else { + w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack()) + } + } + // Call Signal() here in case there are goroutines waiting for available workers. 
+ w.pool.cond.Signal() + }() + + for { + select { + case <-w.exit: + return + case arg := <-w.arg: + w.pool.fn(arg) + if ok := w.pool.revertWorker(w); !ok { + return + } + } + } + }() +} + +func (w *goWorkerWithFuncGeneric[T]) finish() { + w.exit <- struct{}{} +} + +func (w *goWorkerWithFuncGeneric[T]) lastUsedTime() time.Time { + return w.lastUsed +} + +func (w *goWorkerWithFuncGeneric[T]) setLastUsedTime(t time.Time) { + w.lastUsed = t +} diff --git a/vendor/github.com/panjf2000/ants/v2/worker_loop_queue.go b/vendor/github.com/panjf2000/ants/v2/worker_loop_queue.go index a5451ab5..b3729839 100644 --- a/vendor/github.com/panjf2000/ants/v2/worker_loop_queue.go +++ b/vendor/github.com/panjf2000/ants/v2/worker_loop_queue.go @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2019. Ants Authors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package ants import "time" @@ -12,6 +34,9 @@ type loopQueue struct { } func newWorkerLoopQueue(size int) *loopQueue { + if size <= 0 { + return nil + } return &loopQueue{ items: make([]worker, size), size: size, @@ -39,10 +64,6 @@ func (wq *loopQueue) isEmpty() bool { } func (wq *loopQueue) insert(w worker) error { - if wq.size == 0 { - return errQueueIsReleased - } - if wq.isFull { return errQueueIsFull } diff --git a/vendor/github.com/panjf2000/ants/v2/worker_queue.go b/vendor/github.com/panjf2000/ants/v2/worker_queue.go index bcb74807..948bc914 100644 --- a/vendor/github.com/panjf2000/ants/v2/worker_queue.go +++ b/vendor/github.com/panjf2000/ants/v2/worker_queue.go @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2019. Ants Authors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package ants import ( @@ -5,20 +27,16 @@ import ( "time" ) -var ( - // errQueueIsFull will be returned when the worker queue is full. - errQueueIsFull = errors.New("the queue is full") - - // errQueueIsReleased will be returned when trying to insert item to a released worker queue. - errQueueIsReleased = errors.New("the queue length is zero") -) +// errQueueIsFull will be returned when the worker queue is full. +var errQueueIsFull = errors.New("the queue is full") type worker interface { run() finish() lastUsedTime() time.Time + setLastUsedTime(t time.Time) inputFunc(func()) - inputParam(interface{}) + inputArg(any) } type workerQueue interface { diff --git a/vendor/github.com/panjf2000/ants/v2/worker_stack.go b/vendor/github.com/panjf2000/ants/v2/worker_stack.go index 6b01abcd..18dcd23b 100644 --- a/vendor/github.com/panjf2000/ants/v2/worker_stack.go +++ b/vendor/github.com/panjf2000/ants/v2/worker_stack.go @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2019. Ants Authors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + package ants import "time" @@ -13,57 +35,57 @@ func newWorkerStack(size int) *workerStack { } } -func (wq *workerStack) len() int { - return len(wq.items) +func (ws *workerStack) len() int { + return len(ws.items) } -func (wq *workerStack) isEmpty() bool { - return len(wq.items) == 0 +func (ws *workerStack) isEmpty() bool { + return len(ws.items) == 0 } -func (wq *workerStack) insert(w worker) error { - wq.items = append(wq.items, w) +func (ws *workerStack) insert(w worker) error { + ws.items = append(ws.items, w) return nil } -func (wq *workerStack) detach() worker { - l := wq.len() +func (ws *workerStack) detach() worker { + l := ws.len() if l == 0 { return nil } - w := wq.items[l-1] - wq.items[l-1] = nil // avoid memory leaks - wq.items = wq.items[:l-1] + w := ws.items[l-1] + ws.items[l-1] = nil // avoid memory leaks + ws.items = ws.items[:l-1] return w } -func (wq *workerStack) refresh(duration time.Duration) []worker { - n := wq.len() +func (ws *workerStack) refresh(duration time.Duration) []worker { + n := ws.len() if n == 0 { return nil } expiryTime := time.Now().Add(-duration) - index := wq.binarySearch(0, n-1, expiryTime) + index := ws.binarySearch(0, n-1, expiryTime) - wq.expiry = wq.expiry[:0] + ws.expiry = ws.expiry[:0] if index != -1 { - wq.expiry = append(wq.expiry, wq.items[:index+1]...) - m := copy(wq.items, wq.items[index+1:]) + ws.expiry = append(ws.expiry, ws.items[:index+1]...) + m := copy(ws.items, ws.items[index+1:]) for i := m; i < n; i++ { - wq.items[i] = nil + ws.items[i] = nil } - wq.items = wq.items[:m] + ws.items = ws.items[:m] } - return wq.expiry + return ws.expiry } -func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int { +func (ws *workerStack) binarySearch(l, r int, expiryTime time.Time) int { for l <= r { mid := l + ((r - l) >> 1) // avoid overflow when computing mid - if expiryTime.Before(wq.items[mid].lastUsedTime()) { + if expiryTime.Before(ws.items[mid].lastUsedTime()) { r = mid - 1 } else { l = mid + 1 @@ -72,10 +94,10 @@ func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int { return r } -func (wq *workerStack) reset() { - for i := 0; i < wq.len(); i++ { - wq.items[i].finish() - wq.items[i] = nil +func (ws *workerStack) reset() { + for i := 0; i < ws.len(); i++ { + ws.items[i].finish() + ws.items[i] = nil } - wq.items = wq.items[:0] + ws.items = ws.items[:0] } diff --git a/vendor/modules.txt b/vendor/modules.txt index a308c625..24bb950f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -408,10 +408,10 @@ github.com/opencontainers/image-spec/specs-go/v1 # github.com/opencontainers/runtime-spec v1.2.0 ## explicit github.com/opencontainers/runtime-spec/specs-go -# github.com/panjf2000/ants/v2 v2.10.0 -## explicit; go 1.13 +# github.com/panjf2000/ants/v2 v2.11.0 +## explicit; go 1.18 github.com/panjf2000/ants/v2 -github.com/panjf2000/ants/v2/internal/sync +github.com/panjf2000/ants/v2/pkg/sync # github.com/pkg/errors v0.9.1 ## explicit github.com/pkg/errors
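// A sketch (not part of the vendored diff) of the pool lifecycle described
// in the code above: ReleaseTimeout waits for the workers plus the purge
// and ticktock goroutines to finish, and Reboot reopens a closed pool.
// The timeout value is illustrative.
package main

import (
	"log"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	p, err := ants.NewPool(16)
	if err != nil {
		log.Fatal(err)
	}

	_ = p.Submit(func() { time.Sleep(100 * time.Millisecond) })

	// Prefer ReleaseTimeout over Release when the pool may be rebooted later,
	// so every worker has fully exited before reuse.
	if err := p.ReleaseTimeout(5 * time.Second); err != nil {
		log.Printf("pool did not drain in time: %v", err) // e.g. ants.ErrTimeout
	}

	p.Reboot() // reopen the closed pool
	_ = p.Submit(func() { log.Println("running again") })
	p.Release()
}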