1
0
mirror of https://github.com/amir20/dozzle.git synced 2025-12-24 06:28:42 +01:00

Tries to user readers instead of channels (#915)

This commit is contained in:
Amir Raminfar
2021-01-02 19:25:52 -08:00
committed by GitHub
parent 246f06e1a5
commit de15fcd74f
6 changed files with 136 additions and 193 deletions

View File

@@ -1,10 +1,7 @@
package docker package docker
import ( import (
"bufio"
"bytes"
"context" "context"
"encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@@ -37,9 +34,9 @@ type dockerProxy interface {
type Client interface { type Client interface {
ListContainers() ([]Container, error) ListContainers() ([]Container, error)
FindContainer(string) (Container, error) FindContainer(string) (Container, error)
ContainerLogs(context.Context, string, int, string) (<-chan string, <-chan error) ContainerLogs(context.Context, string, int, string) (io.ReadCloser, error)
Events(context.Context) (<-chan ContainerEvent, <-chan error) Events(context.Context) (<-chan ContainerEvent, <-chan error)
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time) ([]string, error) ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time) (io.ReadCloser, error)
ContainerStats(context.Context, string, chan<- ContainerStat) error ContainerStats(context.Context, string, chan<- ContainerStat) error
} }
@@ -121,34 +118,6 @@ func (d *dockerClient) ListContainers() ([]Container, error) {
return containers, nil return containers, nil
} }
func logReader(reader io.ReadCloser, tty bool) func() (string, error) {
if tty {
scanner := bufio.NewScanner(reader)
return func() (string, error) {
if scanner.Scan() {
return scanner.Text(), nil
}
return "", io.EOF
}
}
hdr := make([]byte, 8)
var buffer bytes.Buffer
return func() (string, error) {
buffer.Reset()
_, err := reader.Read(hdr)
if err != nil {
return "", err
}
count := binary.BigEndian.Uint32(hdr[4:])
_, err = io.CopyN(&buffer, reader, int64(count))
if err != nil {
return "", err
}
return buffer.String(), nil
}
}
func (d *dockerClient) ContainerStats(ctx context.Context, id string, stats chan<- ContainerStat) error { func (d *dockerClient) ContainerStats(ctx context.Context, id string, stats chan<- ContainerStat) error {
response, err := d.cli.ContainerStats(ctx, id, true) response, err := d.cli.ContainerStats(ctx, id, true)
@@ -192,7 +161,7 @@ func (d *dockerClient) ContainerStats(ctx context.Context, id string, stats chan
return nil return nil
} }
func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (<-chan string, <-chan error) { func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (io.ReadCloser, error) {
log.WithField("id", id).WithField("since", since).Debug("streaming logs for container") log.WithField("id", id).WithField("since", since).Debug("streaming logs for container")
options := types.ContainerLogsOptions{ options := types.ContainerLogsOptions{
@@ -203,42 +172,18 @@ func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize in
Timestamps: true, Timestamps: true,
Since: since, Since: since,
} }
reader, err := d.cli.ContainerLogs(ctx, id, options)
errChannel := make(chan error, 1)
reader, err := d.cli.ContainerLogs(ctx, id, options)
if err != nil { if err != nil {
errChannel <- err return nil, err
close(errChannel)
return nil, errChannel
} }
messages := make(chan string) containerJSON, err := d.cli.ContainerInspect(ctx, id)
go func() { if err != nil {
<-ctx.Done() return nil, err
reader.Close() }
}()
containerJSON, _ := d.cli.ContainerInspect(ctx, id) return newLogReader(reader, containerJSON.Config.Tty), nil
go func() {
defer close(messages)
defer close(errChannel)
defer reader.Close()
nextEntry := logReader(reader, containerJSON.Config.Tty)
for {
line, err := nextEntry()
if err != nil {
errChannel <- err
break
}
select {
case messages <- line:
case <-ctx.Done():
}
}
}()
return messages, errChannel
} }
func (d *dockerClient) Events(ctx context.Context) (<-chan ContainerEvent, <-chan error) { func (d *dockerClient) Events(ctx context.Context) (<-chan ContainerEvent, <-chan error) {
@@ -267,7 +212,7 @@ func (d *dockerClient) Events(ctx context.Context) (<-chan ContainerEvent, <-cha
return messages, errors return messages, errors
} }
func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time) ([]string, error) { func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time) (io.ReadCloser, error) {
options := types.ContainerLogsOptions{ options := types.ContainerLogsOptions{
ShowStdout: true, ShowStdout: true,
ShowStderr: true, ShowStderr: true,
@@ -275,25 +220,17 @@ func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string,
Since: strconv.FormatInt(from.Unix(), 10), Since: strconv.FormatInt(from.Unix(), 10),
Until: strconv.FormatInt(to.Unix(), 10), Until: strconv.FormatInt(to.Unix(), 10),
} }
reader, _ := d.cli.ContainerLogs(ctx, id, options)
defer reader.Close()
containerJSON, _ := d.cli.ContainerInspect(ctx, id) reader, err := d.cli.ContainerLogs(ctx, id, options)
nextEntry := logReader(reader, containerJSON.Config.Tty) if err != nil {
return nil, err
var messages []string
for {
line, err := nextEntry()
if err != nil {
if err == io.EOF {
break
} else {
return nil, err
}
}
messages = append(messages, line)
} }
return messages, nil containerJSON, err := d.cli.ContainerInspect(ctx, id)
if err != nil {
return nil, err
}
return newLogReader(reader, containerJSON.Config.Tty), nil
} }

