diff --git a/web/events.go b/web/events.go index 7ec30068..ebf147fe 100644 --- a/web/events.go +++ b/web/events.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "sync" "github.com/amir20/dozzle/docker" @@ -30,25 +31,46 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { events := make(chan docker.ContainerEvent) stats := make(chan docker.ContainerStat) - for _, client := range h.clients { - client.Events(ctx, events) - if err := sendContainersJSON(client, w); err != nil { - log.Errorf("error while encoding containers to stream: %v", err) - } - if containers, err := client.ListContainers(); err == nil { + { + wg := sync.WaitGroup{} + wg.Add(len(h.clients)) + results := make(chan []docker.Container, len(h.clients)) + + for _, client := range h.clients { + client.Events(ctx, events) + go func(client docker.Client) { - for _, c := range containers { - if c.State == "running" { - if err := client.ContainerStats(ctx, c.ID, stats); err != nil && !errors.Is(err, context.Canceled) { - log.Errorf("error while streaming container stats: %v", err) + defer wg.Done() + if containers, err := client.ListContainers(); err == nil { + results <- containers + go func(client docker.Client) { + for _, c := range containers { + if c.State == "running" { + if err := client.ContainerStats(ctx, c.ID, stats); err != nil && !errors.Is(err, context.Canceled) { + log.Errorf("error while streaming container stats: %v", err) + } + } } - } + }(client) + } else { + log.Errorf("error while listing containers: %v", err) } }(client) } - } + wg.Wait() + close(results) - f.Flush() + allContainers := []docker.Container{} + for containers := range results { + allContainers = append(allContainers, containers...) + } + + if err := sendContainersJSON(allContainers, w); err != nil { + log.Errorf("error writing containers to event stream: %v", err) + } + + f.Flush() + } for { select { @@ -72,7 +94,11 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { if err := h.clients[event.Host].ContainerStats(ctx, event.ActorID, stats); err != nil && !errors.Is(err, context.Canceled) { log.Errorf("error when streaming new container stats: %v", err) } - if err := sendContainersJSON(h.clients[event.Host], w); err != nil { + containers, err := h.clients[event.Host].ListContainers() + if err != nil { + log.Errorf("error when listing containers: %v", err) + } + if err := sendContainersJSON(containers, w); err != nil { log.Errorf("error encoding containers to stream: %v", err) return } @@ -112,12 +138,7 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { } } -func sendContainersJSON(client docker.Client, w http.ResponseWriter) error { - containers, err := client.ListContainers() - if err != nil { - return err - } - +func sendContainersJSON(containers []docker.Container, w http.ResponseWriter) error { if _, err := fmt.Fprint(w, "event: containers-changed\ndata: "); err != nil { return err }