From ce2b008098e6738d1fcfb889dd6f0967e9aa1cc5 Mon Sep 17 00:00:00 2001 From: Amir Raminfar Date: Tue, 9 Jul 2024 08:14:28 -0700 Subject: [PATCH] fix: improves memory leak in new container stream (#3076) --- internal/agent/server.go | 26 +++++++++---------- internal/docker/event_generator.go | 19 ++++++++++---- internal/docker/event_generator_test.go | 9 ++++--- internal/support/docker/client_service.go | 5 ++-- internal/support/docker/multi_host_service.go | 4 +++ internal/web/routes.go | 3 ++- package.json | 2 +- 7 files changed, 42 insertions(+), 26 deletions(-) diff --git a/internal/agent/server.go b/internal/agent/server.go index 38c86baa..2a11aea0 100644 --- a/internal/agent/server.go +++ b/internal/agent/server.go @@ -53,19 +53,19 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream return err } - g := docker.NewEventGenerator(reader, container) + g := docker.NewEventGenerator(out.Context(), reader, container) - for { - select { - case event := <-g.Events: - out.Send(&pb.StreamLogsResponse{ - Event: logEventToPb(event), - }) - case e := <-g.Errors: - return e - case <-out.Context().Done(): - return nil - } + for event := range g.Events { + out.Send(&pb.StreamLogsResponse{ + Event: logEventToPb(event), + }) + } + + select { + case e := <-g.Errors: + return e + default: + return nil } } @@ -80,7 +80,7 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe return err } - g := docker.NewEventGenerator(reader, container) + g := docker.NewEventGenerator(out.Context(), reader, container) for { select { diff --git a/internal/docker/event_generator.go b/internal/docker/event_generator.go index a3c80122..8c543ada 100644 --- a/internal/docker/event_generator.go +++ b/internal/docker/event_generator.go @@ -3,6 +3,7 @@ package docker import ( "bufio" "bytes" + "context" "encoding/binary" "errors" "fmt" @@ -28,6 +29,7 @@ type EventGenerator struct { tty bool wg sync.WaitGroup containerID string + ctx context.Context } var bufPool = sync.Pool{ @@ -38,7 +40,7 @@ var bufPool = sync.Pool{ var ErrBadHeader = fmt.Errorf("dozzle/docker: unable to read header") -func NewEventGenerator(reader io.Reader, container Container) *EventGenerator { +func NewEventGenerator(ctx context.Context, reader io.Reader, container Container) *EventGenerator { generator := &EventGenerator{ reader: bufio.NewReader(reader), buffer: make(chan *LogEvent, 100), @@ -46,6 +48,7 @@ func NewEventGenerator(reader io.Reader, container Container) *EventGenerator { Events: make(chan *LogEvent), tty: container.Tty, containerID: container.ID, + ctx: ctx, } generator.wg.Add(2) go generator.consumeReader() @@ -56,6 +59,7 @@ func NewEventGenerator(reader io.Reader, container Container) *EventGenerator { func (g *EventGenerator) processBuffer() { var current, next *LogEvent +loop: for { if g.next != nil { current = g.next @@ -64,18 +68,23 @@ func (g *EventGenerator) processBuffer() { } else { event, ok := <-g.buffer if !ok { - close(g.Events) - break + break loop } - current = event next = g.peek() } checkPosition(current, next) - g.Events <- current + select { + case g.Events <- current: + case <-g.ctx.Done(): + break loop + } } + + close(g.Events) + g.wg.Done() } diff --git a/internal/docker/event_generator_test.go b/internal/docker/event_generator_test.go index 5a143bec..6eba1b5e 100644 --- a/internal/docker/event_generator_test.go +++ b/internal/docker/event_generator_test.go @@ -3,6 +3,7 @@ package docker import ( "bufio" "bytes" + "context" "encoding/binary" "reflect" "strings" @@ -19,7 +20,7 @@ func TestEventGenerator_Events_tty(t *testing.T) { input := "example input" reader := bufio.NewReader(strings.NewReader(input)) - g := NewEventGenerator(reader, Container{Tty: true}) + g := NewEventGenerator(context.Background(), reader, Container{Tty: true}) event := <-g.Events require.NotNil(t, event, "Expected event to not be nil, but got nil") @@ -30,7 +31,7 @@ func TestEventGenerator_Events_non_tty(t *testing.T) { input := "example input" reader := bytes.NewReader(makeMessage(input, STDOUT)) - g := NewEventGenerator(reader, Container{Tty: false}) + g := NewEventGenerator(context.Background(), reader, Container{Tty: false}) event := <-g.Events require.NotNil(t, event, "Expected event to not be nil, but got nil") @@ -41,7 +42,7 @@ func TestEventGenerator_Events_non_tty_close_channel(t *testing.T) { input := "example input" reader := bytes.NewReader(makeMessage(input, STDOUT)) - g := NewEventGenerator(reader, Container{Tty: false}) + g := NewEventGenerator(context.Background(), reader, Container{Tty: false}) <-g.Events _, ok := <-g.Events @@ -52,7 +53,7 @@ func TestEventGenerator_Events_routines_done(t *testing.T) { input := "example input" reader := bytes.NewReader(makeMessage(input, STDOUT)) - g := NewEventGenerator(reader, Container{Tty: false}) + g := NewEventGenerator(context.Background(), reader, Container{Tty: false}) <-g.Events assert.False(t, waitTimeout(&g.wg, 1*time.Second), "Expected routines to be done") } diff --git a/internal/support/docker/client_service.go b/internal/support/docker/client_service.go index 7ec3698f..4b3c76b1 100644 --- a/internal/support/docker/client_service.go +++ b/internal/support/docker/client_service.go @@ -47,7 +47,7 @@ func (d *dockerClientService) LogsBetweenDates(ctx context.Context, container do return nil, err } - g := docker.NewEventGenerator(reader, container) + g := docker.NewEventGenerator(ctx, reader, container) return g.Events, nil } @@ -57,10 +57,11 @@ func (d *dockerClientService) StreamLogs(ctx context.Context, container docker.C return err } - g := docker.NewEventGenerator(reader, container) + g := docker.NewEventGenerator(ctx, reader, container) for event := range g.Events { events <- event } + select { case e := <-g.Errors: return e diff --git a/internal/support/docker/multi_host_service.go b/internal/support/docker/multi_host_service.go index 0757abb0..1dbca864 100644 --- a/internal/support/docker/multi_host_service.go +++ b/internal/support/docker/multi_host_service.go @@ -174,6 +174,10 @@ func (m *MultiHostService) SubscribeContainersStarted(ctx context.Context, conta for _, client := range m.clients { client.SubscribeContainersStarted(ctx, newContainers) } + go func() { + <-ctx.Done() + close(newContainers) + }() go func() { for container := range newContainers { diff --git a/internal/web/routes.go b/internal/web/routes.go index b125a8db..c40a39b3 100644 --- a/internal/web/routes.go +++ b/internal/web/routes.go @@ -115,7 +115,6 @@ func createRouter(h *handler) *chi.Mux { r.Get("/healthcheck", h.healthcheck) - // r.Mount("/debug", middleware.Profiler()) }) if base != "/" { @@ -126,6 +125,8 @@ func createRouter(h *handler) *chi.Mux { fileServer = http.FileServer(http.FS(h.content)) + // r.Mount("/debug", middleware.Profiler()) + return r } diff --git a/package.json b/package.json index 6eaadb10..f6876e59 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,7 @@ "watch:backend": "LIVE_FS=true DEV=true DOZZLE_ADDR=localhost:3100 reflex -c .reflex.server", "dev": "concurrently --kill-others \"npm:watch:*\"", "build": "vite build", - "preview": "LIVE_FS=true DOZZLE_ADDR=localhost:3100 reflex -c .reflex", + "preview": "LIVE_FS=true DOZZLE_ADDR=localhost:3100 reflex -c .reflex.server", "release": "bumpp", "test": "TZ=UTC vitest", "typecheck": "vue-tsc --noEmit",