From a5ffd888840a88e63e2bd405d8f373e2353f0ad6 Mon Sep 17 00:00:00 2001 From: Amir Raminfar Date: Sat, 7 Sep 2024 15:11:09 -0700 Subject: [PATCH] fix: improves search by backfilling on first search (#3261) --- assets/composable/eventStreams.ts | 21 +++-- internal/docker/types.go | 4 + internal/support/web/sse.go | 101 ++++++++++++++++++++ internal/utils/time.go | 17 ++++ internal/web/__snapshots__/web.snapshot | 2 + internal/web/events.go | 68 ++++---------- internal/web/logs.go | 119 +++++++++++++++--------- 7 files changed, 230 insertions(+), 102 deletions(-) create mode 100644 internal/support/web/sse.go create mode 100644 internal/utils/time.go diff --git a/assets/composable/eventStreams.ts b/assets/composable/eventStreams.ts index 23c990ad..d6cc68f5 100644 --- a/assets/composable/eventStreams.ts +++ b/assets/composable/eventStreams.ts @@ -79,16 +79,11 @@ function useLogStream(url: Ref, loadMoreUrl?: Ref) { buffer.value = []; } } else { - let empty = false; if (messages.value.length == 0) { // sort the buffer the very first time because of multiple logs in parallel buffer.value.sort((a, b) => a.date.getTime() - b.date.getTime()); - empty = true; } messages.value = [...messages.value, ...buffer.value]; - if (isSearching && messages.value.length < 90 && empty) { - loadOlderLogs(); - } buffer.value = []; } } @@ -141,6 +136,13 @@ function useLogStream(url: Ref, loadMoreUrl?: Ref) { flushBuffer.flush(); }); + + es.addEventListener("logs-backfill", (e) => { + const data = JSON.parse((e as MessageEvent).data) as LogEvent[]; + const logs = data.map((e) => asLogEntry(e)); + messages.value = [...logs, ...messages.value]; + }); + es.onmessage = (e) => { if (e.data) { buffer.value = [...buffer.value, parseMessage(e.data)]; @@ -167,11 +169,12 @@ function useLogStream(url: Ref, loadMoreUrl?: Ref) { const signal = abortController.signal; fetchingInProgress = true; try { - const stopWatcher = watchOnce(url, () => abortController.abort("stream changed")); const moreParams = { ...params.value, from: from.toISOString(), to: to.toISOString(), minimum: "100" }; - const logs = await ( - await fetch(withBase(`${loadMoreUrl.value}?${new URLSearchParams(moreParams).toString()}`), { signal }) - ).text(); + const urlWithMoreParams = computed(() => + withBase(`${loadMoreUrl.value}?${new URLSearchParams(moreParams).toString()}`), + ); + const stopWatcher = watchOnce(urlWithMoreParams, () => abortController.abort("stream changed")); + const logs = await (await fetch(urlWithMoreParams.value, { signal })).text(); stopWatcher(); if (logs && signal.aborted === false) { diff --git a/internal/docker/types.go b/internal/docker/types.go index 0befd9bd..a0196718 100644 --- a/internal/docker/types.go +++ b/internal/docker/types.go @@ -83,3 +83,7 @@ func (l *LogEvent) HasLevel() bool { func (l *LogEvent) IsCloseToTime(other *LogEvent) bool { return math.Abs(float64(l.Timestamp-other.Timestamp)) < 10 } + +func (l *LogEvent) MessageId() int64 { + return l.Timestamp +} diff --git a/internal/support/web/sse.go b/internal/support/web/sse.go new file mode 100644 index 00000000..52854202 --- /dev/null +++ b/internal/support/web/sse.go @@ -0,0 +1,101 @@ +package support_web + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" +) + +type SSEWriter struct { + f http.Flusher + w http.ResponseWriter +} + +type HasId interface { + MessageId() int64 +} + +func NewSSEWriter(ctx context.Context, w http.ResponseWriter) (*SSEWriter, error) { + f, ok := w.(http.Flusher) + + if !ok { + return nil, http.ErrNotSupported + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-transform") + w.Header().Add("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + sse := &SSEWriter{ + f: f, + w: w, + } + + return sse, nil +} + +func (s *SSEWriter) Write(data []byte) (int, error) { + written, err := s.w.Write(data) + if err != nil { + return written, err + } + + _, err = s.w.Write([]byte("\n\n")) + if err != nil { + return written, err + } + + s.f.Flush() + + return written, nil +} + +func (s *SSEWriter) Ping() error { + _, err := s.Write([]byte(":ping ")) + return err +} + +func (s *SSEWriter) Message(data any) error { + encoded, err := json.Marshal(data) + + if err != nil { + return err + } + + buffer := bytes.Buffer{} + + buffer.WriteString("data: ") + buffer.Write(encoded) + buffer.WriteString("\n") + + if f, ok := data.(HasId); ok { + if f.MessageId() > 0 { + buffer.WriteString(fmt.Sprintf("id: %d\n", f.MessageId())) + } + } + + _, err = buffer.WriteTo(s) + return err +} + +func (s *SSEWriter) Event(event string, data any) error { + encoded, err := json.Marshal(data) + + if err != nil { + return err + } + + buffer := bytes.Buffer{} + + buffer.WriteString("event: " + event + "\n") + buffer.WriteString("data: ") + buffer.Write(encoded) + buffer.WriteString("\n") + + _, err = buffer.WriteTo(s) + return err +} diff --git a/internal/utils/time.go b/internal/utils/time.go new file mode 100644 index 00000000..02c74fc0 --- /dev/null +++ b/internal/utils/time.go @@ -0,0 +1,17 @@ +package utils + +import "time" + +func Min(a, b time.Time) time.Time { + if a.Before(b) { + return a + } + return b +} + +func Max(a, b time.Time) time.Time { + if a.After(b) { + return a + } + return b +} diff --git a/internal/web/__snapshots__/web.snapshot b/internal/web/__snapshots__/web.snapshot index 36c05c37..dece87a5 100644 --- a/internal/web/__snapshots__/web.snapshot +++ b/internal/web/__snapshots__/web.snapshot @@ -187,6 +187,7 @@ X-Accel-Buffering: no data: {"m":"INFO Testing logs...","ts":0,"id":4256192898,"l":"info","s":"stdout","c":"123456"} + event: container-event data: {"actorId":"123456","name":"container-stopped","host":"localhost"} @@ -216,5 +217,6 @@ X-Accel-Buffering: no data: {"m":"INFO Testing logs...","ts":1589396137772,"id":1469707724,"l":"info","s":"stdout","c":"123456"} id: 1589396137772 + event: container-event data: {"actorId":"123456","name":"container-stopped","host":"localhost"} \ No newline at end of file diff --git a/internal/web/events.go b/internal/web/events.go index 133ce23e..81aeaa3c 100644 --- a/internal/web/events.go +++ b/internal/web/events.go @@ -1,30 +1,23 @@ package web import ( - "fmt" "net/http" - "github.com/goccy/go-json" - "github.com/amir20/dozzle/internal/analytics" "github.com/amir20/dozzle/internal/docker" docker_support "github.com/amir20/dozzle/internal/support/docker" + support_web "github.com/amir20/dozzle/internal/support/web" "github.com/rs/zerolog/log" ) func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { - f, ok := w.(http.Flusher) - if !ok { - http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + sseWriter, err := support_web.NewSSEWriter(r.Context(), w) + if err != nil { + log.Error().Err(err).Msg("error creating sse writer") + http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-transform") - w.Header().Add("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("X-Accel-Buffering", "no") - ctx := r.Context() events := make(chan docker.ContainerEvent) stats := make(chan docker.ContainerStat) @@ -38,37 +31,30 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { for _, err := range errors { log.Warn().Err(err).Msg("error listing containers") if hostNotAvailableError, ok := err.(*docker_support.HostUnavailableError); ok { - bytes, _ := json.Marshal(hostNotAvailableError.Host) - if _, err := fmt.Fprintf(w, "event: update-host\ndata: %s\n\n", string(bytes)); err != nil { + if err := sseWriter.Event("update-host", hostNotAvailableError.Host); err != nil { log.Error().Err(err).Msg("error writing event to event stream") } } } - if err := sendContainersJSON(allContainers, w); err != nil { + if err := sseWriter.Event("containers-changed", allContainers); err != nil { log.Error().Err(err).Msg("error writing containers to event stream") - return } - f.Flush() - go sendBeaconEvent(h, r, len(allContainers)) for { select { case host := <-availableHosts: - bytes, _ := json.Marshal(host) - if _, err := fmt.Fprintf(w, "event: update-host\ndata: %s\n\n", string(bytes)); err != nil { - log.Error().Err(err).Msg("error writing event to event stream") - } - f.Flush() - case stat := <-stats: - bytes, _ := json.Marshal(stat) - if _, err := fmt.Fprintf(w, "event: container-stat\ndata: %s\n\n", string(bytes)); err != nil { + if err := sseWriter.Event("update-host", host); err != nil { + log.Error().Err(err).Msg("error writing event to event stream") + return + } + case stat := <-stats: + if err := sseWriter.Event("container-stat", stat); err != nil { log.Error().Err(err).Msg("error writing event to event stream") return } - f.Flush() case event, ok := <-events: if !ok { return @@ -78,21 +64,18 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { if event.Name == "start" { log.Debug().Str("container", event.ActorID).Msg("container started") if containers, err := h.multiHostService.ListContainersForHost(event.Host); err == nil { - if err := sendContainersJSON(containers, w); err != nil { + if err := sseWriter.Event("containers-changed", containers); err != nil { log.Error().Err(err).Msg("error writing containers to event stream") return } } } - bytes, _ := json.Marshal(event) - if _, err := fmt.Fprintf(w, "event: container-event\ndata: %s\n\n", string(bytes)); err != nil { + if err := sseWriter.Event("container-event", event); err != nil { log.Error().Err(err).Msg("error writing event to event stream") return } - f.Flush() - case "health_status: healthy", "health_status: unhealthy": log.Debug().Str("container", event.ActorID).Str("health", event.Name).Msg("container health status") healthy := "unhealthy" @@ -103,12 +86,11 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { "actorId": event.ActorID, "health": healthy, } - bytes, _ := json.Marshal(payload) - if _, err := fmt.Fprintf(w, "event: container-health\ndata: %s\n\n", string(bytes)); err != nil { + + if err := sseWriter.Event("container-health", payload); err != nil { log.Error().Err(err).Msg("error writing event to event stream") return } - f.Flush() } case <-ctx.Done(): return @@ -142,19 +124,3 @@ func sendBeaconEvent(h *handler, r *http.Request, runningContainers int) { log.Debug().Err(err).Msg("error sending beacon") } } - -func sendContainersJSON(containers []docker.Container, w http.ResponseWriter) error { - if _, err := fmt.Fprint(w, "event: containers-changed\ndata: "); err != nil { - return err - } - - if err := json.NewEncoder(w).Encode(containers); err != nil { - return err - } - - if _, err := fmt.Fprint(w, "\n\n"); err != nil { - return err - } - - return nil -} diff --git a/internal/web/logs.go b/internal/web/logs.go index 87fc5fe8..e1cce455 100644 --- a/internal/web/logs.go +++ b/internal/web/logs.go @@ -5,6 +5,7 @@ import ( "context" "errors" "regexp" + "sort" "strconv" "strings" @@ -19,6 +20,7 @@ import ( "github.com/amir20/dozzle/internal/docker" "github.com/amir20/dozzle/internal/support/search" + support_web "github.com/amir20/dozzle/internal/support/web" "github.com/amir20/dozzle/internal/utils" "github.com/docker/docker/pkg/stdcopy" "github.com/dustin/go-humanize" @@ -234,35 +236,83 @@ func streamLogsForContainers(w http.ResponseWriter, r *http.Request, multiHostCl return } - f, ok := w.(http.Flusher) - if !ok { - http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + sseWriter, err := support_web.NewSSEWriter(r.Context(), w) + if err != nil { + log.Error().Err(err).Msg("error creating sse writer") + http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-transform") - w.Header().Add("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("X-Accel-Buffering", "no") - - logs := make(chan *docker.LogEvent) - events := make(chan *docker.ContainerEvent, 1) - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() existingContainers, errs := multiHostClient.ListAllContainersFiltered(filter) if len(errs) > 0 { log.Warn().Err(errs[0]).Msg("error while listing containers") } + absoluteTime := time.Time{} + var regex *regexp.Regexp + liveLogs := make(chan *docker.LogEvent) + events := make(chan *docker.ContainerEvent, 1) + backfill := make(chan []*docker.LogEvent) + + if r.URL.Query().Has("filter") { + var err error + regex, err = search.ParseRegex(r.URL.Query().Get("filter")) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + absoluteTime = time.Now() + + go func() { + minimum := 50 + to := absoluteTime + for minimum > 0 { + events := make([]*docker.LogEvent, 0) + for _, container := range existingContainers { + containerService, err := multiHostClient.FindContainer(container.Host, container.ID) + if err != nil { + log.Error().Err(err).Msg("error while finding container") + return + } + + if to.Before(containerService.Container.Created) { + continue + } + + logs, err := containerService.LogsBetweenDates(r.Context(), to.Add(-100*time.Second), to, stdTypes) + if err != nil { + log.Error().Err(err).Msg("error while fetching logs") + return + } + + for log := range logs { + if search.Search(regex, log) { + events = append(events, log) + } + } + } + + to = to.Add(-100 * time.Second) + minimum -= len(events) + + sort.Slice(events, func(i, j int) bool { + return events[i].Timestamp < events[j].Timestamp + }) + + backfill <- events + } + }() + } + streamLogs := func(container docker.Container) { containerService, err := multiHostClient.FindContainer(container.Host, container.ID) if err != nil { log.Error().Err(err).Msg("error while finding container") return } - err = containerService.StreamLogs(r.Context(), container.StartedAt, stdTypes, logs) + start := utils.Max(absoluteTime, container.StartedAt) + err = containerService.StreamLogs(r.Context(), start, stdTypes, liveLogs) if err != nil { if errors.Is(err, io.EOF) { log.Debug().Str("container", container.ID).Msg("streaming ended") @@ -280,50 +330,35 @@ func streamLogsForContainers(w http.ResponseWriter, r *http.Request, multiHostCl newContainers := make(chan docker.Container) multiHostClient.SubscribeContainersStarted(r.Context(), newContainers, filter) - var regex *regexp.Regexp - if r.URL.Query().Has("filter") { - var err error - regex, err = search.ParseRegex(r.URL.Query().Get("filter")) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - } + ticker := time.NewTicker(5 * time.Second) loop: for { select { - case logEvent := <-logs: + case logEvent := <-liveLogs: if regex != nil { if !search.Search(regex, logEvent) { continue } } - if buf, err := json.Marshal(logEvent); err != nil { - log.Error().Err(err).Msg("error encoding log event") - } else { - fmt.Fprintf(w, "data: %s\n", buf) - } - if logEvent.Timestamp > 0 { - fmt.Fprintf(w, "id: %d\n", logEvent.Timestamp) - } - fmt.Fprintf(w, "\n") - f.Flush() - case <-ticker.C: - fmt.Fprintf(w, ":ping \n\n") - f.Flush() + sseWriter.Message(logEvent) case container := <-newContainers: events <- &docker.ContainerEvent{ActorID: container.ID, Name: "container-started", Host: container.Host} go streamLogs(container) case event := <-events: log.Debug().Str("event", event.Name).Str("container", event.ActorID).Msg("received event") - if buf, err := json.Marshal(event); err != nil { + if err := sseWriter.Event("container-event", event); err != nil { log.Error().Err(err).Msg("error encoding container event") - } else { - fmt.Fprintf(w, "event: container-event\ndata: %s\n\n", buf) - f.Flush() } + case backfillEvents := <-backfill: + if err := sseWriter.Event("logs-backfill", backfillEvents); err != nil { + log.Error().Err(err).Msg("error encoding container event") + } + + case <-ticker.C: + sseWriter.Ping() + case <-r.Context().Done(): break loop }