diff --git a/docker/client.go b/docker/client.go index aec4e9d0..c02ab03c 100644 --- a/docker/client.go +++ b/docker/client.go @@ -1,10 +1,7 @@ package docker import ( - "bufio" - "bytes" "context" - "encoding/binary" "encoding/json" "fmt" "io" @@ -37,9 +34,9 @@ type dockerProxy interface { type Client interface { ListContainers() ([]Container, error) FindContainer(string) (Container, error) - ContainerLogs(context.Context, string, int, string) (<-chan string, <-chan error) + ContainerLogs(context.Context, string, int, string) (io.ReadCloser, error) Events(context.Context) (<-chan ContainerEvent, <-chan error) - ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time) ([]string, error) + ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time) (io.ReadCloser, error) ContainerStats(context.Context, string, chan<- ContainerStat) error } @@ -121,34 +118,6 @@ func (d *dockerClient) ListContainers() ([]Container, error) { return containers, nil } -func logReader(reader io.ReadCloser, tty bool) func() (string, error) { - if tty { - scanner := bufio.NewScanner(reader) - return func() (string, error) { - if scanner.Scan() { - return scanner.Text(), nil - } - - return "", io.EOF - } - } - hdr := make([]byte, 8) - var buffer bytes.Buffer - return func() (string, error) { - buffer.Reset() - _, err := reader.Read(hdr) - if err != nil { - return "", err - } - count := binary.BigEndian.Uint32(hdr[4:]) - _, err = io.CopyN(&buffer, reader, int64(count)) - if err != nil { - return "", err - } - return buffer.String(), nil - } -} - func (d *dockerClient) ContainerStats(ctx context.Context, id string, stats chan<- ContainerStat) error { response, err := d.cli.ContainerStats(ctx, id, true) @@ -192,7 +161,7 @@ func (d *dockerClient) ContainerStats(ctx context.Context, id string, stats chan return nil } -func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (<-chan string, <-chan error) { +func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (io.ReadCloser, error) { log.WithField("id", id).WithField("since", since).Debug("streaming logs for container") options := types.ContainerLogsOptions{ @@ -203,42 +172,18 @@ func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize in Timestamps: true, Since: since, } - reader, err := d.cli.ContainerLogs(ctx, id, options) - errChannel := make(chan error, 1) + reader, err := d.cli.ContainerLogs(ctx, id, options) if err != nil { - errChannel <- err - close(errChannel) - return nil, errChannel + return nil, err } - messages := make(chan string) - go func() { - <-ctx.Done() - reader.Close() - }() + containerJSON, err := d.cli.ContainerInspect(ctx, id) + if err != nil { + return nil, err + } - containerJSON, _ := d.cli.ContainerInspect(ctx, id) - - go func() { - defer close(messages) - defer close(errChannel) - defer reader.Close() - nextEntry := logReader(reader, containerJSON.Config.Tty) - for { - line, err := nextEntry() - if err != nil { - errChannel <- err - break - } - select { - case messages <- line: - case <-ctx.Done(): - } - } - }() - - return messages, errChannel + return newLogReader(reader, containerJSON.Config.Tty), nil } func (d *dockerClient) Events(ctx context.Context) (<-chan ContainerEvent, <-chan error) { @@ -267,7 +212,7 @@ func (d *dockerClient) Events(ctx context.Context) (<-chan ContainerEvent, <-cha return messages, errors } -func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time) ([]string, error) { +func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time) (io.ReadCloser, error) { options := types.ContainerLogsOptions{ ShowStdout: true, ShowStderr: true, @@ -275,25 +220,17 @@ func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, Since: strconv.FormatInt(from.Unix(), 10), Until: strconv.FormatInt(to.Unix(), 10), } - reader, _ := d.cli.ContainerLogs(ctx, id, options) - defer reader.Close() - containerJSON, _ := d.cli.ContainerInspect(ctx, id) + reader, err := d.cli.ContainerLogs(ctx, id, options) - nextEntry := logReader(reader, containerJSON.Config.Tty) - - var messages []string - for { - line, err := nextEntry() - if err != nil { - if err == io.EOF { - break - } else { - return nil, err - } - } - messages = append(messages, line) + if err != nil { + return nil, err } - return messages, nil + containerJSON, err := d.cli.ContainerInspect(ctx, id) + if err != nil { + return nil, err + } + + return newLogReader(reader, containerJSON.Config.Tty), nil } diff --git a/docker/client_test.go b/docker/client_test.go index 518b9268..4833dee2 100644 --- a/docker/client_test.go +++ b/docker/client_test.go @@ -7,6 +7,7 @@ import ( "errors" "io" "io/ioutil" + "strings" "testing" "github.com/docker/docker/api/types" @@ -132,13 +133,10 @@ func Test_dockerClient_ContainerLogs_happy(t *testing.T) { proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil) client := &dockerClient{proxy, filters.NewArgs()} - messages, _ := client.ContainerLogs(context.Background(), id, 300, "since") + logReader, _ := client.ContainerLogs(context.Background(), id, 300, "since") - actual, _ := <-messages - assert.Equal(t, expected, actual, "message doesn't match expected") - - _, ok := <-messages - assert.False(t, ok, "channel should have been closed") + actual, _ := ioutil.ReadAll(logReader) + assert.Equal(t, expected, string(actual), "message doesn't match expected") proxy.AssertExpectations(t) } @@ -148,7 +146,7 @@ func Test_dockerClient_ContainerLogs_happy_with_tty(t *testing.T) { proxy := new(mockedProxy) expected := "INFO Testing logs..." - reader := ioutil.NopCloser(bytes.NewReader([]byte(expected))) + reader := ioutil.NopCloser(strings.NewReader(expected)) options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true} proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil) @@ -156,13 +154,11 @@ func Test_dockerClient_ContainerLogs_happy_with_tty(t *testing.T) { proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil) client := &dockerClient{proxy, filters.NewArgs()} - messages, _ := client.ContainerLogs(context.Background(), id, 300, "") + logReader, _ := client.ContainerLogs(context.Background(), id, 300, "") - actual, _ := <-messages - assert.Equal(t, expected, actual, "message doesn't match expected") + actual, _ := ioutil.ReadAll(logReader) + assert.Equal(t, expected, string(actual), "message doesn't match expected") - _, ok := <-messages - assert.False(t, ok, "channel should have been closed") proxy.AssertExpectations(t) } @@ -174,14 +170,10 @@ func Test_dockerClient_ContainerLogs_error(t *testing.T) { client := &dockerClient{proxy, filters.NewArgs()} - messages, err := client.ContainerLogs(context.Background(), id, 300, "") + reader, err := client.ContainerLogs(context.Background(), id, 300, "") - assert.Nil(t, messages, "messages should be nil") - - e, _ := <-err - assert.Error(t, e, "error should have been returned") - _, ok := <-err - assert.False(t, ok, "error channel should have been closed") + assert.Nil(t, reader, "reader should be nil") + assert.Error(t, err, "error should have been returned") proxy.AssertExpectations(t) } diff --git a/docker/reader.go b/docker/reader.go new file mode 100644 index 00000000..388ef846 --- /dev/null +++ b/docker/reader.go @@ -0,0 +1,49 @@ +package docker + +import ( + "bytes" + "encoding/binary" + "io" +) + +type logReader struct { + readerCloser io.ReadCloser + tty bool + lastHeader []byte + buffer bytes.Buffer +} + +func newLogReader(reader io.ReadCloser, tty bool) io.ReadCloser { + return &logReader{ + reader, + tty, + make([]byte, 8), + bytes.Buffer{}, + } +} + +func (r *logReader) Read(p []byte) (n int, err error) { + if r.tty { + return r.readerCloser.Read(p) + } else { + if r.buffer.Len() > 0 { + return r.buffer.Read(p) + } else { + r.buffer.Reset() + _, err := r.readerCloser.Read(r.lastHeader) + if err != nil { + return 0, err + } + count := binary.BigEndian.Uint32(r.lastHeader[4:]) + _, err = io.CopyN(&r.buffer, r.readerCloser, int64(count)) + if err != nil { + return 0, err + } + return r.buffer.Read(p) + } + } +} + +func (r *logReader) Close() error { + return r.readerCloser.Close() +} diff --git a/web/__snapshots__/web.snapshot b/web/__snapshots__/web.snapshot index 190abd08..7c28f5f3 100644 --- a/web/__snapshots__/web.snapshot +++ b/web/__snapshots__/web.snapshot @@ -81,12 +81,15 @@ X-Content-Type-Options: nosniff error finding container /* snapshot: Test_handler_streamLogs_error_reading */ -HTTP/1.1 200 OK +HTTP/1.1 500 Internal Server Error Connection: close Cache-Control: no-cache Connection: keep-alive -Content-Type: text/event-stream -X-Accel-Buffering: no +Content-Type: text/plain; charset=utf-8 +X-Accel-Buffering: no +X-Content-Type-Options: nosniff + +test error /* snapshot: Test_handler_streamLogs_happy */ HTTP/1.1 200 OK @@ -112,9 +115,6 @@ X-Accel-Buffering: no event: container-stopped data: end of stream -event: container-stopped -data: end of stream - /* snapshot: Test_handler_streamLogs_happy_with_id */ HTTP/1.1 200 OK Connection: close diff --git a/web/routes.go b/web/routes.go index fcc1c581..4ed2b403 100644 --- a/web/routes.go +++ b/web/routes.go @@ -1,6 +1,7 @@ package web import ( + "bufio" "encoding/json" "fmt" "html/template" @@ -8,6 +9,7 @@ import ( "net/http" "runtime" "strings" + "time" "github.com/amir20/dozzle/docker" @@ -103,11 +105,15 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request) to, _ := time.Parse(time.RFC3339, r.URL.Query().Get("to")) id := r.URL.Query().Get("id") - messages, _ := h.client.ContainerLogsBetweenDates(r.Context(), id, from, to) + reader, err := h.client.ContainerLogsBetweenDates(r.Context(), id, from, to) + defer reader.Close() - for _, m := range messages { - fmt.Fprintln(w, m) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } + + io.Copy(w, reader) } func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) { @@ -123,47 +129,47 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) { return } - container, e := h.client.FindContainer(id) - if e != nil { - http.Error(w, e.Error(), http.StatusNotFound) + container, err := h.client.FindContainer(id) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) return } - messages, err := h.client.ContainerLogs(r.Context(), container.ID, h.config.TailSize, r.Header.Get("Last-Event-ID")) - w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") -Loop: - for { - select { - case message, ok := <-messages: - if !ok { - fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n") - break Loop - } - fmt.Fprintf(w, "data: %s\n", message) - if index := strings.IndexAny(message, " "); index != -1 { - id := message[:index] - if _, err := time.Parse(time.RFC3339Nano, id); err == nil { - fmt.Fprintf(w, "id: %s\n", id) - } - } - fmt.Fprintf(w, "\n") + + reader, err := h.client.ContainerLogs(r.Context(), container.ID, h.config.TailSize, r.Header.Get("Last-Event-ID")) + if err != nil { + if err == io.EOF { + fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n") f.Flush() - case e := <-err: - if e == io.EOF { - log.Debugf("container stopped: %v", container.ID) - fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n") - f.Flush() - } else { - log.Debugf("error while reading from log stream: %v", e) - break Loop + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + return + } + defer reader.Close() + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + message := scanner.Text() + fmt.Fprintf(w, "data: %s\n", message) + if index := strings.IndexAny(message, " "); index != -1 { + id := message[:index] + if _, err := time.Parse(time.RFC3339Nano, id); err == nil { + fmt.Fprintf(w, "id: %s\n", id) } } + fmt.Fprintf(w, "\n") + f.Flush() } + log.Debugf("container stopped: %v", container.ID) + fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n") + f.Flush() + log.WithField("routines", runtime.NumGoroutine()).Debug("runtime goroutine stats") if log.IsLevelEnabled(log.DebugLevel) { diff --git a/web/routes_test.go b/web/routes_test.go index 33aa9ca2..9d987cb4 100644 --- a/web/routes_test.go +++ b/web/routes_test.go @@ -4,9 +4,11 @@ import ( "context" "errors" "io" + "io/ioutil" "net/http" "net/http/httptest" "os" + "strings" "testing" "github.com/magiconair/properties/assert" @@ -25,34 +27,17 @@ type MockedClient struct { func (m *MockedClient) FindContainer(id string) (docker.Container, error) { args := m.Called(id) - container, ok := args.Get(0).(docker.Container) - if !ok { - panic("containers is not of type docker.Container") - } - return container, args.Error(1) + return args.Get(0).(docker.Container), args.Error(1) } func (m *MockedClient) ListContainers() ([]docker.Container, error) { args := m.Called() - containers, ok := args.Get(0).([]docker.Container) - if !ok { - panic("containers is not of type []docker.Container") - } - return containers, args.Error(1) + return args.Get(0).([]docker.Container), args.Error(1) } -func (m *MockedClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (<-chan string, <-chan error) { +func (m *MockedClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (io.ReadCloser, error) { args := m.Called(ctx, id, tailSize) - channel, ok := args.Get(0).(chan string) - if !ok { - panic("channel is not of type chan string") - } - - err, ok := args.Get(1).(chan error) - if !ok { - panic("error is not of type chan error") - } - return channel, err + return args.Get(0).(io.ReadCloser), args.Error(1) } func (m *MockedClient) Events(ctx context.Context) (<-chan docker.ContainerEvent, <-chan error) { @@ -82,15 +67,9 @@ func Test_handler_streamLogs_happy(t *testing.T) { require.NoError(t, err, "NewRequest should not return an error.") mockedClient := new(MockedClient) - - messages := make(chan string) - errChannel := make(chan error) + reader := ioutil.NopCloser(strings.NewReader("INFO Testing logs...")) mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) - mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, 300).Return(messages, errChannel) - go func() { - messages <- "INFO Testing logs..." - close(messages) - }() + mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, 300).Return(reader, nil) h := handler{client: mockedClient, config: &Config{TailSize: 300}} handler := http.HandlerFunc(h.streamLogs) @@ -109,15 +88,9 @@ func Test_handler_streamLogs_happy_with_id(t *testing.T) { require.NoError(t, err, "NewRequest should not return an error.") mockedClient := new(MockedClient) - - messages := make(chan string) - errChannel := make(chan error) + reader := ioutil.NopCloser(strings.NewReader("2020-05-13T18:55:37.772853839Z INFO Testing logs...")) mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) - mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, 300).Return(messages, errChannel) - go func() { - messages <- "2020-05-13T18:55:37.772853839Z INFO Testing logs..." - close(messages) - }() + mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, 300).Return(reader, nil) h := handler{client: mockedClient, config: &Config{TailSize: 300}} handler := http.HandlerFunc(h.streamLogs) @@ -136,15 +109,8 @@ func Test_handler_streamLogs_happy_container_stopped(t *testing.T) { require.NoError(t, err, "NewRequest should not return an error.") mockedClient := new(MockedClient) - messages := make(chan string) - errChannel := make(chan error) mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) - mockedClient.On("ContainerLogs", mock.Anything, id, 300).Return(messages, errChannel) - - go func() { - errChannel <- io.EOF - close(messages) - }() + mockedClient.On("ContainerLogs", mock.Anything, id, 300).Return(ioutil.NopCloser(strings.NewReader("")), io.EOF) h := handler{client: mockedClient, config: &Config{TailSize: 300}} handler := http.HandlerFunc(h.streamLogs) @@ -182,15 +148,8 @@ func Test_handler_streamLogs_error_reading(t *testing.T) { require.NoError(t, err, "NewRequest should not return an error.") mockedClient := new(MockedClient) - messages := make(chan string) - errChannel := make(chan error) mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) - mockedClient.On("ContainerLogs", mock.Anything, id, 300).Return(messages, errChannel) - - go func() { - errChannel <- errors.New("test error") - close(messages) - }() + mockedClient.On("ContainerLogs", mock.Anything, id, 300).Return(ioutil.NopCloser(strings.NewReader("")), errors.New("test error")) h := handler{client: mockedClient, config: &Config{TailSize: 300}} handler := http.HandlerFunc(h.streamLogs)