package web import ( "fmt" "net/http" "github.com/goccy/go-json" "github.com/amir20/dozzle/internal/analytics" "github.com/amir20/dozzle/internal/docker" docker_support "github.com/amir20/dozzle/internal/support/docker" "github.com/rs/zerolog/log" ) func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { f, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-transform") w.Header().Add("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") ctx := r.Context() events := make(chan docker.ContainerEvent) stats := make(chan docker.ContainerStat) availableHosts := make(chan docker.Host) h.multiHostService.SubscribeEventsAndStats(ctx, events, stats) h.multiHostService.SubscribeAvailableHosts(ctx, availableHosts) allContainers, errors := h.multiHostService.ListAllContainers() for _, err := range errors { log.Warn().Err(err).Msg("error listing containers") if hostNotAvailableError, ok := err.(*docker_support.HostUnavailableError); ok { bytes, _ := json.Marshal(hostNotAvailableError.Host) if _, err := fmt.Fprintf(w, "event: update-host\ndata: %s\n\n", string(bytes)); err != nil { log.Error().Err(err).Msg("error writing event to event stream") } } } if err := sendContainersJSON(allContainers, w); err != nil { log.Error().Err(err).Msg("error writing containers to event stream") return } f.Flush() go sendBeaconEvent(h, r, len(allContainers)) for { select { case host := <-availableHosts: bytes, _ := json.Marshal(host) if _, err := fmt.Fprintf(w, "event: update-host\ndata: %s\n\n", string(bytes)); err != nil { log.Error().Err(err).Msg("error writing event to event stream") } f.Flush() case stat := <-stats: bytes, _ := json.Marshal(stat) if _, err := fmt.Fprintf(w, "event: container-stat\ndata: %s\n\n", string(bytes)); err != nil { log.Error().Err(err).Msg("error writing event to event stream") return } f.Flush() case event, ok := <-events: if !ok { return } switch event.Name { case "start", "die", "destroy": if event.Name == "start" { log.Debug().Str("container", event.ActorID).Msg("container started") if containers, err := h.multiHostService.ListContainersForHost(event.Host); err == nil { if err := sendContainersJSON(containers, w); err != nil { log.Error().Err(err).Msg("error writing containers to event stream") return } } } bytes, _ := json.Marshal(event) if _, err := fmt.Fprintf(w, "event: container-event\ndata: %s\n\n", string(bytes)); err != nil { log.Error().Err(err).Msg("error writing event to event stream") return } f.Flush() case "health_status: healthy", "health_status: unhealthy": log.Debug().Str("container", event.ActorID).Str("health", event.Name).Msg("container health status") healthy := "unhealthy" if event.Name == "health_status: healthy" { healthy = "healthy" } payload := map[string]string{ "actorId": event.ActorID, "health": healthy, } bytes, _ := json.Marshal(payload) if _, err := fmt.Fprintf(w, "event: container-health\ndata: %s\n\n", string(bytes)); err != nil { log.Error().Err(err).Msg("error writing event to event stream") return } f.Flush() } case <-ctx.Done(): return } } } func sendBeaconEvent(h *handler, r *http.Request, runningContainers int) { if h.config.NoAnalytics { return } b := analytics.BeaconEvent{ AuthProvider: string(h.config.Authorization.Provider), Browser: r.Header.Get("User-Agent"), Clients: h.multiHostService.TotalClients(), HasActions: h.config.EnableActions, HasCustomAddress: h.config.Addr != ":8080", HasCustomBase: h.config.Base != "/", HasHostname: h.config.Hostname != "", Name: "events", RunningContainers: runningContainers, Version: h.config.Version, } local, err := h.multiHostService.LocalHost() if err == nil { b.ServerID = local.ID } if err := analytics.SendBeacon(b); err != nil { log.Debug().Err(err).Msg("error sending beacon") } } func sendContainersJSON(containers []docker.Container, w http.ResponseWriter) error { if _, err := fmt.Fprint(w, "event: containers-changed\ndata: "); err != nil { return err } if err := json.NewEncoder(w).Encode(containers); err != nil { return err } if _, err := fmt.Fprint(w, "\n\n"); err != nil { return err } return nil }