1
0
mirror of https://github.com/amir20/dozzle.git synced 2025-12-21 21:33:18 +01:00

fix: improves search by backfilling on first search (#3261)

This commit is contained in:
Amir Raminfar
2024-09-07 15:11:09 -07:00
committed by GitHub
parent 328a277cff
commit a5ffd88884
7 changed files with 230 additions and 102 deletions

View File

@@ -79,16 +79,11 @@ function useLogStream(url: Ref<string>, loadMoreUrl?: Ref<string>) {
buffer.value = [];
}
} else {
let empty = false;
if (messages.value.length == 0) {
// sort the buffer the very first time because of multiple logs in parallel
buffer.value.sort((a, b) => a.date.getTime() - b.date.getTime());
empty = true;
}
messages.value = [...messages.value, ...buffer.value];
if (isSearching && messages.value.length < 90 && empty) {
loadOlderLogs();
}
buffer.value = [];
}
}
@@ -141,6 +136,13 @@ function useLogStream(url: Ref<string>, loadMoreUrl?: Ref<string>) {
flushBuffer.flush();
});
es.addEventListener("logs-backfill", (e) => {
const data = JSON.parse((e as MessageEvent).data) as LogEvent[];
const logs = data.map((e) => asLogEntry(e));
messages.value = [...logs, ...messages.value];
});
es.onmessage = (e) => {
if (e.data) {
buffer.value = [...buffer.value, parseMessage(e.data)];
@@ -167,11 +169,12 @@ function useLogStream(url: Ref<string>, loadMoreUrl?: Ref<string>) {
const signal = abortController.signal;
fetchingInProgress = true;
try {
const stopWatcher = watchOnce(url, () => abortController.abort("stream changed"));
const moreParams = { ...params.value, from: from.toISOString(), to: to.toISOString(), minimum: "100" };
const logs = await (
await fetch(withBase(`${loadMoreUrl.value}?${new URLSearchParams(moreParams).toString()}`), { signal })
).text();
const urlWithMoreParams = computed(() =>
withBase(`${loadMoreUrl.value}?${new URLSearchParams(moreParams).toString()}`),
);
const stopWatcher = watchOnce(urlWithMoreParams, () => abortController.abort("stream changed"));
const logs = await (await fetch(urlWithMoreParams.value, { signal })).text();
stopWatcher();
if (logs && signal.aborted === false) {

View File

@@ -83,3 +83,7 @@ func (l *LogEvent) HasLevel() bool {
// IsCloseToTime reports whether this event's timestamp is within 10
// units of other's timestamp (compared symmetrically in either order).
func (l *LogEvent) IsCloseToTime(other *LogEvent) bool {
	delta := math.Abs(float64(l.Timestamp - other.Timestamp))
	return delta < 10
}
// MessageId returns the identifier used for this event on an SSE stream:
// the event's timestamp doubles as the message id (the SSE writer emits
// it as an "id:" field when it is positive).
func (l *LogEvent) MessageId() int64 {
	return l.Timestamp
}

101
internal/support/web/sse.go Normal file
View File

@@ -0,0 +1,101 @@
package support_web
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
)
type SSEWriter struct {
f http.Flusher
w http.ResponseWriter
}
type HasId interface {
MessageId() int64
}
func NewSSEWriter(ctx context.Context, w http.ResponseWriter) (*SSEWriter, error) {
f, ok := w.(http.Flusher)
if !ok {
return nil, http.ErrNotSupported
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-transform")
w.Header().Add("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
sse := &SSEWriter{
f: f,
w: w,
}
return sse, nil
}
// Write sends data followed by the blank line that terminates an SSE
// frame, then flushes the response. The returned count covers only the
// caller's data, not the trailing separator.
func (s *SSEWriter) Write(data []byte) (int, error) {
	n, err := s.w.Write(data)
	if err != nil {
		return n, err
	}
	if _, err := s.w.Write([]byte("\n\n")); err != nil {
		return n, err
	}
	s.f.Flush()
	return n, nil
}
// Ping emits an SSE comment frame (":ping ") to keep the connection
// alive; clients ignore comment lines.
func (s *SSEWriter) Ping() error {
	if _, err := s.Write([]byte(":ping ")); err != nil {
		return err
	}
	return nil
}
// Message JSON-encodes data and writes it as a default (unnamed) SSE
// message. When data implements HasId with a positive id, an "id:" field
// is included so clients can resume with Last-Event-ID.
func (s *SSEWriter) Message(data any) error {
	encoded, err := json.Marshal(data)
	if err != nil {
		return err
	}

	var buf bytes.Buffer
	buf.WriteString("data: ")
	buf.Write(encoded)
	buf.WriteByte('\n')
	if withId, ok := data.(HasId); ok {
		if id := withId.MessageId(); id > 0 {
			fmt.Fprintf(&buf, "id: %d\n", id)
		}
	}

	// s.Write appends the frame-terminating blank line and flushes.
	_, err = buf.WriteTo(s)
	return err
}
// Event JSON-encodes data and writes it as a named SSE event so clients
// can subscribe with addEventListener(event, ...).
func (s *SSEWriter) Event(event string, data any) error {
	encoded, err := json.Marshal(data)
	if err != nil {
		return err
	}

	var buf bytes.Buffer
	buf.WriteString("event: ")
	buf.WriteString(event)
	buf.WriteByte('\n')
	buf.WriteString("data: ")
	buf.Write(encoded)
	buf.WriteByte('\n')

	// s.Write appends the frame-terminating blank line and flushes.
	_, err = buf.WriteTo(s)
	return err
}

17
internal/utils/time.go Normal file
View File

@@ -0,0 +1,17 @@
package utils
import "time"
func Min(a, b time.Time) time.Time {
if a.Before(b) {
return a
}
return b
}
func Max(a, b time.Time) time.Time {
if a.After(b) {
return a
}
return b
}

View File

@@ -187,6 +187,7 @@ X-Accel-Buffering: no
data: {"m":"INFO Testing logs...","ts":0,"id":4256192898,"l":"info","s":"stdout","c":"123456"}
event: container-event
data: {"actorId":"123456","name":"container-stopped","host":"localhost"}
@@ -216,5 +217,6 @@ X-Accel-Buffering: no
data: {"m":"INFO Testing logs...","ts":1589396137772,"id":1469707724,"l":"info","s":"stdout","c":"123456"}
id: 1589396137772
event: container-event
data: {"actorId":"123456","name":"container-stopped","host":"localhost"}

View File

@@ -1,30 +1,23 @@
package web
import (
"fmt"
"net/http"
"github.com/goccy/go-json"
"github.com/amir20/dozzle/internal/analytics"
"github.com/amir20/dozzle/internal/docker"
docker_support "github.com/amir20/dozzle/internal/support/docker"
support_web "github.com/amir20/dozzle/internal/support/web"
"github.com/rs/zerolog/log"
)
func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
sseWriter, err := support_web.NewSSEWriter(r.Context(), w)
if err != nil {
log.Error().Err(err).Msg("error creating sse writer")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-transform")
w.Header().Add("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
ctx := r.Context()
events := make(chan docker.ContainerEvent)
stats := make(chan docker.ContainerStat)
@@ -38,37 +31,30 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
for _, err := range errors {
log.Warn().Err(err).Msg("error listing containers")
if hostNotAvailableError, ok := err.(*docker_support.HostUnavailableError); ok {
bytes, _ := json.Marshal(hostNotAvailableError.Host)
if _, err := fmt.Fprintf(w, "event: update-host\ndata: %s\n\n", string(bytes)); err != nil {
if err := sseWriter.Event("update-host", hostNotAvailableError.Host); err != nil {
log.Error().Err(err).Msg("error writing event to event stream")
}
}
}
if err := sendContainersJSON(allContainers, w); err != nil {
if err := sseWriter.Event("containers-changed", allContainers); err != nil {
log.Error().Err(err).Msg("error writing containers to event stream")
return
}
f.Flush()
go sendBeaconEvent(h, r, len(allContainers))
for {
select {
case host := <-availableHosts:
bytes, _ := json.Marshal(host)
if _, err := fmt.Fprintf(w, "event: update-host\ndata: %s\n\n", string(bytes)); err != nil {
log.Error().Err(err).Msg("error writing event to event stream")
}
f.Flush()
case stat := <-stats:
bytes, _ := json.Marshal(stat)
if _, err := fmt.Fprintf(w, "event: container-stat\ndata: %s\n\n", string(bytes)); err != nil {
if err := sseWriter.Event("update-host", host); err != nil {
log.Error().Err(err).Msg("error writing event to event stream")
return
}
case stat := <-stats:
if err := sseWriter.Event("container-stat", stat); err != nil {
log.Error().Err(err).Msg("error writing event to event stream")
return
}
f.Flush()
case event, ok := <-events:
if !ok {
return
@@ -78,21 +64,18 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
if event.Name == "start" {
log.Debug().Str("container", event.ActorID).Msg("container started")
if containers, err := h.multiHostService.ListContainersForHost(event.Host); err == nil {
if err := sendContainersJSON(containers, w); err != nil {
if err := sseWriter.Event("containers-changed", containers); err != nil {
log.Error().Err(err).Msg("error writing containers to event stream")
return
}
}
}
bytes, _ := json.Marshal(event)
if _, err := fmt.Fprintf(w, "event: container-event\ndata: %s\n\n", string(bytes)); err != nil {
if err := sseWriter.Event("container-event", event); err != nil {
log.Error().Err(err).Msg("error writing event to event stream")
return
}
f.Flush()
case "health_status: healthy", "health_status: unhealthy":
log.Debug().Str("container", event.ActorID).Str("health", event.Name).Msg("container health status")
healthy := "unhealthy"
@@ -103,12 +86,11 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
"actorId": event.ActorID,
"health": healthy,
}
bytes, _ := json.Marshal(payload)
if _, err := fmt.Fprintf(w, "event: container-health\ndata: %s\n\n", string(bytes)); err != nil {
if err := sseWriter.Event("container-health", payload); err != nil {
log.Error().Err(err).Msg("error writing event to event stream")
return
}
f.Flush()
}
case <-ctx.Done():
return
@@ -142,19 +124,3 @@ func sendBeaconEvent(h *handler, r *http.Request, runningContainers int) {
log.Debug().Err(err).Msg("error sending beacon")
}
}
// sendContainersJSON streams the container list to w as an SSE
// "containers-changed" event. The caller remains responsible for
// flushing the response writer.
func sendContainersJSON(containers []docker.Container, w http.ResponseWriter) error {
	if _, err := fmt.Fprint(w, "event: containers-changed\ndata: "); err != nil {
		return err
	}
	if err := json.NewEncoder(w).Encode(containers); err != nil {
		return err
	}
	// Encode already appended one newline; this blank line ends the frame.
	if _, err := fmt.Fprint(w, "\n\n"); err != nil {
		return err
	}
	return nil
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"regexp"
"sort"
"strconv"
"strings"
@@ -19,6 +20,7 @@ import (
"github.com/amir20/dozzle/internal/docker"
"github.com/amir20/dozzle/internal/support/search"
support_web "github.com/amir20/dozzle/internal/support/web"
"github.com/amir20/dozzle/internal/utils"
"github.com/docker/docker/pkg/stdcopy"
"github.com/dustin/go-humanize"
@@ -234,35 +236,83 @@ func streamLogsForContainers(w http.ResponseWriter, r *http.Request, multiHostCl
return
}
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
sseWriter, err := support_web.NewSSEWriter(r.Context(), w)
if err != nil {
log.Error().Err(err).Msg("error creating sse writer")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-transform")
w.Header().Add("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
logs := make(chan *docker.LogEvent)
events := make(chan *docker.ContainerEvent, 1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
existingContainers, errs := multiHostClient.ListAllContainersFiltered(filter)
if len(errs) > 0 {
log.Warn().Err(errs[0]).Msg("error while listing containers")
}
absoluteTime := time.Time{}
var regex *regexp.Regexp
liveLogs := make(chan *docker.LogEvent)
events := make(chan *docker.ContainerEvent, 1)
backfill := make(chan []*docker.LogEvent)
if r.URL.Query().Has("filter") {
var err error
regex, err = search.ParseRegex(r.URL.Query().Get("filter"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
absoluteTime = time.Now()
go func() {
minimum := 50
to := absoluteTime
for minimum > 0 {
events := make([]*docker.LogEvent, 0)
for _, container := range existingContainers {
containerService, err := multiHostClient.FindContainer(container.Host, container.ID)
if err != nil {
log.Error().Err(err).Msg("error while finding container")
return
}
if to.Before(containerService.Container.Created) {
continue
}
logs, err := containerService.LogsBetweenDates(r.Context(), to.Add(-100*time.Second), to, stdTypes)
if err != nil {
log.Error().Err(err).Msg("error while fetching logs")
return
}
for log := range logs {
if search.Search(regex, log) {
events = append(events, log)
}
}
}
to = to.Add(-100 * time.Second)
minimum -= len(events)
sort.Slice(events, func(i, j int) bool {
return events[i].Timestamp < events[j].Timestamp
})
backfill <- events
}
}()
}
streamLogs := func(container docker.Container) {
containerService, err := multiHostClient.FindContainer(container.Host, container.ID)
if err != nil {
log.Error().Err(err).Msg("error while finding container")
return
}
err = containerService.StreamLogs(r.Context(), container.StartedAt, stdTypes, logs)
start := utils.Max(absoluteTime, container.StartedAt)
err = containerService.StreamLogs(r.Context(), start, stdTypes, liveLogs)
if err != nil {
if errors.Is(err, io.EOF) {
log.Debug().Str("container", container.ID).Msg("streaming ended")
@@ -280,50 +330,35 @@ func streamLogsForContainers(w http.ResponseWriter, r *http.Request, multiHostCl
newContainers := make(chan docker.Container)
multiHostClient.SubscribeContainersStarted(r.Context(), newContainers, filter)
var regex *regexp.Regexp
if r.URL.Query().Has("filter") {
var err error
regex, err = search.ParseRegex(r.URL.Query().Get("filter"))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
ticker := time.NewTicker(5 * time.Second)
loop:
for {
select {
case logEvent := <-logs:
case logEvent := <-liveLogs:
if regex != nil {
if !search.Search(regex, logEvent) {
continue
}
}
if buf, err := json.Marshal(logEvent); err != nil {
log.Error().Err(err).Msg("error encoding log event")
} else {
fmt.Fprintf(w, "data: %s\n", buf)
}
if logEvent.Timestamp > 0 {
fmt.Fprintf(w, "id: %d\n", logEvent.Timestamp)
}
fmt.Fprintf(w, "\n")
f.Flush()
case <-ticker.C:
fmt.Fprintf(w, ":ping \n\n")
f.Flush()
sseWriter.Message(logEvent)
case container := <-newContainers:
events <- &docker.ContainerEvent{ActorID: container.ID, Name: "container-started", Host: container.Host}
go streamLogs(container)
case event := <-events:
log.Debug().Str("event", event.Name).Str("container", event.ActorID).Msg("received event")
if buf, err := json.Marshal(event); err != nil {
if err := sseWriter.Event("container-event", event); err != nil {
log.Error().Err(err).Msg("error encoding container event")
} else {
fmt.Fprintf(w, "event: container-event\ndata: %s\n\n", buf)
f.Flush()
}
case backfillEvents := <-backfill:
if err := sseWriter.Event("logs-backfill", backfillEvents); err != nil {
log.Error().Err(err).Msg("error encoding container event")
}
case <-ticker.C:
sseWriter.Ping()
case <-r.Context().Done():
break loop
}