diff --git a/go.mod b/go.mod index 53c1b769..cae16fd3 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/samber/lo v1.46.0 github.com/wk8/go-ordered-map/v2 v2.1.8 github.com/yuin/goldmark v1.7.4 + golang.org/x/sync v0.7.0 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 ) diff --git a/go.sum b/go.sum index b54c7527..dc99ccff 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/docker/container_store.go b/internal/docker/container_store.go index 3559fce1..1af0d2ac 100644 --- a/internal/docker/container_store.go +++ b/internal/docker/container_store.go @@ -8,8 +8,8 @@ import ( "github.com/puzpuzpuz/xsync/v3" "github.com/samber/lo" - lop "github.com/samber/lo/parallel" log "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" ) type ContainerStore struct { @@ -43,7 +43,10 @@ func NewContainerStore(ctx context.Context, client Client) *ContainerStore { return s } -var ErrContainerNotFound = errors.New("container not found") +var ( + ErrContainerNotFound = errors.New("container not found") + maxFetchParallelism = int64(30) +) func (s *ContainerStore) checkConnectivity() error { if s.connected.CompareAndSwap(false, true) { @@ -69,14 +72,26 @@ func (s *ContainerStore) checkConnectivity() error { return item.State == "running" }) - chunks := lo.Chunk(running, 100) + sem := semaphore.NewWeighted(maxFetchParallelism) - for _, chunk := range chunks { - lop.ForEach(chunk, func(c Container, _ int) { - container, _ := s.client.FindContainer(c.ID) - s.containers.Store(c.ID, &container) - }) + for i, c := range running { + if err := sem.Acquire(s.ctx, 1); err != nil { + log.Errorf("failed to acquire semaphore: %v", err) + break + } + go func(c Container, i int) { + defer sem.Release(1) + if container, err := s.client.FindContainer(c.ID); err == nil { + s.containers.Store(c.ID, &container) + } + }(c, i) } + + if err := sem.Acquire(s.ctx, maxFetchParallelism); err != nil { + log.Errorf("failed to acquire semaphore: %v", err) + } + + log.Debugf("finished initializing container store with %d containers", len(containers)) } }