diff --git a/internal/agent/client.go b/internal/agent/client.go index 73d8ba6c..1cc29198 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -92,7 +92,7 @@ func rpcErrToErr(err error) error { } } -func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since time.Time, until time.Time, std container.StdType) (<-chan *container.LogEvent, error) { +func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since time.Time, until time.Time, std container.StdType) (<-chan container.LogEvent, error) { stream, err := c.client.LogsBetweenDates(ctx, &pb.LogsBetweenDatesRequest{ ContainerId: containerID, Since: timestamppb.New(since), @@ -104,7 +104,7 @@ func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since return nil, err } - events := make(chan *container.LogEvent) + events := make(chan container.LogEvent) go func() { sendLogs(stream, events) @@ -114,7 +114,7 @@ func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since return events, nil } -func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, since time.Time, std container.StdType, events chan<- *container.LogEvent) error { +func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, since time.Time, std container.StdType, events chan<- container.LogEvent) error { stream, err := c.client.StreamLogs(ctx, &pb.StreamLogsRequest{ ContainerId: containerID, Since: timestamppb.New(since), @@ -128,7 +128,7 @@ func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, si return sendLogs(stream, events) } -func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- *container.LogEvent) error { +func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- container.LogEvent) error { for { resp, err := stream.Recv() if err != nil { @@ -154,7 +154,7 @@ func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- *container. continue } - events <- &container.LogEvent{ + events <- container.LogEvent{ Id: resp.Event.Id, ContainerID: resp.Event.ContainerId, Message: message, diff --git a/internal/agent/server.go b/internal/agent/server.go index 931d8ad7..00eccfc9 100644 --- a/internal/agent/server.go +++ b/internal/agent/server.go @@ -65,7 +65,7 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream for event := range g.Events { out.Send(&pb.StreamLogsResponse{ - Event: logEventToPb(event), + Event: logEventToPb(&event), }) } @@ -95,7 +95,7 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe select { case event := <-g.Events: out.Send(&pb.StreamLogsResponse{ - Event: logEventToPb(event), + Event: logEventToPb(&event), }) case e := <-g.Errors: return e diff --git a/internal/container/event_generator.go b/internal/container/event_generator.go index 93de32b8..66090182 100644 --- a/internal/container/event_generator.go +++ b/internal/container/event_generator.go @@ -17,11 +17,11 @@ import ( ) type EventGenerator struct { - Events chan *LogEvent + Events chan LogEvent Errors chan error reader LogReader next *LogEvent - buffer chan *LogEvent + buffer chan LogEvent wg sync.WaitGroup containerID string ctx context.Context @@ -36,9 +36,9 @@ type LogReader interface { func NewEventGenerator(ctx context.Context, reader LogReader, container Container) *EventGenerator { generator := &EventGenerator{ reader: reader, - buffer: make(chan *LogEvent, 100), + buffer: make(chan LogEvent, 100), Errors: make(chan error, 1), - Events: make(chan *LogEvent), + Events: make(chan LogEvent), containerID: container.ID, ctx: ctx, } @@ -49,24 +49,29 @@ func NewEventGenerator(ctx context.Context, reader LogReader, container Containe } func (g *EventGenerator) processBuffer() { - var current, next *LogEvent + var current, next LogEvent + var hasNext bool loop: for { if g.next != nil { - current = g.next + current = *g.next g.next = nil - next = g.peek() + next, hasNext = g.peek() } else { event, ok := <-g.buffer if !ok { break loop } current = event - next = g.peek() + next, hasNext = g.peek() } - checkPosition(current, next) + if hasNext { + checkPosition(¤t, &next) + } else { + checkPosition(¤t, nil) + } select { case g.Events <- current: @@ -101,23 +106,23 @@ func (g *EventGenerator) consumeReader() { g.wg.Done() } -func (g *EventGenerator) peek() *LogEvent { +func (g *EventGenerator) peek() (LogEvent, bool) { if g.next != nil { - return g.next + return *g.next, true } select { case event := <-g.buffer: - g.next = event - return g.next + g.next = &event + return event, true case <-time.After(50 * time.Millisecond): - return nil + return LogEvent{}, false } } -func createEvent(message string, streamType StdType) *LogEvent { +func createEvent(message string, streamType StdType) LogEvent { h := fnv.New32a() h.Write([]byte(message)) - logEvent := &LogEvent{Id: h.Sum32(), Message: message, Stream: streamType.String()} + logEvent := LogEvent{Id: h.Sum32(), Message: message, Stream: streamType.String()} if index := strings.IndexAny(message, " "); index != -1 { logId := message[:index] if timestamp, err := time.Parse(time.RFC3339Nano, logId); err == nil { @@ -157,20 +162,20 @@ func createEvent(message string, streamType StdType) *LogEvent { } func checkPosition(currentEvent *LogEvent, nextEvent *LogEvent) { - currentLevel := guessLogLevel(currentEvent) + currentLevel := guessLogLevel(*currentEvent) if nextEvent != nil { - if currentEvent.IsCloseToTime(nextEvent) && currentLevel != "unknown" && !nextEvent.HasLevel() { + if currentEvent.IsCloseToTime(*nextEvent) && currentLevel != "unknown" && !nextEvent.HasLevel() { currentEvent.Position = Beginning nextEvent.Position = Middle } // If next item is not close to current item or has level, set current item position to end - if currentEvent.Position == Middle && (nextEvent.HasLevel() || !currentEvent.IsCloseToTime(nextEvent)) { + if currentEvent.Position == Middle && (nextEvent.HasLevel() || !currentEvent.IsCloseToTime(*nextEvent)) { currentEvent.Position = End } // If next item is close to current item and has no level, set next item position to middle - if currentEvent.Position == Middle && !nextEvent.HasLevel() && currentEvent.IsCloseToTime(nextEvent) { + if currentEvent.Position == Middle && !nextEvent.HasLevel() && currentEvent.IsCloseToTime(*nextEvent) { nextEvent.Position = Middle } // Set next item level to current item level diff --git a/internal/container/event_generator_test.go b/internal/container/event_generator_test.go index 81d25779..48c283a8 100644 --- a/internal/container/event_generator_test.go +++ b/internal/container/event_generator_test.go @@ -96,14 +96,14 @@ func Test_createEvent(t *testing.T) { tests := []struct { name string args args - want *LogEvent + want LogEvent }{ { name: "empty message", args: args{ message: "", }, - want: &LogEvent{ + want: LogEvent{ Message: "", }, }, { @@ -111,7 +111,7 @@ func Test_createEvent(t *testing.T) { args: args{ message: "2020-05-13T18:55:37.772853839Z {\"xyz\": \"value\", \"abc\": \"value2\"}", }, - want: &LogEvent{ + want: LogEvent{ Message: data, }, }, @@ -120,7 +120,7 @@ func Test_createEvent(t *testing.T) { args: args{ message: "2020-05-13T18:55:37.772853839Z {\"key\"}", }, - want: &LogEvent{ + want: LogEvent{ Message: "{\"key\"}", }, }, @@ -129,7 +129,7 @@ func Test_createEvent(t *testing.T) { args: args{ message: "2020-05-13T18:55:37.772853839Z 123", }, - want: &LogEvent{ + want: LogEvent{ Message: "123", }, }, @@ -138,7 +138,7 @@ func Test_createEvent(t *testing.T) { args: args{ message: "2020-05-13T18:55:37.772853839Z sample text with=equal sign", }, - want: &LogEvent{ + want: LogEvent{ Message: "sample text with=equal sign", }, }, @@ -147,7 +147,7 @@ func Test_createEvent(t *testing.T) { args: args{ message: "2020-05-13T18:55:37.772853839Z null", }, - want: &LogEvent{ + want: LogEvent{ Message: "", }, }, diff --git a/internal/container/level_guesser.go b/internal/container/level_guesser.go index b334432c..13a34ff5 100644 --- a/internal/container/level_guesser.go +++ b/internal/container/level_guesser.go @@ -41,7 +41,7 @@ func init() { SupportedLogLevels["unknown"] = struct{}{} } -func guessLogLevel(logEvent *LogEvent) string { +func guessLogLevel(logEvent LogEvent) string { switch value := logEvent.Message.(type) { case string: value = stripANSI(value) diff --git a/internal/container/level_guesser_test.go b/internal/container/level_guesser_test.go index 075cba79..f901820d 100644 --- a/internal/container/level_guesser_test.go +++ b/internal/container/level_guesser_test.go @@ -72,7 +72,7 @@ func TestGuessLogLevel(t *testing.T) { for _, test := range tests { name, _ := json.Marshal(test.input) t.Run(string(name), func(t *testing.T) { - actual := guessLogLevel(&LogEvent{Message: test.input}) + actual := guessLogLevel(LogEvent{Message: test.input}) if actual != test.expected { t.Errorf("Expected %s, got %s", test.expected, actual) } diff --git a/internal/container/types.go b/internal/container/types.go index d1592ed4..19470a4e 100644 --- a/internal/container/types.go +++ b/internal/container/types.go @@ -176,14 +176,14 @@ type LogEvent struct { ContainerID string `json:"c,omitempty"` } -func (l *LogEvent) HasLevel() bool { +func (l LogEvent) HasLevel() bool { return l.Level != "unknown" } -func (l *LogEvent) IsCloseToTime(other *LogEvent) bool { +func (l LogEvent) IsCloseToTime(other LogEvent) bool { return math.Abs(float64(l.Timestamp-other.Timestamp)) < 10 } -func (l *LogEvent) MessageId() int64 { +func (l LogEvent) MessageId() int64 { return l.Timestamp } diff --git a/internal/support/container/agent_service.go b/internal/support/container/agent_service.go index 3475e855..f79dcc4c 100644 --- a/internal/support/container/agent_service.go +++ b/internal/support/container/agent_service.go @@ -32,11 +32,11 @@ func (a *agentService) RawLogs(ctx context.Context, container container.Containe return a.client.StreamRawBytes(ctx, container.ID, from, to, stdTypes) } -func (a *agentService) LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) { +func (a *agentService) LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) { return a.client.LogsBetweenDates(ctx, container.ID, from, to, stdTypes) } -func (a *agentService) StreamLogs(ctx context.Context, container container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error { +func (a *agentService) StreamLogs(ctx context.Context, container container.Container, from time.Time, stdTypes container.StdType, events chan<- container.LogEvent) error { return a.client.StreamContainerLogs(ctx, container.ID, from, stdTypes, events) } diff --git a/internal/support/container/client_service.go b/internal/support/container/client_service.go index 77e244b5..406e6475 100644 --- a/internal/support/container/client_service.go +++ b/internal/support/container/client_service.go @@ -15,7 +15,7 @@ type ClientService interface { ListContainers(ctx context.Context, filter container.ContainerLabels) ([]container.Container, error) Host(ctx context.Context) (container.Host, error) ContainerAction(ctx context.Context, container container.Container, action container.ContainerAction) error - LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) + LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) RawLogs(context.Context, container.Container, time.Time, time.Time, container.StdType) (io.ReadCloser, error) // Subscriptions @@ -24,7 +24,7 @@ type ClientService interface { SubscribeContainersStarted(context.Context, chan<- container.Container) // Blocking streaming functions that should be used in a goroutine - StreamLogs(context.Context, container.Container, time.Time, container.StdType, chan<- *container.LogEvent) error + StreamLogs(context.Context, container.Container, time.Time, container.StdType, chan<- container.LogEvent) error // Terminal Attach(context.Context, container.Container, io.Reader, io.Writer) error diff --git a/internal/support/container/container_service.go b/internal/support/container/container_service.go index 07a97760..6e448167 100644 --- a/internal/support/container/container_service.go +++ b/internal/support/container/container_service.go @@ -24,11 +24,11 @@ func (c *ContainerService) RawLogs(ctx context.Context, from time.Time, to time. return c.clientService.RawLogs(ctx, c.Container, from, to, stdTypes) } -func (c *ContainerService) LogsBetweenDates(ctx context.Context, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) { +func (c *ContainerService) LogsBetweenDates(ctx context.Context, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) { return c.clientService.LogsBetweenDates(ctx, c.Container, from, to, stdTypes) } -func (c *ContainerService) StreamLogs(ctx context.Context, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error { +func (c *ContainerService) StreamLogs(ctx context.Context, from time.Time, stdTypes container.StdType, events chan<- container.LogEvent) error { return c.clientService.StreamLogs(ctx, c.Container, from, stdTypes, events) } diff --git a/internal/support/docker/docker_service.go b/internal/support/docker/docker_service.go index 5eedb477..fd8e186a 100644 --- a/internal/support/docker/docker_service.go +++ b/internal/support/docker/docker_service.go @@ -52,7 +52,7 @@ func (d *DockerClientService) RawLogs(ctx context.Context, container container.C } -func (d *DockerClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) { +func (d *DockerClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) { reader, err := d.client.ContainerLogsBetweenDates(ctx, c.ID, from, to, stdTypes) if err != nil { return nil, err @@ -63,7 +63,7 @@ func (d *DockerClientService) LogsBetweenDates(ctx context.Context, c container. return g.Events, nil } -func (d *DockerClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error { +func (d *DockerClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- container.LogEvent) error { reader, err := d.client.ContainerLogs(ctx, c.ID, from, stdTypes) if err != nil { return err diff --git a/internal/support/k8s/k8s_service.go b/internal/support/k8s/k8s_service.go index 03f01213..22c3d7d5 100644 --- a/internal/support/k8s/k8s_service.go +++ b/internal/support/k8s/k8s_service.go @@ -45,7 +45,7 @@ func (k *K8sClientService) ContainerAction(ctx context.Context, container contai return k.client.ContainerActions(ctx, action, container.ID) } -func (k *K8sClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) { +func (k *K8sClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) { reader, err := k.client.ContainerLogsBetweenDates(ctx, c.ID, from, to, stdTypes) if err != nil { return nil, err @@ -60,7 +60,7 @@ func (k *K8sClientService) RawLogs(ctx context.Context, container container.Cont return k.client.ContainerLogsBetweenDates(ctx, container.ID, from, to, stdTypes) } -func (k *K8sClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error { +func (k *K8sClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- container.LogEvent) error { reader, err := k.client.ContainerLogs(ctx, c.ID, from, stdTypes) if err != nil { return err diff --git a/internal/support/web/search.go b/internal/support/web/search.go index 068d9712..55b66d9b 100644 --- a/internal/support/web/search.go +++ b/internal/support/web/search.go @@ -16,7 +16,7 @@ func ParseRegex(search string) (*regexp.Regexp, error) { return CreateRegex(search, search == strings.ToLower(search)) } -func Search(re *regexp.Regexp, logEvent *container.LogEvent) bool { +func Search(re *regexp.Regexp, logEvent container.LogEvent) bool { matcher := NewPatternMatcher(re, MarkerStart, MarkerEnd) - return matcher.MarkInLogEvent(logEvent) + return matcher.MarkInLogEvent(&logEvent) } diff --git a/internal/web/logs.go b/internal/web/logs.go index 498a7bce..0a165eb2 100644 --- a/internal/web/logs.go +++ b/internal/web/logs.go @@ -82,7 +82,7 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request) } minimum := 0 - buffer := utils.NewRingBuffer[*container.LogEvent](500) + buffer := utils.NewRingBuffer[container.LogEvent](500) if r.URL.Query().Has("min") { minimum, err = strconv.Atoi(r.URL.Query().Get("min")) if err != nil { @@ -94,7 +94,7 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request) http.Error(w, errors.New("minimum must be between 0 and buffer size").Error(), http.StatusBadRequest) return } - buffer = utils.NewRingBuffer[*container.LogEvent](minimum) + buffer = utils.NewRingBuffer[container.LogEvent](minimum) } maxStart := math.MaxInt @@ -177,7 +177,7 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request) break } - support_web.EscapeHTMLValues(event) + support_web.EscapeHTMLValues(&event) buffer.Push(event) } } @@ -294,9 +294,9 @@ func (h *handler) streamLogsForContainers(w http.ResponseWriter, r *http.Request absoluteTime := time.Time{} var regex *regexp.Regexp - liveLogs := make(chan *container.LogEvent) + liveLogs := make(chan container.LogEvent) events := make(chan *container.ContainerEvent, 1) - backfill := make(chan []*container.LogEvent) + backfill := make(chan []container.LogEvent) levels := make(map[string]struct{}) for _, level := range r.URL.Query()["levels"] { @@ -318,7 +318,7 @@ func (h *handler) streamLogsForContainers(w http.ResponseWriter, r *http.Request delta := -10 * time.Second to := absoluteTime for minimum > 0 { - events := make([]*container.LogEvent, 0) + events := make([]container.LogEvent, 0) stillRunning := false for _, container := range existingContainers { containerService, err := h.hostService.FindContainer(container.Host, container.ID, userLabels) @@ -418,7 +418,7 @@ loop: continue } - support_web.EscapeHTMLValues(logEvent) + support_web.EscapeHTMLValues(&logEvent) sseWriter.Message(logEvent) case c := <-newContainers: if _, err := h.hostService.FindContainer(c.Host, c.ID, userLabels); err == nil { @@ -433,8 +433,8 @@ loop: } case backfillEvents := <-backfill: - for _, event := range backfillEvents { - support_web.EscapeHTMLValues(event) + for i := range backfillEvents { + support_web.EscapeHTMLValues(&backfillEvents[i]) } if err := sseWriter.Event("logs-backfill", backfillEvents); err != nil { log.Error().Err(err).Msg("error encoding container event")