From a4981f1b2c9dd17e596cd148dabb594ba44687d5 Mon Sep 17 00:00:00 2001 From: Amir Raminfar Date: Mon, 3 Dec 2018 12:34:08 -0800 Subject: [PATCH] Does a bunch of little cleanup (#11) * Cleans up the context by reusing r.Context() and passing messages instead of readers * Fixes tests * Removes parameters * Fixes error buffer --- docker/client.go | 52 +++++++++++++++++++++++++++++++++++++++--- main.go | 59 +++++++++++++++--------------------------------- main_test.go | 43 +++++++++++++++++++---------------- package.json | 2 +- 4 files changed, 91 insertions(+), 65 deletions(-) diff --git a/docker/client.go b/docker/client.go index ae75c2b0..e3b134c1 100644 --- a/docker/client.go +++ b/docker/client.go @@ -1,7 +1,9 @@ package docker import ( + "bytes" "context" + "encoding/binary" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" "github.com/docker/docker/client" @@ -18,7 +20,7 @@ type dockerClient struct { // Client is a proxy around the docker client type Client interface { ListContainers() ([]Container, error) - ContainerLogs(ctx context.Context, id string) (io.ReadCloser, error) + ContainerLogs(ctx context.Context, id string) (<-chan string, <-chan error) Events(ctx context.Context) (<-chan events.Message, <-chan error) } @@ -58,12 +60,56 @@ func (d *dockerClient) ListContainers() ([]Container, error) { return containers[i].Name < containers[j].Name }) + if containers == nil { + containers = []Container{} + } + return containers, nil } -func (d *dockerClient) ContainerLogs(ctx context.Context, id string) (io.ReadCloser, error) { +func (d *dockerClient) ContainerLogs(ctx context.Context, id string) (<-chan string, <-chan error) { options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true} - return d.cli.ContainerLogs(ctx, id, options) + + reader, err := d.cli.ContainerLogs(ctx, id, options) + if err != nil { + tmpErrors := make(chan error, 1) + tmpErrors <- err + return nil, tmpErrors + } + + go func() { + <-ctx.Done() + reader.Close() + }() + + messages := make(chan string) + errChannel := make(chan error) + + go func() { + hdr := make([]byte, 8) + var buffer bytes.Buffer + for { + _, err := reader.Read(hdr) + if err != nil { + errChannel <- err + break + } + count := binary.BigEndian.Uint32(hdr[4:]) + _, err = io.CopyN(&buffer, reader, int64(count)) + if err != nil { + errChannel <- err + break + } + messages <- buffer.String() + buffer.Reset() + } + close(messages) + close(errChannel) + reader.Close() + }() + + return messages, errChannel + } func (d *dockerClient) Events(ctx context.Context) (<-chan events.Message, <-chan error) { diff --git a/main.go b/main.go index 00f72e04..02a44b60 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,6 @@ package main import ( - "bytes" - "context" - "encoding/binary" "encoding/json" "fmt" "github.com/amir20/dozzle/docker" @@ -127,19 +124,7 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) { return } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - reader, err := h.client.ContainerLogs(ctx, id) - if err != nil { - log.Println(err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer reader.Close() - go func() { - <-r.Context().Done() - reader.Close() - }() + messages, err := h.client.ContainerLogs(r.Context(), id) w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") @@ -147,28 +132,23 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) { w.Header().Set("Transfer-Encoding", "chunked") log.Debugf("Starting to stream logs for %s", id) - hdr := make([]byte, 8) - var buffer bytes.Buffer +Loop: for { - _, err := reader.Read(hdr) - if err != nil { - log.Debugf("Error while reading from log stream: %v", err) - break + select { + case message, ok := <-messages: + if !ok { + break Loop + } + _, e := fmt.Fprintf(w, "data: %s\n\n", message) + if e != nil { + log.Debugf("Error while writing to log stream: %v", e) + break Loop + } + f.Flush() + case e := <-err: + log.Debugf("Error while reading from log stream: %v", e) + break Loop } - count := binary.BigEndian.Uint32(hdr[4:]) - _, err = io.CopyN(&buffer, reader, int64(count)) - - if err != nil { - log.Debugf("Error while reading from log stream: %v", err) - break - } - _, err = fmt.Fprintf(w, "data: %s\n\n", buffer.String()) - buffer.Reset() - if err != nil { - log.Debugf("Error while writing to log stream: %v", err) - break - } - f.Flush() } } @@ -184,8 +164,7 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "keep-alive") w.Header().Set("Transfer-Encoding", "chunked") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := r.Context() messages, err := h.client.Events(ctx) Loop: @@ -208,11 +187,9 @@ Loop: default: log.Debugf("Ignoring docker event: %v", message.Action) } - case <-r.Context().Done(): - cancel() + case <-ctx.Done(): break Loop case <-err: - cancel() break Loop } } diff --git a/main_test.go b/main_test.go index 9e65b326..5319e54f 100644 --- a/main_test.go +++ b/main_test.go @@ -1,17 +1,12 @@ package main import ( - "bytes" "context" - "encoding/binary" "errors" "github.com/docker/docker/api/types/events" - "io" - "io/ioutil" "net/http" "net/http/httptest" "os" - "strings" "testing" "github.com/amir20/dozzle/docker" @@ -34,13 +29,18 @@ func (m *MockedClient) ListContainers() ([]docker.Container, error) { return containers, args.Error(1) } -func (m *MockedClient) ContainerLogs(ctx context.Context, id string) (io.ReadCloser, error) { +func (m *MockedClient) ContainerLogs(ctx context.Context, id string) (<-chan string, <-chan error) { args := m.Called(ctx, id) - reader, ok := args.Get(0).(io.ReadCloser) + channel, ok := args.Get(0).(chan string) if !ok { - panic("reader is not of type io.ReadCloser") + panic("channel is not of type chan string") } - return reader, args.Error(1) + + err, ok := args.Get(1).(chan error) + if !ok { + panic("error is not of type chan error") + } + return channel, err } func (m *MockedClient) Events(ctx context.Context) (<-chan events.Message, <-chan error) { @@ -96,15 +96,14 @@ func Test_handler_streamLogs_happy(t *testing.T) { rr := httptest.NewRecorder() mockedClient := new(MockedClient) - log := "INFO Testing logs..." - b := make([]byte, 8) - binary.BigEndian.PutUint32(b[4:], uint32(len(log))) - b = append(b, []byte(log)...) - - var reader io.ReadCloser - reader = ioutil.NopCloser(bytes.NewReader(b)) - mockedClient.On("ContainerLogs", mock.Anything, id).Return(reader, nil) + messages := make(chan string) + errChannel := make(chan error) + mockedClient.On("ContainerLogs", mock.Anything, id).Return(messages, errChannel) + go func() { + messages <- "INFO Testing logs..." + close(messages) + }() h := handler{client: mockedClient} handler := http.HandlerFunc(h.streamLogs) @@ -123,9 +122,13 @@ func Test_handler_streamLogs_error_reading(t *testing.T) { rr := httptest.NewRecorder() mockedClient := new(MockedClient) - var reader io.ReadCloser - reader = ioutil.NopCloser(strings.NewReader("")) - mockedClient.On("ContainerLogs", mock.Anything, id).Return(reader, nil) + messages := make(chan string) + errChannel := make(chan error) + mockedClient.On("ContainerLogs", mock.Anything, id).Return(messages, errChannel) + + go func() { + errChannel <- errors.New("test error") + }() h := handler{client: mockedClient} handler := http.HandlerFunc(h.streamLogs) diff --git a/package.json b/package.json index 3f9db767..8e392f57 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "prestart": "npm run clean", "start": "DOCKER_API_VERSION=1.38 concurrently 'npm run watch-server' 'npm run watch-assets'", "watch-assets": "parcel watch --public-url '__BASE__' assets/index.html -d static", - "watch-server": "reflex -g '*.go' -R '^node_modules/' -R '^static/' -R '^.cache/' -G '*_test.go' -s -- go run main.go --level debug", + "watch-server": "reflex -g '**/*.go' -R '^node_modules/' -R '^static/' -R '^.cache/' -G '*_test.go' -s -- go run main.go --level debug", "prebuild": "npm run clean", "build": "parcel build --no-source-maps --public-url '__BASE__' assets/index.html -d static", "clean": "rm -rf static/ a_main-packr.go",