mirror of
https://github.com/amir20/dozzle.git
synced 2025-12-24 22:39:18 +01:00
perf: uses a semaphore to fetch containers faster in parallel (#3161)
This commit is contained in:
@@ -8,8 +8,8 @@ import (
|
||||
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"github.com/samber/lo"
|
||||
lop "github.com/samber/lo/parallel"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
type ContainerStore struct {
|
||||
@@ -43,7 +43,10 @@ func NewContainerStore(ctx context.Context, client Client) *ContainerStore {
|
||||
return s
|
||||
}
|
||||
|
||||
var ErrContainerNotFound = errors.New("container not found")
|
||||
var (
|
||||
ErrContainerNotFound = errors.New("container not found")
|
||||
maxFetchParallelism = int64(30)
|
||||
)
|
||||
|
||||
func (s *ContainerStore) checkConnectivity() error {
|
||||
if s.connected.CompareAndSwap(false, true) {
|
||||
@@ -69,14 +72,26 @@ func (s *ContainerStore) checkConnectivity() error {
|
||||
return item.State == "running"
|
||||
})
|
||||
|
||||
chunks := lo.Chunk(running, 100)
|
||||
sem := semaphore.NewWeighted(maxFetchParallelism)
|
||||
|
||||
for _, chunk := range chunks {
|
||||
lop.ForEach(chunk, func(c Container, _ int) {
|
||||
container, _ := s.client.FindContainer(c.ID)
|
||||
s.containers.Store(c.ID, &container)
|
||||
})
|
||||
for i, c := range running {
|
||||
if err := sem.Acquire(s.ctx, 1); err != nil {
|
||||
log.Errorf("failed to acquire semaphore: %v", err)
|
||||
break
|
||||
}
|
||||
go func(c Container, i int) {
|
||||
defer sem.Release(1)
|
||||
if container, err := s.client.FindContainer(c.ID); err == nil {
|
||||
s.containers.Store(c.ID, &container)
|
||||
}
|
||||
}(c, i)
|
||||
}
|
||||
|
||||
if err := sem.Acquire(s.ctx, maxFetchParallelism); err != nil {
|
||||
log.Errorf("failed to acquire semaphore: %v", err)
|
||||
}
|
||||
|
||||
log.Debugf("finished initializing container store with %d containers", len(containers))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user