View File

@@ -7,6 +7,7 @@ import (
"errors" "errors"
"io" "io"
"io/ioutil" "io/ioutil"
"strings"
"testing" "testing"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
@@ -132,13 +133,10 @@ func Test_dockerClient_ContainerLogs_happy(t *testing.T) {
proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil) proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil)
client := &dockerClient{proxy, filters.NewArgs()} client := &dockerClient{proxy, filters.NewArgs()}
messages, _ := client.ContainerLogs(context.Background(), id, 300, "since") logReader, _ := client.ContainerLogs(context.Background(), id, 300, "since")
actual, _ := <-messages actual, _ := ioutil.ReadAll(logReader)
assert.Equal(t, expected, actual, "message doesn't match expected") assert.Equal(t, expected, string(actual), "message doesn't match expected")
_, ok := <-messages
assert.False(t, ok, "channel should have been closed")
proxy.AssertExpectations(t) proxy.AssertExpectations(t)
} }
@@ -148,7 +146,7 @@ func Test_dockerClient_ContainerLogs_happy_with_tty(t *testing.T) {
proxy := new(mockedProxy) proxy := new(mockedProxy)
expected := "INFO Testing logs..." expected := "INFO Testing logs..."
reader := ioutil.NopCloser(bytes.NewReader([]byte(expected))) reader := ioutil.NopCloser(strings.NewReader(expected))
options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true} options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true}
proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil) proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil)
@@ -156,13 +154,11 @@ func Test_dockerClient_ContainerLogs_happy_with_tty(t *testing.T) {
proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil) proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil)
client := &dockerClient{proxy, filters.NewArgs()} client := &dockerClient{proxy, filters.NewArgs()}
messages, _ := client.ContainerLogs(context.Background(), id, 300, "") logReader, _ := client.ContainerLogs(context.Background(), id, 300, "")
actual, _ := <-messages actual, _ := ioutil.ReadAll(logReader)
assert.Equal(t, expected, actual, "message doesn't match expected") assert.Equal(t, expected, string(actual), "message doesn't match expected")
_, ok := <-messages
assert.False(t, ok, "channel should have been closed")
proxy.AssertExpectations(t) proxy.AssertExpectations(t)
} }
@@ -174,14 +170,10 @@ func Test_dockerClient_ContainerLogs_error(t *testing.T) {
client := &dockerClient{proxy, filters.NewArgs()} client := &dockerClient{proxy, filters.NewArgs()}
messages, err := client.ContainerLogs(context.Background(), id, 300, "") reader, err := client.ContainerLogs(context.Background(), id, 300, "")
assert.Nil(t, messages, "messages should be nil") assert.Nil(t, reader, "reader should be nil")
assert.Error(t, err, "error should have been returned")
e, _ := <-err
assert.Error(t, e, "error should have been returned")
_, ok := <-err
assert.False(t, ok, "error channel should have been closed")
proxy.AssertExpectations(t) proxy.AssertExpectations(t)
} }

