mirror of
https://github.com/amir20/dozzle.git
synced 2025-12-30 17:47:28 +01:00
fix: fixes a potential deadlock when reading stats (#2745)
This commit is contained in:
@@ -306,8 +306,14 @@ func (d *_client) Events(ctx context.Context, messages chan<- ContainerEvent) <-
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case err, ok := <-errors:
|
||||
if !ok {
|
||||
log.Errorf("docker events channel closed")
|
||||
}
|
||||
log.Warnf("error while listening to docker events: %v", err)
|
||||
case message, ok := <-dockerMessages:
|
||||
if !ok {
|
||||
log.Errorf("docker events channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -10,13 +10,14 @@ type ContainerStore struct {
|
||||
containers map[string]*Container
|
||||
client Client
|
||||
statsCollector *StatsCollector
|
||||
subscribers []chan ContainerEvent
|
||||
subscribers map[context.Context]chan ContainerEvent
|
||||
}
|
||||
|
||||
func NewContainerStore(client Client) *ContainerStore {
|
||||
s := &ContainerStore{
|
||||
containers: make(map[string]*Container),
|
||||
client: client,
|
||||
subscribers: make(map[context.Context]chan ContainerEvent),
|
||||
statsCollector: NewStatsCollector(client),
|
||||
}
|
||||
|
||||
@@ -39,25 +40,12 @@ func (s *ContainerStore) Client() Client {
|
||||
return s.client
|
||||
}
|
||||
|
||||
func (s *ContainerStore) Subscribe(events chan ContainerEvent) {
|
||||
s.subscribers = append(s.subscribers, events)
|
||||
func (s *ContainerStore) Subscribe(ctx context.Context, events chan ContainerEvent) {
|
||||
s.subscribers[ctx] = events
|
||||
}
|
||||
|
||||
func (s *ContainerStore) Unsubscribe(toRemove chan ContainerEvent) {
|
||||
for i, sub := range s.subscribers {
|
||||
if sub == toRemove {
|
||||
s.subscribers = append(s.subscribers[:i], s.subscribers[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ContainerStore) SubscribeStats(stats chan ContainerStat) {
|
||||
s.statsCollector.Subscribe(stats)
|
||||
}
|
||||
|
||||
func (s *ContainerStore) UnsubscribeStats(toRemove chan ContainerStat) {
|
||||
s.statsCollector.Unsubscribe(toRemove)
|
||||
func (s *ContainerStore) SubscribeStats(ctx context.Context, stats chan ContainerStat) {
|
||||
s.statsCollector.Subscribe(ctx, stats)
|
||||
}
|
||||
|
||||
func (s *ContainerStore) init(ctx context.Context) {
|
||||
@@ -75,13 +63,12 @@ func (s *ContainerStore) init(ctx context.Context) {
|
||||
s.client.Events(ctx, events)
|
||||
|
||||
stats := make(chan ContainerStat)
|
||||
s.statsCollector.Subscribe(stats)
|
||||
defer s.statsCollector.Unsubscribe(stats)
|
||||
s.statsCollector.Subscribe(ctx, stats)
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-events:
|
||||
log.Debugf("received event: %+v", event)
|
||||
log.Tracef("received event: %+v", event)
|
||||
switch event.Name {
|
||||
case "start":
|
||||
if container, err := s.client.FindContainer(event.ActorID); err == nil {
|
||||
@@ -107,8 +94,12 @@ func (s *ContainerStore) init(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
for _, sub := range s.subscribers {
|
||||
sub <- event
|
||||
for ctx, sub := range s.subscribers {
|
||||
select {
|
||||
case sub <- event:
|
||||
case <-ctx.Done():
|
||||
delete(s.subscribers, ctx)
|
||||
}
|
||||
}
|
||||
case stat := <-stats:
|
||||
if container, ok := s.containers[stat.ID]; ok {
|
||||
|
||||
@@ -90,7 +90,7 @@ func (g *EventGenerator) consumeReader() {
|
||||
|
||||
if readerError != nil {
|
||||
if readerError != ErrBadHeader {
|
||||
log.Debugf("reader error: %v", readerError)
|
||||
log.Tracef("reader error: %v", readerError)
|
||||
g.Errors <- readerError
|
||||
close(g.buffer)
|
||||
break
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
type StatsCollector struct {
|
||||
stream chan ContainerStat
|
||||
subscribers []chan ContainerStat
|
||||
subscribers map[context.Context]chan ContainerStat
|
||||
client Client
|
||||
cancelers map[string]context.CancelFunc
|
||||
}
|
||||
@@ -18,23 +18,14 @@ type StatsCollector struct {
|
||||
func NewStatsCollector(client Client) *StatsCollector {
|
||||
return &StatsCollector{
|
||||
stream: make(chan ContainerStat),
|
||||
subscribers: []chan ContainerStat{},
|
||||
subscribers: make(map[context.Context]chan ContainerStat),
|
||||
client: client,
|
||||
cancelers: make(map[string]context.CancelFunc),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *StatsCollector) Subscribe(stats chan ContainerStat) {
|
||||
c.subscribers = append(c.subscribers, stats)
|
||||
}
|
||||
|
||||
func (c *StatsCollector) Unsubscribe(subscriber chan ContainerStat) {
|
||||
for i, s := range c.subscribers {
|
||||
if s == subscriber {
|
||||
c.subscribers = append(c.subscribers[:i], c.subscribers[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
func (c *StatsCollector) Subscribe(ctx context.Context, stats chan ContainerStat) {
|
||||
c.subscribers[ctx] = stats
|
||||
}
|
||||
|
||||
func (sc *StatsCollector) StartCollecting(ctx context.Context) {
|
||||
@@ -84,8 +75,12 @@ func (sc *StatsCollector) StartCollecting(ctx context.Context) {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case stat := <-sc.stream:
|
||||
for _, subscriber := range sc.subscribers {
|
||||
subscriber <- stat
|
||||
for c, sub := range sc.subscribers {
|
||||
select {
|
||||
case sub <- stat:
|
||||
case <-c.Done():
|
||||
delete(sc.subscribers, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,17 +44,10 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
for _, store := range h.stores {
|
||||
allContainers = append(allContainers, store.List()...)
|
||||
store.SubscribeStats(stats)
|
||||
store.Subscribe(events)
|
||||
store.SubscribeStats(ctx, stats)
|
||||
store.Subscribe(ctx, events)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
for _, store := range h.stores {
|
||||
store.UnsubscribeStats(stats)
|
||||
store.Unsubscribe(events)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := sendContainersJSON(allContainers, w); err != nil {
|
||||
log.Errorf("error writing containers to event stream: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user