diff --git a/internal/agent/server.go b/internal/agent/server.go index 931d8ad7..398f67a6 100644 --- a/internal/agent/server.go +++ b/internal/agent/server.go @@ -60,10 +60,11 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream return err } + events := make(chan *container.LogEvent) dockerReader := docker.NewLogReader(reader, c.Tty) - g := container.NewEventGenerator(out.Context(), dockerReader, c) + g := container.NewEventGenerator(out.Context(), dockerReader, c, events) - for event := range g.Events { + for event := range events { out.Send(&pb.StreamLogsResponse{ Event: logEventToPb(event), }) @@ -88,12 +89,13 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe return err } + events := make(chan *container.LogEvent) dockerReader := docker.NewLogReader(reader, c.Tty) - g := container.NewEventGenerator(out.Context(), dockerReader, c) + g := container.NewEventGenerator(out.Context(), dockerReader, c, events) for { select { - case event := <-g.Events: + case event := <-events: out.Send(&pb.StreamLogsResponse{ Event: logEventToPb(event), }) diff --git a/internal/container/event_generator.go b/internal/container/event_generator.go index 93de32b8..547dbb62 100644 --- a/internal/container/event_generator.go +++ b/internal/container/event_generator.go @@ -17,7 +17,7 @@ import ( ) type EventGenerator struct { - Events chan *LogEvent + events chan<- *LogEvent Errors chan error reader LogReader next *LogEvent @@ -33,12 +33,12 @@ type LogReader interface { Read() (string, StdType, error) } -func NewEventGenerator(ctx context.Context, reader LogReader, container Container) *EventGenerator { +func NewEventGenerator(ctx context.Context, reader LogReader, container Container, events chan<- *LogEvent) *EventGenerator { generator := &EventGenerator{ reader: reader, buffer: make(chan *LogEvent, 100), Errors: make(chan error, 1), - Events: make(chan *LogEvent), + events: events, containerID: container.ID, ctx: ctx, } @@ -69,14 +69,12 @@ loop: checkPosition(current, next) select { - case g.Events <- current: + case g.events <- current: case <-g.ctx.Done(): break loop } } - close(g.Events) - g.wg.Done() } @@ -114,6 +112,10 @@ func (g *EventGenerator) peek() *LogEvent { } } +func (g *EventGenerator) Wait() { + g.wg.Wait() +} + func createEvent(message string, streamType StdType) *LogEvent { h := fnv.New32a() h.Write([]byte(message)) diff --git a/internal/container/event_generator_test.go b/internal/container/event_generator_test.go index 81d25779..f9a56b3c 100644 --- a/internal/container/event_generator_test.go +++ b/internal/container/event_generator_test.go @@ -16,8 +16,9 @@ import ( func TestEventGenerator_Events_tty(t *testing.T) { input := "example input" - g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: true}) - event := <-g.Events + events := make(chan *LogEvent, 1) + NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: true}, events) + event := <-events require.NotNil(t, event, "Expected event to not be nil, but got nil") assert.Equal(t, input, event.Message) @@ -26,8 +27,9 @@ func TestEventGenerator_Events_tty(t *testing.T) { func TestEventGenerator_Events_non_tty(t *testing.T) { input := "example input" - g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false}) - event := <-g.Events + events := make(chan *LogEvent, 1) + NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false}, events) + event := <-events require.NotNil(t, event, "Expected event to not be nil, but got nil") assert.Equal(t, input, event.Message) @@ -36,9 +38,12 @@ func TestEventGenerator_Events_non_tty(t *testing.T) { func TestEventGenerator_Events_non_tty_close_channel(t *testing.T) { input := "example input" - g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false}) - <-g.Events - _, ok := <-g.Events + events := make(chan *LogEvent, 1) + g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false}, events) + <-events + g.wg.Wait() // Wait for goroutines to finish + close(events) // Close the channel manually since generator no longer owns it + _, ok := <-events assert.False(t, ok, "Expected channel to be closed") } @@ -46,8 +51,9 @@ func TestEventGenerator_Events_non_tty_close_channel(t *testing.T) { func TestEventGenerator_Events_routines_done(t *testing.T) { input := "example input" - g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false}) - <-g.Events + events := make(chan *LogEvent, 1) + g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false}, events) + <-events assert.False(t, waitTimeout(&g.wg, 1*time.Second), "Expected routines to be done") } diff --git a/internal/support/docker/docker_service.go b/internal/support/docker/docker_service.go index 5eedb477..d69a9ffa 100644 --- a/internal/support/docker/docker_service.go +++ b/internal/support/docker/docker_service.go @@ -58,9 +58,17 @@ func (d *DockerClientService) LogsBetweenDates(ctx context.Context, c container. return nil, err } + events := make(chan *container.LogEvent) dockerReader := docker.NewLogReader(reader, c.Tty) - g := container.NewEventGenerator(ctx, dockerReader, c) - return g.Events, nil + g := container.NewEventGenerator(ctx, dockerReader, c, events) + + // Start a goroutine to close the channel when EventGenerator completes + go func() { + defer close(events) + g.Wait() + }() + + return events, nil } func (d *DockerClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error { @@ -70,16 +78,25 @@ func (d *DockerClientService) StreamLogs(ctx context.Context, c container.Contai } dockerReader := docker.NewLogReader(reader, c.Tty) - g := container.NewEventGenerator(ctx, dockerReader, c) - for event := range g.Events { - events <- event - } + g := container.NewEventGenerator(ctx, dockerReader, c, events) + // Create a channel to signal when EventGenerator completes + done := make(chan struct{}) + go func() { + g.Wait() + close(done) + }() + + // Wait for either an error, completion, or context cancellation select { case e := <-g.Errors: return e - default: - return nil + case <-done: + // EventGenerator completed successfully + close(events) + return io.EOF + case <-ctx.Done(): + return ctx.Err() } } diff --git a/internal/support/k8s/k8s_service.go b/internal/support/k8s/k8s_service.go index 03f01213..8ec11379 100644 --- a/internal/support/k8s/k8s_service.go +++ b/internal/support/k8s/k8s_service.go @@ -51,9 +51,17 @@ func (k *K8sClientService) LogsBetweenDates(ctx context.Context, c container.Con return nil, err } + events := make(chan *container.LogEvent) k8sReader := k8s.NewLogReader(reader) - g := container.NewEventGenerator(ctx, k8sReader, c) - return g.Events, nil + g := container.NewEventGenerator(ctx, k8sReader, c, events) + + // Start a goroutine to close the channel when EventGenerator completes + go func() { + defer close(events) + g.Wait() + }() + + return events, nil } func (k *K8sClientService) RawLogs(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (io.ReadCloser, error) { @@ -67,16 +75,25 @@ func (k *K8sClientService) StreamLogs(ctx context.Context, c container.Container } k8sReader := k8s.NewLogReader(reader) - g := container.NewEventGenerator(ctx, k8sReader, c) - for event := range g.Events { - events <- event - } + g := container.NewEventGenerator(ctx, k8sReader, c, events) + // Create a channel to signal when EventGenerator completes + done := make(chan struct{}) + go func() { + g.Wait() + close(done) + }() + + // Wait for either an error, completion, or context cancellation select { case e := <-g.Errors: return e - default: - return nil + case <-done: + // EventGenerator completed successfully + close(events) + return io.EOF + case <-ctx.Done(): + return ctx.Err() } } diff --git a/internal/web/__snapshots__/web.snapshot b/internal/web/__snapshots__/web.snapshot index ede3c54a..8fe0ad23 100644 --- a/internal/web/__snapshots__/web.snapshot +++ b/internal/web/__snapshots__/web.snapshot @@ -164,13 +164,13 @@ stdout or stderr is required /* snapshot: Test_handler_streamLogs_happy */ :ping -data: {"m":"INFO Testing logs...\n","ts":0,"id":3835490584,"l":"info","s":"stdout","c":"123456"} - - event: container-event data: {"name":"container-stopped","host":"localhost","actorId":"123456","time":""} +data: {"m":"INFO Testing logs...\n","ts":0,"id":3835490584,"l":"info","s":"stdout","c":"123456"} + + /* snapshot: Test_handler_streamLogs_happy_container_stopped */ @@ -182,9 +182,9 @@ data: {"name":"container-stopped","host":"localhost","actorId":"123456","time":" /* snapshot: Test_handler_streamLogs_happy_with_id */ :ping -data: {"m":"INFO Testing logs...","rm":"INFO Testing logs...","ts":1589396137772,"id":2908612274,"l":"info","s":"stdout","c":"123456"} -id: 1589396137772 - - event: container-event -data: {"name":"container-stopped","host":"localhost","actorId":"123456","time":""} \ No newline at end of file +data: {"name":"container-stopped","host":"localhost","actorId":"123456","time":""} + + +data: {"m":"INFO Testing logs...","rm":"INFO Testing logs...","ts":1589396137772,"id":2908612274,"l":"info","s":"stdout","c":"123456"} +id: 1589396137772 \ No newline at end of file