49
docker/reader.go Normal file
View File

@@ -0,0 +1,49 @@
package docker
import (
"bytes"
"encoding/binary"
"io"
)
type logReader struct {
readerCloser io.ReadCloser
tty bool
lastHeader []byte
buffer bytes.Buffer
}
func newLogReader(reader io.ReadCloser, tty bool) io.ReadCloser {
return &logReader{
reader,
tty,
make([]byte, 8),
bytes.Buffer{},
}
}
func (r *logReader) Read(p []byte) (n int, err error) {
if r.tty {
return r.readerCloser.Read(p)
} else {
if r.buffer.Len() > 0 {
return r.buffer.Read(p)
} else {
r.buffer.Reset()
_, err := r.readerCloser.Read(r.lastHeader)
if err != nil {
return 0, err
}
count := binary.BigEndian.Uint32(r.lastHeader[4:])
_, err = io.CopyN(&r.buffer, r.readerCloser, int64(count))
if err != nil {
return 0, err
}
return r.buffer.Read(p)
}
}
}
func (r *logReader) Close() error {
return r.readerCloser.Close()
}

View File

@@ -81,12 +81,15 @@ X-Content-Type-Options: nosniff
error finding container error finding container
/* snapshot: Test_handler_streamLogs_error_reading */ /* snapshot: Test_handler_streamLogs_error_reading */
HTTP/1.1 200 OK HTTP/1.1 500 Internal Server Error
Connection: close Connection: close
Cache-Control: no-cache Cache-Control: no-cache
Connection: keep-alive Connection: keep-alive
Content-Type: text/event-stream Content-Type: text/plain; charset=utf-8
X-Accel-Buffering: no X-Accel-Buffering: no
X-Content-Type-Options: nosniff
test error
/* snapshot: Test_handler_streamLogs_happy */ /* snapshot: Test_handler_streamLogs_happy */
HTTP/1.1 200 OK HTTP/1.1 200 OK
@@ -112,9 +115,6 @@ X-Accel-Buffering: no
event: container-stopped event: container-stopped
data: end of stream data: end of stream
event: container-stopped
data: end of stream
/* snapshot: Test_handler_streamLogs_happy_with_id */ /* snapshot: Test_handler_streamLogs_happy_with_id */
HTTP/1.1 200 OK HTTP/1.1 200 OK
Connection: close Connection: close

View File

