diff --git a/go.mod b/go.mod index 6888ddf0..d912a056 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( github.com/go-chi/chi/v5 v5.2.1 github.com/go-chi/jwtauth/v5 v5.3.3 github.com/gorilla/websocket v1.5.3 - github.com/puzpuzpuz/xsync/v3 v3.5.1 github.com/puzpuzpuz/xsync/v4 v4.0.0 github.com/rs/zerolog v1.34.0 github.com/samber/lo v1.49.1 diff --git a/go.sum b/go.sum index 557e8153..ddf63ad4 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++5Fg= -github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/puzpuzpuz/xsync/v4 v4.0.0 h1:F1za+MBXzDQtQq+OVgFsojSX4w66rsNDmQNebPFAncA= github.com/puzpuzpuz/xsync/v4 v4.0.0/go.mod h1:VJDmTCJMBt8igNxnkQd86r+8KUeN1quSfNKu5bLYFQo= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= diff --git a/internal/container/container_store.go b/internal/container/container_store.go index 5efee819..30bb3f74 100644 --- a/internal/container/container_store.go +++ b/internal/container/container_store.go @@ -7,7 +7,7 @@ import ( "sync/atomic" "time" - "github.com/puzpuzpuz/xsync/v3" + "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog/log" "github.com/samber/lo" "golang.org/x/sync/semaphore" @@ -20,9 +20,9 @@ type StatsCollector interface { } type ContainerStore struct { - containers *xsync.MapOf[string, *Container] - subscribers *xsync.MapOf[context.Context, chan<- ContainerEvent] - newContainerSubscribers *xsync.MapOf[context.Context, chan<- Container] + containers *xsync.Map[string, *Container] + subscribers *xsync.Map[context.Context, chan<- ContainerEvent] + newContainerSubscribers *xsync.Map[context.Context, chan<- Container] client Client statsCollector StatsCollector wg sync.WaitGroup @@ -38,10 +38,10 @@ func NewContainerStore(ctx context.Context, client Client, statsCollect StatsCol log.Debug().Str("host", client.Host().Name).Interface("labels", labels).Msg("initializing container store") s := &ContainerStore{ - containers: xsync.NewMapOf[string, *Container](), + containers: xsync.NewMap[string, *Container](), client: client, - subscribers: xsync.NewMapOf[context.Context, chan<- ContainerEvent](), - newContainerSubscribers: xsync.NewMapOf[context.Context, chan<- Container](), + subscribers: xsync.NewMap[context.Context, chan<- ContainerEvent](), + newContainerSubscribers: xsync.NewMap[context.Context, chan<- Container](), statsCollector: statsCollect, wg: sync.WaitGroup{}, events: make(chan ContainerEvent), @@ -174,18 +174,18 @@ func (s *ContainerStore) FindContainer(id string, labels ContainerLabels) (Conta if container, ok := s.containers.Load(id); ok { if !container.FullyLoaded { log.Debug().Str("id", id).Msg("container is not fully loaded, fetching it") - if newContainer, ok := s.containers.Compute(id, func(c *Container, loaded bool) (*Container, bool) { + if newContainer, ok := s.containers.Compute(id, func(c *Container, loaded bool) (*Container, xsync.ComputeOp) { if loaded { ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() if newContainer, err := s.client.FindContainer(ctx, id); err == nil { - return &newContainer, false + return &newContainer, xsync.UpdateOp } else { log.Error().Err(err).Msg("failed to fetch container") - return c, false + return c, xsync.CancelOp } } - return c, false + return c, xsync.CancelOp }); ok { go func() { event := ContainerEvent{ @@ -314,7 +314,7 @@ func (s *ContainerStore) init() { case "update": started := false - updatedContainer, _ := s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) { + updatedContainer, _ := s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, xsync.ComputeOp) { if loaded { newContainer := event.Container if newContainer.State == "running" && c.State != "running" { @@ -327,9 +327,9 @@ func (s *ContainerStore) init() { c.FinishedAt = newContainer.FinishedAt c.Created = newContainer.Created c.Host = newContainer.Host - return c, false + return c, xsync.UpdateOp } else { - return c, true + return c, xsync.CancelOp } }) @@ -349,14 +349,14 @@ func (s *ContainerStore) init() { } case "die": - s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) { + s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, xsync.ComputeOp) { if loaded { log.Debug().Str("id", c.ID).Msg("container died") c.State = "exited" c.FinishedAt = time.Now() - return c, false + return c, xsync.UpdateOp } else { - return c, true + return c, xsync.CancelOp } }) case "health_status: healthy", "health_status: unhealthy": @@ -365,24 +365,24 @@ func (s *ContainerStore) init() { healthy = "healthy" } - s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) { + s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, xsync.ComputeOp) { if loaded { log.Debug().Str("id", c.ID).Str("health", healthy).Msg("container health status changed") c.Health = healthy - return c, false + return c, xsync.UpdateOp } else { - return c, true + return c, xsync.CancelOp } }) case "rename": - s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) { + s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, xsync.ComputeOp) { if loaded { log.Debug().Str("id", event.ActorID).Str("name", event.ActorAttributes["name"]).Msg("container renamed") c.Name = event.ActorAttributes["name"] - return c, false + return c, xsync.UpdateOp } else { - return c, true + return c, xsync.CancelOp } }) } diff --git a/internal/docker/stats_collector.go b/internal/docker/stats_collector.go index 47b337d2..f16b4462 100644 --- a/internal/docker/stats_collector.go +++ b/internal/docker/stats_collector.go @@ -9,15 +9,15 @@ import ( "time" "github.com/amir20/dozzle/internal/container" - "github.com/puzpuzpuz/xsync/v3" + "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog/log" ) type DockerStatsCollector struct { stream chan container.ContainerStat - subscribers *xsync.MapOf[context.Context, chan<- container.ContainerStat] + subscribers *xsync.Map[context.Context, chan<- container.ContainerStat] client container.Client - cancelers *xsync.MapOf[string, context.CancelFunc] + cancelers *xsync.Map[string, context.CancelFunc] stopper context.CancelFunc timer *time.Timer mu sync.Mutex @@ -30,9 +30,9 @@ var timeToStop = 6 * time.Hour func NewDockerStatsCollector(client container.Client, labels container.ContainerLabels) *DockerStatsCollector { return &DockerStatsCollector{ stream: make(chan container.ContainerStat), - subscribers: xsync.NewMapOf[context.Context, chan<- container.ContainerStat](), + subscribers: xsync.NewMap[context.Context, chan<- container.ContainerStat](), client: client, - cancelers: xsync.NewMapOf[string, context.CancelFunc](), + cancelers: xsync.NewMap[string, context.CancelFunc](), labels: labels, } } diff --git a/internal/k8s/stats_collector.go b/internal/k8s/stats_collector.go index d19bab45..902d07e6 100644 --- a/internal/k8s/stats_collector.go +++ b/internal/k8s/stats_collector.go @@ -7,7 +7,7 @@ import ( "time" "github.com/amir20/dozzle/internal/container" - "github.com/puzpuzpuz/xsync/v3" + "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metricsclient "k8s.io/metrics/pkg/client/clientset/versioned" @@ -18,7 +18,7 @@ var timeToStop = 2 * time.Hour type K8sStatsCollector struct { client *K8sClient metrics *metricsclient.Clientset - subscribers *xsync.MapOf[context.Context, chan<- container.ContainerStat] + subscribers *xsync.Map[context.Context, chan<- container.ContainerStat] stopper context.CancelFunc timer *time.Timer mu sync.Mutex @@ -32,7 +32,7 @@ func NewK8sStatsCollector(client *K8sClient, labels container.ContainerLabels) ( return nil, err } return &K8sStatsCollector{ - subscribers: xsync.NewMapOf[context.Context, chan<- container.ContainerStat](), + subscribers: xsync.NewMap[context.Context, chan<- container.ContainerStat](), client: client, labels: labels, metrics: metricsClient, diff --git a/internal/support/docker/retriable_client_manager.go b/internal/support/docker/retriable_client_manager.go index 37b35f83..d40a2e20 100644 --- a/internal/support/docker/retriable_client_manager.go +++ b/internal/support/docker/retriable_client_manager.go @@ -11,7 +11,7 @@ import ( "github.com/amir20/dozzle/internal/container" container_support "github.com/amir20/dozzle/internal/support/container" - "github.com/puzpuzpuz/xsync/v3" + "github.com/puzpuzpuz/xsync/v4" "github.com/samber/lo" lop "github.com/samber/lo/parallel" @@ -23,7 +23,7 @@ type RetriableClientManager struct { failedAgents []string certs tls.Certificate mu sync.RWMutex - subscribers *xsync.MapOf[context.Context, chan<- container.Host] + subscribers *xsync.Map[context.Context, chan<- container.Host] timeout time.Duration } @@ -74,7 +74,7 @@ func NewRetriableClientManager(agents []string, timeout time.Duration, certs tls clients: clientMap, failedAgents: failed, certs: certs, - subscribers: xsync.NewMapOf[context.Context, chan<- container.Host](), + subscribers: xsync.NewMap[context.Context, chan<- container.Host](), timeout: timeout, } } diff --git a/internal/support/docker/swarm_client_manager.go b/internal/support/docker/swarm_client_manager.go index 2b9bbd70..24d45f03 100644 --- a/internal/support/docker/swarm_client_manager.go +++ b/internal/support/docker/swarm_client_manager.go @@ -14,7 +14,7 @@ import ( "github.com/amir20/dozzle/internal/docker" container_support "github.com/amir20/dozzle/internal/support/container" - "github.com/puzpuzpuz/xsync/v3" + "github.com/puzpuzpuz/xsync/v4" "github.com/samber/lo" lop "github.com/samber/lo/parallel" @@ -25,7 +25,7 @@ type SwarmClientManager struct { clients map[string]container_support.ClientService certs tls.Certificate mu sync.RWMutex - subscribers *xsync.MapOf[context.Context, chan<- container.Host] + subscribers *xsync.Map[context.Context, chan<- container.Host] localClient container.Client localIPs []string name string @@ -75,7 +75,7 @@ func NewSwarmClientManager(localClient *docker.DockerClient, certs tls.Certifica localClient: localClient, clients: clientMap, certs: certs, - subscribers: xsync.NewMapOf[context.Context, chan<- container.Host](), + subscribers: xsync.NewMap[context.Context, chan<- container.Host](), localIPs: localIPs(), name: serviceName, timeout: timeout,