diff --git a/internal/docker/client.go b/internal/docker/client.go index e2d14f36..7ab46c5f 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -306,8 +306,14 @@ func (d *_client) Events(ctx context.Context, messages chan<- ContainerEvent) <- select { case <-ctx.Done(): return + case err, ok := <-errors: + if !ok { + log.Errorf("docker events channel closed") + } + log.Warnf("error while listening to docker events: %v", err) case message, ok := <-dockerMessages: if !ok { + log.Errorf("docker events channel closed") return } diff --git a/internal/docker/container_store.go b/internal/docker/container_store.go index 9854b5c6..9d3917df 100644 --- a/internal/docker/container_store.go +++ b/internal/docker/container_store.go @@ -10,13 +10,14 @@ type ContainerStore struct { containers map[string]*Container client Client statsCollector *StatsCollector - subscribers []chan ContainerEvent + subscribers map[context.Context]chan ContainerEvent } func NewContainerStore(client Client) *ContainerStore { s := &ContainerStore{ containers: make(map[string]*Container), client: client, + subscribers: make(map[context.Context]chan ContainerEvent), statsCollector: NewStatsCollector(client), } @@ -39,25 +40,12 @@ func (s *ContainerStore) Client() Client { return s.client } -func (s *ContainerStore) Subscribe(events chan ContainerEvent) { - s.subscribers = append(s.subscribers, events) +func (s *ContainerStore) Subscribe(ctx context.Context, events chan ContainerEvent) { + s.subscribers[ctx] = events } -func (s *ContainerStore) Unsubscribe(toRemove chan ContainerEvent) { - for i, sub := range s.subscribers { - if sub == toRemove { - s.subscribers = append(s.subscribers[:i], s.subscribers[i+1:]...) - break - } - } -} - -func (s *ContainerStore) SubscribeStats(stats chan ContainerStat) { - s.statsCollector.Subscribe(stats) -} - -func (s *ContainerStore) UnsubscribeStats(toRemove chan ContainerStat) { - s.statsCollector.Unsubscribe(toRemove) +func (s *ContainerStore) SubscribeStats(ctx context.Context, stats chan ContainerStat) { + s.statsCollector.Subscribe(ctx, stats) } func (s *ContainerStore) init(ctx context.Context) { @@ -75,13 +63,12 @@ func (s *ContainerStore) init(ctx context.Context) { s.client.Events(ctx, events) stats := make(chan ContainerStat) - s.statsCollector.Subscribe(stats) - defer s.statsCollector.Unsubscribe(stats) + s.statsCollector.Subscribe(ctx, stats) for { select { case event := <-events: - log.Debugf("received event: %+v", event) + log.Tracef("received event: %+v", event) switch event.Name { case "start": if container, err := s.client.FindContainer(event.ActorID); err == nil { @@ -107,8 +94,12 @@ func (s *ContainerStore) init(ctx context.Context) { } } - for _, sub := range s.subscribers { - sub <- event + for ctx, sub := range s.subscribers { + select { + case sub <- event: + case <-ctx.Done(): + delete(s.subscribers, ctx) + } } case stat := <-stats: if container, ok := s.containers[stat.ID]; ok { diff --git a/internal/docker/event_generator.go b/internal/docker/event_generator.go index f67d6d7d..8f8b5cca 100644 --- a/internal/docker/event_generator.go +++ b/internal/docker/event_generator.go @@ -90,7 +90,7 @@ func (g *EventGenerator) consumeReader() { if readerError != nil { if readerError != ErrBadHeader { - log.Debugf("reader error: %v", readerError) + log.Tracef("reader error: %v", readerError) g.Errors <- readerError close(g.buffer) break diff --git a/internal/docker/stats_collector.go b/internal/docker/stats_collector.go index 952f42a4..dcbd561e 100644 --- a/internal/docker/stats_collector.go +++ b/internal/docker/stats_collector.go @@ -10,7 +10,7 @@ import ( type StatsCollector struct { stream chan ContainerStat - subscribers []chan ContainerStat + subscribers map[context.Context]chan ContainerStat client Client cancelers map[string]context.CancelFunc } @@ -18,23 +18,14 @@ type StatsCollector struct { func NewStatsCollector(client Client) *StatsCollector { return &StatsCollector{ stream: make(chan ContainerStat), - subscribers: []chan ContainerStat{}, + subscribers: make(map[context.Context]chan ContainerStat), client: client, cancelers: make(map[string]context.CancelFunc), } } -func (c *StatsCollector) Subscribe(stats chan ContainerStat) { - c.subscribers = append(c.subscribers, stats) -} - -func (c *StatsCollector) Unsubscribe(subscriber chan ContainerStat) { - for i, s := range c.subscribers { - if s == subscriber { - c.subscribers = append(c.subscribers[:i], c.subscribers[i+1:]...) - break - } - } +func (c *StatsCollector) Subscribe(ctx context.Context, stats chan ContainerStat) { + c.subscribers[ctx] = stats } func (sc *StatsCollector) StartCollecting(ctx context.Context) { @@ -84,8 +75,12 @@ func (sc *StatsCollector) StartCollecting(ctx context.Context) { case <-ctx.Done(): return case stat := <-sc.stream: - for _, subscriber := range sc.subscribers { - subscriber <- stat + for c, sub := range sc.subscribers { + select { + case sub <- stat: + case <-c.Done(): + delete(sc.subscribers, c) + } } } } diff --git a/internal/web/events.go b/internal/web/events.go index 1cbc3090..5351e67c 100644 --- a/internal/web/events.go +++ b/internal/web/events.go @@ -44,17 +44,10 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { for _, store := range h.stores { allContainers = append(allContainers, store.List()...) - store.SubscribeStats(stats) - store.Subscribe(events) + store.SubscribeStats(ctx, stats) + store.Subscribe(ctx, events) } - defer func() { - for _, store := range h.stores { - store.UnsubscribeStats(stats) - store.Unsubscribe(events) - } - }() - if err := sendContainersJSON(allContainers, w); err != nil { log.Errorf("error writing containers to event stream: %v", err) }