@@ -1,6 +1,7 @@
package web package web
import ( import (
"bufio"
"encoding/json" "encoding/json"
"fmt" "fmt"
"html/template" "html/template"
@@ -8,6 +9,7 @@ import (
"net/http" "net/http"
"runtime" "runtime"
"strings" "strings"
"time" "time"
"github.com/amir20/dozzle/docker" "github.com/amir20/dozzle/docker"
@@ -103,11 +105,15 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request)
to, _ := time.Parse(time.RFC3339, r.URL.Query().Get("to")) to, _ := time.Parse(time.RFC3339, r.URL.Query().Get("to"))
id := r.URL.Query().Get("id") id := r.URL.Query().Get("id")
messages, _ := h.client.ContainerLogsBetweenDates(r.Context(), id, from, to) reader, err := h.client.ContainerLogsBetweenDates(r.Context(), id, from, to)
defer reader.Close()
for _, m := range messages { if err != nil {
fmt.Fprintln(w, m) http.Error(w, err.Error(), http.StatusInternalServerError)
return
} }
io.Copy(w, reader)
} }
func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) { func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) {
@@ -123,47 +129,47 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) {
return return
} }
container, e := h.client.FindContainer(id) container, err := h.client.FindContainer(id)
if e != nil { if err != nil {
http.Error(w, e.Error(), http.StatusNotFound) http.Error(w, err.Error(), http.StatusNotFound)
return return
} }
messages, err := h.client.ContainerLogs(r.Context(), container.ID, h.config.TailSize, r.Header.Get("Last-Event-ID"))
w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no") w.Header().Set("X-Accel-Buffering", "no")
Loop:
for { reader, err := h.client.ContainerLogs(r.Context(), container.ID, h.config.TailSize, r.Header.Get("Last-Event-ID"))
select { if err != nil {
case message, ok := <-messages: if err == io.EOF {
if !ok { fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n")
fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n")
break Loop
}
fmt.Fprintf(w, "data: %s\n", message)
if index := strings.IndexAny(message, " "); index != -1 {
id := message[:index]
if _, err := time.Parse(time.RFC3339Nano, id); err == nil {
fmt.Fprintf(w, "id: %s\n", id)
}
}
fmt.Fprintf(w, "\n")
f.Flush() f.Flush()
case e := <-err: } else {
if e == io.EOF { http.Error(w, err.Error(), http.StatusInternalServerError)
log.Debugf("container stopped: %v", container.ID) }
fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n") return
f.Flush() }
} else { defer reader.Close()
log.Debugf("error while reading from log stream: %v", e)
break Loop scanner := bufio.NewScanner(reader)
for scanner.Scan() {
message := scanner.Text()
fmt.Fprintf(w, "data: %s\n", message)
if index := strings.IndexAny(message, " "); index != -1 {
id := message[:index]
if _, err := time.Parse(time.RFC3339Nano, id); err == nil {
fmt.Fprintf(w, "id: %s\n", id)
} }
} }
fmt.Fprintf(w, "\n")
f.Flush()
} }
log.Debugf("container stopped: %v", container.ID)
fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n")
f.Flush()
log.WithField("routines", runtime.NumGoroutine()).Debug("runtime goroutine stats") log.WithField("routines", runtime.NumGoroutine()).Debug("runtime goroutine stats")
if log.IsLevelEnabled(log.DebugLevel) { if log.IsLevelEnabled(log.DebugLevel) {

View File

@@ -4,9 +4,11 @@ import (
"context" "context"
"errors" "errors"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"strings"
"testing" "testing"
"github.com/magiconair/properties/assert" "github.com/magiconair/properties/assert"
@@ -25,34 +27,17 @@ type MockedClient struct {
func (m *MockedClient) FindContainer(id string) (docker.Container, error) { func (m *MockedClient) FindContainer(id string) (docker.Container, error) {
args := m.Called(id) args := m.Called(id)
container, ok := args.Get(0).(docker.Container) return args.Get(0).(docker.Container), args.Error(1)
if !ok {
panic("containers is not of type docker.Container")
}
return container, args.Error(1)
} }
func (m *MockedClient) ListContainers() ([]docker.Container, error) { func (m *MockedClient) ListContainers() ([]docker.Container, error) {
args := m.Called() args := m.Called()
containers, ok := args.Get(0).([]docker.Container) return args.Get(0).([]docker.Container), args.Error(1)
if !ok {
panic("containers is not of type []docker.Container")
}
return containers, args.Error(1)
} }
func (m *MockedClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (<-chan string, <-chan error) { func (m *MockedClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (io.ReadCloser, error) {
args := m.Called(ctx, id, tailSize) args := m.Called(ctx, id, tailSize)
channel, ok := args.Get(0).(chan string) return args.Get(0).(io.ReadCloser), args.Error(1)
if !ok {
panic("channel is not of type chan string")
}
err, ok := args.Get(1).(chan error)
if !ok {
panic("error is not of type chan error")
}
return channel, err
} }
func (m *MockedClient) Events(ctx context.Context) (<-chan docker.ContainerEvent, <-chan error) { func (m *MockedClient) Events(ctx context.Context) (<-chan docker.ContainerEvent, <-chan error) {
@@ -82,15 +67,9 @@ func Test_handler_streamLogs_happy(t *testing.T) {
require.NoError(t, err, "NewRequest should not return an error.") require.NoError(t, err, "NewRequest should not return an error.")
mockedClient := new(MockedClient) mockedClient := new(MockedClient)
reader := ioutil.NopCloser(strings.NewReader("INFO Testing logs..."))
messages := make(chan string)
errChannel := make(chan error)
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, 300).Return(messages, errChannel) mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, 300).Return(reader, nil)
go func() {
messages <- "INFO Testing logs..."
close(messages)
}()
h := handler{client: mockedClient, config: &Config{TailSize: 300}} h := handler{client: mockedClient, config: &Config{TailSize: 300}}
handler := http.HandlerFunc(h.streamLogs) handler := http.HandlerFunc(h.streamLogs)
@@ -109,15 +88,9 @@ func Test_handler_streamLogs_happy_with_id(t *testing.T) {
require.NoError(t, err, "NewRequest should not return an error.") require.NoError(t, err, "NewRequest should not return an error.")
mockedClient := new(MockedClient) mockedClient := new(MockedClient)
reader := ioutil.NopCloser(strings.NewReader("2020-05-13T18:55:37.772853839Z INFO Testing logs..."))
messages := make(chan string)
errChannel := make(chan error)
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, 300).Return(messages, errChannel) mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, 300).Return(reader, nil)
go func() {
messages <- "2020-05-13T18:55:37.772853839Z INFO Testing logs..."
close(messages)
}()
h := handler{client: mockedClient, config: &Config{TailSize: 300}} h := handler{client: mockedClient, config: &Config{TailSize: 300}}
handler := http.HandlerFunc(h.streamLogs) handler := http.HandlerFunc(h.streamLogs)
@@ -136,15 +109,8 @@ func Test_handler_streamLogs_happy_container_stopped(t *testing.T) {
require.NoError(t, err, "NewRequest should not return an error.") require.NoError(t, err, "NewRequest should not return an error.")
mockedClient := new(MockedClient) mockedClient := new(MockedClient)
messages := make(chan string)
errChannel := make(chan error)
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
mockedClient.On("ContainerLogs", mock.Anything, id, 300).Return(messages, errChannel) mockedClient.On("ContainerLogs", mock.Anything, id, 300).Return(ioutil.NopCloser(strings.NewReader("")), io.EOF)
go func() {
errChannel <- io.EOF
close(messages)
}()
h := handler{client: mockedClient, config: &Config{TailSize: 300}} h := handler{client: mockedClient, config: &Config{TailSize: 300}}
handler := http.HandlerFunc(h.streamLogs) handler := http.HandlerFunc(h.streamLogs)
@@ -182,15 +148,8 @@ func Test_handler_streamLogs_error_reading(t *testing.T) {
require.NoError(t, err, "NewRequest should not return an error.") require.NoError(t, err, "NewRequest should not return an error.")
mockedClient := new(MockedClient) mockedClient := new(MockedClient)
messages := make(chan string)
errChannel := make(chan error)
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
mockedClient.On("ContainerLogs", mock.Anything, id, 300).Return(messages, errChannel) mockedClient.On("ContainerLogs", mock.Anything, id, 300).Return(ioutil.NopCloser(strings.NewReader("")), errors.New("test error"))
go func() {
errChannel <- errors.New("test error")
close(messages)
}()
h := handler{client: mockedClient, config: &Config{TailSize: 300}} h := handler{client: mockedClient, config: &Config{TailSize: 300}}
handler := http.HandlerFunc(h.streamLogs) handler := http.HandlerFunc(h.streamLogs)