1
0
mirror of https://github.com/amir20/dozzle.git synced 2025-12-25 14:59:26 +01:00

fix(performance): improves streaming of logs by using the Docker library to demultiplex (#2295)

* fix(performance): improves streaming of logs by using the Docker library to demultiplex

* fixes go tests

* changes channel to unbuffered

* fixes golang race test

* fixes stopped containers

* wip

* uses different strategy

* cleans tests

* cleans tests

* adds more tests

* fixes possible bug

* fixes download

* adds more tests for download

* uses pool

* adds more tests
This commit is contained in:
Amir Raminfar
2023-07-11 09:52:30 -07:00
committed by GitHub
parent ad7b3a2b52
commit 1bb1081f1a
13 changed files with 468 additions and 433 deletions

View File

@@ -1 +1 @@
-r '\.(go)$' -R 'node_modules' -G '*_test.go' -s -- go run main.go --level debug --remote-host tcp://64.225.88.189:2376|clashleaders.com
-r '\.(go)$' -R 'node_modules' -G '\*\_test.go' -s -- go run main.go --level debug --remote-host tcp://64.225.88.189:2376|clashleaders.com

View File

@@ -62,7 +62,6 @@ type Client interface {
ListContainers() ([]Container, error)
FindContainer(string) (Container, error)
ContainerLogs(context.Context, string, string, StdType) (io.ReadCloser, error)
ContainerLogReader(context.Context, string) (io.ReadCloser, error)
Events(context.Context, chan<- ContainerEvent) <-chan error
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time, StdType) (io.ReadCloser, error)
ContainerStats(context.Context, string, chan<- ContainerStat) error
@@ -145,6 +144,12 @@ func (d *dockerClient) FindContainer(id string) (Container, error) {
return container, fmt.Errorf("unable to find container with id: %s", id)
}
if json, err := d.cli.ContainerInspect(context.Background(), container.ID); err == nil {
container.Tty = json.Config.Tty
} else {
return container, err
}
return container, nil
}
@@ -255,18 +260,12 @@ func (d *dockerClient) ContainerLogs(ctx context.Context, id string, since strin
Since: since,
}
log.Debugf("streaming logs from Docker with option: %+v", options)
reader, err := d.cli.ContainerLogs(ctx, id, options)
if err != nil {
return nil, err
}
containerJSON, err := d.cli.ContainerInspect(ctx, id)
if err != nil {
return nil, err
}
return newLogReader(reader, containerJSON.Config.Tty, true), nil
return reader, nil
}
func (d *dockerClient) Events(ctx context.Context, messages chan<- ContainerEvent) <-chan error {
@@ -297,30 +296,6 @@ func (d *dockerClient) Events(ctx context.Context, messages chan<- ContainerEven
return errors
}
func (d *dockerClient) ContainerLogReader(ctx context.Context, id string) (io.ReadCloser, error) {
options := types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Timestamps: true,
Since: time.Unix(0, 0).Format(time.RFC3339),
Until: time.Now().Format(time.RFC3339),
}
reader, err := d.cli.ContainerLogs(ctx, id, options)
if err != nil {
return nil, err
}
containerJSON, err := d.cli.ContainerInspect(ctx, id)
if err != nil {
return nil, err
}
return newLogReader(reader, containerJSON.Config.Tty, false), nil
}
func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time, stdType StdType) (io.ReadCloser, error) {
options := types.ContainerLogsOptions{
ShowStdout: stdType&STDOUT != 0,
@@ -333,17 +308,11 @@ func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string,
log.Debugf("fetching logs from Docker with option: %+v", options)
reader, err := d.cli.ContainerLogs(ctx, id, options)
if err != nil {
return nil, err
}
containerJSON, err := d.cli.ContainerInspect(ctx, id)
if err != nil {
return nil, err
}
return newLogReader(reader, containerJSON.Config.Tty, true), nil
return reader, nil
}
func (d *dockerClient) Ping(ctx context.Context) (types.Ping, error) {

View File

@@ -7,7 +7,6 @@ import (
"errors"
"io"
"strings"
"testing"
"github.com/docker/docker/api/types"
@@ -126,36 +125,11 @@ func Test_dockerClient_ContainerLogs_happy(t *testing.T) {
options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true, Since: "since"}
proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil)
json := types.ContainerJSON{Config: &container.Config{Tty: false}}
proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil)
client := &dockerClient{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
logReader, _ := client.ContainerLogs(context.Background(), id, "since", STDALL)
actual, _ := io.ReadAll(logReader)
assert.Equal(t, expected, string(actual), "message doesn't match expected")
proxy.AssertExpectations(t)
}
func Test_dockerClient_ContainerLogs_happy_with_tty(t *testing.T) {
id := "123456"
proxy := new(mockedProxy)
expected := "INFO Testing logs..."
reader := io.NopCloser(strings.NewReader(expected))
options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true}
proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil)
json := types.ContainerJSON{Config: &container.Config{Tty: true}}
proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil)
client := &dockerClient{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
logReader, _ := client.ContainerLogs(context.Background(), id, "", STDALL)
actual, _ := io.ReadAll(logReader)
assert.Equal(t, expected, string(actual), "message doesn't match expected")
assert.Equal(t, string(b), string(actual), "message doesn't match expected")
proxy.AssertExpectations(t)
}
@@ -188,6 +162,10 @@ func Test_dockerClient_FindContainer_happy(t *testing.T) {
proxy := new(mockedProxy)
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(containers, nil)
json := types.ContainerJSON{Config: &container.Config{Tty: false}}
proxy.On("ContainerInspect", mock.Anything, "abcdefghijkl").Return(json, nil)
client := &dockerClient{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
container, err := client.FindContainer("abcdefghijkl")
@@ -198,6 +176,7 @@ func Test_dockerClient_FindContainer_happy(t *testing.T) {
Name: "z_test_container",
Names: []string{"/z_test_container"},
Host: "localhost",
Tty: false,
})
proxy.AssertExpectations(t)

234
docker/event_generator.go Normal file
View File

@@ -0,0 +1,234 @@
package docker
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"regexp"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type EventGenerator struct {
Events chan *LogEvent
Errors chan error
reader *bufio.Reader
next *LogEvent
buffer chan *LogEvent
tty bool
wg sync.WaitGroup
}
var bufPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
var BadHeaderErr = fmt.Errorf("dozzle/docker: unable to read header")
func NewEventGenerator(reader io.Reader, tty bool) *EventGenerator {
generator := &EventGenerator{
reader: bufio.NewReader(reader),
buffer: make(chan *LogEvent, 100),
Errors: make(chan error, 1),
Events: make(chan *LogEvent),
tty: tty,
}
generator.wg.Add(2)
go generator.consumeReader(&generator.wg)
go generator.processBuffer(&generator.wg)
return generator
}
func (g *EventGenerator) processBuffer(wg *sync.WaitGroup) {
var current, next *LogEvent
for {
if g.next != nil {
current = g.next
g.next = nil
next = g.peek()
} else {
event, ok := <-g.buffer
if !ok {
close(g.Events)
break
}
current = event
next = g.peek()
}
checkPosition(current, next)
g.Events <- current
}
wg.Done()
}
func (g *EventGenerator) consumeReader(wg *sync.WaitGroup) {
for {
message, streamType, readerError := readEvent(g.reader, g.tty)
if message != "" {
logEvent := createEvent(message, streamType)
logEvent.Level = guessLogLevel(logEvent)
g.buffer <- logEvent
}
if readerError != nil {
if readerError != BadHeaderErr {
g.Errors <- readerError
}
close(g.buffer)
break
}
}
wg.Done()
}
func (g *EventGenerator) peek() *LogEvent {
if g.next != nil {
return g.next
}
select {
case event := <-g.buffer:
g.next = event
return g.next
case <-time.After(50 * time.Millisecond):
return nil
}
}
func readEvent(reader *bufio.Reader, tty bool) (string, StdType, error) {
header := []byte{0, 0, 0, 0, 0, 0, 0, 0}
buffer := bufPool.Get().(*bytes.Buffer)
buffer.Reset()
defer bufPool.Put(buffer)
var streamType StdType = STDOUT
if tty {
message, err := reader.ReadString('\n')
if err != nil {
return message, streamType, err
}
return message, streamType, nil
} else {
n, err := reader.Read(header)
if err != nil {
return "", streamType, err
}
if n != 8 {
return "", streamType, BadHeaderErr
}
switch header[0] {
case 1:
streamType = STDOUT
case 2:
streamType = STDERR
default:
log.Warnf("unknown stream type %d", header[0])
}
count := binary.BigEndian.Uint32(header[4:])
if count == 0 {
return "", streamType, nil
}
_, err = io.CopyN(buffer, reader, int64(count))
if err != nil {
return "", streamType, err
}
return buffer.String(), streamType, nil
}
}
func createEvent(message string, streamType StdType) *LogEvent {
h := fnv.New32a()
h.Write([]byte(message))
logEvent := &LogEvent{Id: h.Sum32(), Message: message, Stream: streamType.String()}
if index := strings.IndexAny(message, " "); index != -1 {
logId := message[:index]
if timestamp, err := time.Parse(time.RFC3339Nano, logId); err == nil {
logEvent.Timestamp = timestamp.UnixMilli()
message = strings.TrimSuffix(message[index+1:], "\n")
logEvent.Message = message
if strings.HasPrefix(message, "{") && strings.HasSuffix(message, "}") {
var data map[string]interface{}
if err := json.Unmarshal([]byte(message), &data); err != nil {
log.Warnf("unable to parse json logs - error was \"%v\" while trying unmarshal \"%v\"", err.Error(), message)
} else {
logEvent.Message = data
}
}
}
}
return logEvent
}
func checkPosition(currentEvent *LogEvent, nextEvent *LogEvent) {
currentLevel := guessLogLevel(currentEvent)
if nextEvent != nil {
if currentEvent.IsCloseToTime(nextEvent) && currentLevel != "" && !nextEvent.HasLevel() {
currentEvent.Position = START
nextEvent.Position = MIDDLE
}
// If next item is not close to current item or has level, set current item position to end
if currentEvent.Position == MIDDLE && (nextEvent.HasLevel() || !currentEvent.IsCloseToTime(nextEvent)) {
currentEvent.Position = END
}
// If next item is close to current item and has no level, set next item position to middle
if currentEvent.Position == MIDDLE && !nextEvent.HasLevel() && currentEvent.IsCloseToTime(nextEvent) {
nextEvent.Position = MIDDLE
}
// Set next item level to current item level
if currentEvent.Position == START || currentEvent.Position == MIDDLE {
nextEvent.Level = currentEvent.Level
}
} else if currentEvent.Position == MIDDLE {
currentEvent.Position = END
}
}
var KEY_VALUE_REGEX = regexp.MustCompile(`level=(\w+)`)
var ANSI_COLOR_REGEX = regexp.MustCompile(`\x1b\[[0-9;]*m`)
func guessLogLevel(logEvent *LogEvent) string {
switch value := logEvent.Message.(type) {
case string:
levels := []string{"error", "warn", "warning", "info", "debug", "trace", "fatal"}
stripped := ANSI_COLOR_REGEX.ReplaceAllString(value, "") // remove ansi color codes
for _, level := range levels {
if match, _ := regexp.MatchString("(?i)^"+level+"[^a-z]", stripped); match {
return level
}
if strings.Contains(value, "["+strings.ToUpper(level)+"]") {
return level
}
if strings.Contains(value, " "+strings.ToUpper(level)+" ") {
return level
}
}
if matches := KEY_VALUE_REGEX.FindStringSubmatch(value); matches != nil {
return matches[1]
}
case map[string]interface{}:
if level, ok := value["level"].(string); ok {
return level
}
}
return ""
}

View File

@@ -0,0 +1,79 @@
package docker
import (
"bufio"
"bytes"
"encoding/binary"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEventGenerator_Events_tty(t *testing.T) {
input := "example input"
reader := bufio.NewReader(strings.NewReader(input))
g := NewEventGenerator(reader, true)
event := <-g.Events
require.NotNil(t, event, "Expected event to not be nil, but got nil")
assert.Equal(t, input, event.Message)
}
func TestEventGenerator_Events_non_tty(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))
g := NewEventGenerator(reader, false)
event := <-g.Events
require.NotNil(t, event, "Expected event to not be nil, but got nil")
assert.Equal(t, input, event.Message)
}
func TestEventGenerator_Events_non_tty_close_channel(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))
g := NewEventGenerator(reader, false)
<-g.Events
_, ok := <-g.Events
assert.False(t, ok, "Expected channel to be closed")
}
func TestEventGenerator_Events_routines_done(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))
g := NewEventGenerator(reader, false)
<-g.Events
assert.False(t, waitTimeout(&g.wg, 1*time.Second), "Expected routines to be done")
}
func makeMessage(message string, stream StdType) []byte {
data := make([]byte, 8)
binary.BigEndian.PutUint32(data[4:], uint32(len(message)))
data[0] = byte(stream / 2)
data = append(data, []byte(message)...)
return data
}
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}

View File

@@ -1,173 +0,0 @@
package docker
import (
"bufio"
"encoding/json"
"hash/fnv"
"regexp"
"strings"
"time"
log "github.com/sirupsen/logrus"
)
type eventGenerator struct {
reader *bufio.Reader
channel chan *LogEvent
next *LogEvent
lastError error
}
func NewEventIterator(reader *bufio.Reader) *eventGenerator {
generator := &eventGenerator{reader: reader, channel: make(chan *LogEvent, 100)}
go generator.consume()
return generator
}
func (g *eventGenerator) Next() (*LogEvent, error) {
var currentEvent *LogEvent
var nextEvent *LogEvent
if g.next != nil {
currentEvent = g.next
g.next = nil
nextEvent = g.Peek()
} else {
event, ok := <-g.channel
if !ok {
return nil, g.lastError
}
currentEvent = event
nextEvent = g.Peek()
}
currentLevel := guessLogLevel(currentEvent)
if nextEvent != nil {
if currentEvent.IsCloseToTime(nextEvent) && currentLevel != "" && !nextEvent.HasLevel() {
currentEvent.Position = START
nextEvent.Position = MIDDLE
}
// If next item is not close to current item or has level, set current item position to end
if currentEvent.Position == MIDDLE && (nextEvent.HasLevel() || !currentEvent.IsCloseToTime(nextEvent)) {
currentEvent.Position = END
}
// If next item is close to current item and has no level, set next item position to middle
if currentEvent.Position == MIDDLE && !nextEvent.HasLevel() && currentEvent.IsCloseToTime(nextEvent) {
nextEvent.Position = MIDDLE
}
// Set next item level to current item level
if currentEvent.Position == START || currentEvent.Position == MIDDLE {
nextEvent.Level = currentEvent.Level
}
} else if currentEvent.Position == MIDDLE {
currentEvent.Position = END
}
return currentEvent, nil
}
func (g *eventGenerator) LastError() error {
return g.lastError
}
func (g *eventGenerator) Peek() *LogEvent {
if g.next != nil {
return g.next
}
select {
case event := <-g.channel:
g.next = event
return g.next
case <-time.After(50 * time.Millisecond):
return nil
}
}
func (g *eventGenerator) consume() {
for {
message, readerError := g.reader.ReadString('\n')
if message != "" {
h := fnv.New32a()
h.Write([]byte(message))
std := message[:3]
var stdType StdType
switch std {
case "OUT":
stdType = STDOUT
message = message[3:]
case "ERR":
stdType = STDERR
message = message[3:]
default:
log.Debugf("unknown std type [%s] with message [%s]", std, message)
stdType = UNKNOWN
}
logEvent := &LogEvent{Id: h.Sum32(), Message: message, Stream: stdType.String()}
if index := strings.IndexAny(message, " "); index != -1 {
logId := message[:index]
if timestamp, err := time.Parse(time.RFC3339Nano, logId); err == nil {
logEvent.Timestamp = timestamp.UnixMilli()
message = strings.TrimSuffix(message[index+1:], "\n")
logEvent.Message = message
if strings.HasPrefix(message, "{") && strings.HasSuffix(message, "}") {
var data map[string]interface{}
if err := json.Unmarshal([]byte(message), &data); err != nil {
log.Warnf("unable to parse json logs - error was \"%v\" while trying unmarshal \"%v\"", err.Error(), message)
} else {
logEvent.Message = data
}
}
}
}
logEvent.Level = guessLogLevel(logEvent)
g.channel <- logEvent
}
if readerError != nil {
g.lastError = readerError
close(g.channel)
return
}
}
}
var KEY_VALUE_REGEX = regexp.MustCompile(`level=(\w+)`)
var ANSI_COLOR_REGEX = regexp.MustCompile(`\x1b\[[0-9;]*m`)
func guessLogLevel(logEvent *LogEvent) string {
switch value := logEvent.Message.(type) {
case string:
levels := []string{"error", "warn", "warning", "info", "debug", "trace", "fatal"}
stripped := ANSI_COLOR_REGEX.ReplaceAllString(value, "") // remove ansi color codes
for _, level := range levels {
if match, _ := regexp.MatchString("(?i)^"+level+"[^a-z]", stripped); match {
return level
}
if strings.Contains(value, "["+strings.ToUpper(level)+"]") {
return level
}
if strings.Contains(value, " "+strings.ToUpper(level)+" ") {
return level
}
}
if matches := KEY_VALUE_REGEX.FindStringSubmatch(value); matches != nil {
return matches[1]
}
case map[string]interface{}:
if level, ok := value["level"].(string); ok {
return level
}
}
return ""
}

View File

@@ -1,56 +0,0 @@
package docker
import (
"bufio"
"io"
"strings"
"testing"
"github.com/magiconair/properties/assert"
"github.com/stretchr/testify/require"
)
func TestNewEventIterator(t *testing.T) {
input := "example input"
reader := bufio.NewReader(strings.NewReader("OUT" + input))
generator := NewEventIterator(reader)
require.NotNil(t, generator, "Expected generator to not be nil, but got nil")
}
func TestEventGenerator_Next(t *testing.T) {
input := "example input"
reader := bufio.NewReader(strings.NewReader("OUT" + input))
generator := NewEventIterator(reader)
event, err := generator.Next()
require.NoError(t, err, "Expected no error, but got: %v", err)
require.NotNil(t, event, "Expected event to not be nil, but got nil")
}
func TestEventGenerator_LastError(t *testing.T) {
input := "example input"
reader := bufio.NewReader(strings.NewReader("OUT" + input))
generator := NewEventIterator(reader)
require.Nil(t, generator.LastError(), "Expected LastError to return nil, but got: %v", generator.LastError())
generator.Next()
// expert error to be EOF
assert.Equal(t, generator.LastError(), io.EOF, "Expected LastError to return EOF, but got: %v", generator.LastError().Error())
}
func TestEventGenerator_Peek(t *testing.T) {
input := "example input"
reader := bufio.NewReader(strings.NewReader("OUT" + input))
generator := NewEventIterator(reader)
event := generator.Peek()
require.NotNil(t, event, "Expected event to not be nil, but got nil")
assert.Equal(t, event.Message, input, "Expected event message to be %s, but got: %s", input, event.Message.(string))
}

View File

@@ -1,61 +0,0 @@
package docker
import (
"bytes"
"encoding/binary"
"io"
)
type logReader struct {
readerCloser io.ReadCloser
tty bool
lastHeader []byte
buffer bytes.Buffer
label bool
}
func newLogReader(reader io.ReadCloser, tty bool, labelStd bool) io.ReadCloser {
return &logReader{
reader,
tty,
make([]byte, 8),
bytes.Buffer{},
labelStd,
}
}
func (r *logReader) Read(p []byte) (n int, err error) {
if r.tty {
return r.readerCloser.Read(p)
} else {
if r.buffer.Len() > 0 {
return r.buffer.Read(p)
} else {
r.buffer.Reset()
_, err := r.readerCloser.Read(r.lastHeader)
if err != nil {
return 0, err
}
if r.label {
std := r.lastHeader[0] // https://github.com/rancher/docker/blob/master/pkg/stdcopy/stdcopy.go#L94
if std == 1 {
r.buffer.WriteString("OUT")
}
if std == 2 {
r.buffer.WriteString("ERR")
}
}
count := binary.BigEndian.Uint32(r.lastHeader[4:])
_, err = io.CopyN(&r.buffer, r.readerCloser, int64(count))
if err != nil {
return 0, err
}
return r.buffer.Read(p)
}
}
}
func (r *logReader) Close() error {
return r.readerCloser.Close()
}

View File

@@ -17,6 +17,7 @@ type Container struct {
Status string `json:"status"`
Health string `json:"health,omitempty"`
Host string `json:"host,omitempty"`
Tty bool `json:"-"`
}
// ContainerStat represent stats instant for a container

View File

@@ -77,18 +77,21 @@ Connection: close
Content-Security-Policy: default-src 'none'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; manifest-src 'self'; connect-src 'self' api.github.com;
Content-Type: application/ld+json; charset=UTF-8
{"m":"INFO Testing logs...","ts":1589396137772,"id":1122614848,"l":"info","s":"stdout"}
{"m":"INFO Testing logs...","ts":1589396137772,"id":1543246723,"l":"info","s":"stderr"}
{"m":"INFO Testing stdout logs...","ts":1589396137772,"id":466600245,"l":"info","s":"stdout"}
{"m":"INFO Testing stderr logs...","ts":1589396197772,"id":1101501603,"l":"info","s":"stderr"}
/* snapshot: Test_handler_download_logs */
INFO Testing logs...
/* snapshot: Test_handler_streamEvents_error */
HTTP/1.1 200 OK
Connection: close
Cache-Control: no-transform
Cache-Control: no-cache
Connection: keep-alive
Content-Type: text/event-stream
X-Accel-Buffering: no
HTTP/1.1 200 OK
Connection: close
Cache-Control: no-transform
Cache-Control: no-cache
Connection: keep-alive
Content-Type: text/event-stream
X-Accel-Buffering: no
event: containers-changed
data: []
@@ -167,7 +170,7 @@ Content-Security-Policy: default-src 'none'; script-src 'self'; style-src 'self'
Content-Type: text/event-stream
X-Accel-Buffering: no
data: {"m":"INFO Testing logs...","ts":0,"id":852638900,"l":"info","s":"stdout"}
data: {"m":"INFO Testing logs...","ts":0,"id":4256192898,"l":"info","s":"stdout"}
event: container-stopped
data: end of stream
@@ -195,7 +198,7 @@ Content-Security-Policy: default-src 'none'; script-src 'self'; style-src 'self'
Content-Type: text/event-stream
X-Accel-Buffering: no
data: {"m":"INFO Testing logs...","ts":1589396137772,"id":3373215946,"l":"info","s":"stdout"}
data: {"m":"INFO Testing logs...","ts":1589396137772,"id":1469707724,"l":"info","s":"stdout"}
id: 1589396137772
event: container-stopped

View File

@@ -1,7 +1,6 @@
package web
import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
@@ -14,6 +13,7 @@ import (
"time"
"github.com/amir20/dozzle/docker"
"github.com/docker/docker/pkg/stdcopy"
"github.com/dustin/go-humanize"
"github.com/go-chi/chi/v5"
@@ -38,12 +38,16 @@ func (h *handler) downloadLogs(w http.ResponseWriter, r *http.Request) {
zw.Comment = "Logs generated by Dozzle"
zw.ModTime = now
reader, err := h.clientFromRequest(r).ContainerLogReader(r.Context(), container.ID)
reader, err := h.clientFromRequest(r).ContainerLogsBetweenDates(r.Context(), id, time.Time{}, now, docker.STDALL)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
io.Copy(zw, reader)
if container.Tty {
io.Copy(zw, reader)
} else {
stdcopy.StdCopy(zw, zw, reader)
}
}
func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request) {
@@ -66,25 +70,30 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request)
return
}
reader, err := h.clientFromRequest(r).ContainerLogsBetweenDates(r.Context(), id, from, to, stdTypes)
defer reader.Close()
container, err := h.clientFromRequest(r).FindContainer(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
reader, err := h.clientFromRequest(r).ContainerLogsBetweenDates(r.Context(), container.ID, from, to, stdTypes)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
buffered := bufio.NewReader(reader)
iterator := docker.NewEventIterator(buffered)
g := docker.NewEventGenerator(reader, container.Tty)
loop:
for {
logEvent, readerError := iterator.Next()
if readerError != nil {
break
}
if err := json.NewEncoder(w).Encode(logEvent); err != nil {
log.Errorf("json encoding error while streaming %v", err.Error())
select {
case event, ok := <-g.Events:
if !ok {
break loop
}
if err := json.NewEncoder(w).Encode(event); err != nil {
log.Errorf("json encoding error while streaming %v", err.Error())
}
}
}
}
@@ -138,57 +147,50 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) {
}
return
}
defer reader.Close()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
go func() {
for {
select {
case <-r.Context().Done():
return
case <-ticker.C:
fmt.Fprintf(w, ":ping \n\n")
g := docker.NewEventGenerator(reader, container.Tty)
loop:
for {
select {
case event, ok := <-g.Events:
if !ok {
log.WithFields(log.Fields{"id": id}).Debug("stream closed")
break loop
}
if buf, err := json.Marshal(event); err != nil {
log.Errorf("json encoding error while streaming %v", err.Error())
} else {
fmt.Fprintf(w, "data: %s\n", buf)
}
if event.Timestamp > 0 {
fmt.Fprintf(w, "id: %d\n", event.Timestamp)
}
fmt.Fprintf(w, "\n")
f.Flush()
case <-ticker.C:
fmt.Fprintf(w, ":ping \n\n")
f.Flush()
}
}
select {
case err := <-g.Errors:
if err != nil {
if err == io.EOF {
log.Debugf("container stopped: %v", container.ID)
fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n")
f.Flush()
} else if err != context.Canceled {
log.Errorf("unknown error while streaming %v", err.Error())
}
}
}()
buffered := bufio.NewReader(reader)
iterator := docker.NewEventIterator(buffered)
for {
logEvent, err := iterator.Next()
if err != nil {
break
}
if buf, err := json.Marshal(logEvent); err != nil {
log.Errorf("json encoding error while streaming %v", err.Error())
} else {
fmt.Fprintf(w, "data: %s\n", buf)
}
if logEvent.Timestamp > 0 {
fmt.Fprintf(w, "id: %d\n", logEvent.Timestamp)
}
fmt.Fprintf(w, "\n")
f.Flush()
default:
}
log.Debugf("streaming stopped: %v", container.ID)
if iterator.LastError() == io.EOF {
log.Debugf("container stopped: %v", container.ID)
fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n")
f.Flush()
} else if iterator.LastError() != context.Canceled {
log.Errorf("unknown error while streaming %v", iterator.LastError().Error())
}
log.WithField("routines", runtime.NumGoroutine()).Debug("runtime goroutine stats")
if log.IsLevelEnabled(log.DebugLevel) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
@@ -197,6 +199,7 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) {
"allocated": humanize.Bytes(m.Alloc),
"totalAllocated": humanize.Bytes(m.TotalAlloc),
"system": humanize.Bytes(m.Sys),
"routines": runtime.NumGoroutine(),
}).Debug("runtime mem stats")
}
}

View File

@@ -0,0 +1,36 @@
package web
import (
"bytes"
"compress/gzip"
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/amir20/dozzle/docker"
"github.com/beme/abide"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func Test_handler_download_logs(t *testing.T) {
id := "123456"
req, err := http.NewRequest("GET", "/api/logs/download/localhost/"+id, nil)
require.NoError(t, err, "NewRequest should not return an error.")
mockedClient := new(MockedClient)
data := makeMessage("INFO Testing logs...", docker.STDOUT)
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Tty: false}, nil)
mockedClient.On("ContainerLogsBetweenDates", mock.Anything, id, mock.Anything, mock.Anything, docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil)
handler := createDefaultHandler(mockedClient)
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
reader, _ := gzip.NewReader(rr.Body)
abide.AssertReader(t, t.Name(), reader)
mockedClient.AssertExpectations(t)
}

View File

@@ -1,6 +1,8 @@
package web
import (
"bytes"
"encoding/binary"
"errors"
"io"
"time"
@@ -27,9 +29,11 @@ func Test_handler_streamLogs_happy(t *testing.T) {
require.NoError(t, err, "NewRequest should not return an error.")
mockedClient := new(MockedClient)
reader := io.NopCloser(strings.NewReader("OUTINFO Testing logs..."))
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, "", docker.STDALL).Return(reader, nil)
data := makeMessage("INFO Testing logs...", docker.STDOUT)
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Tty: false}, nil)
mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, "", docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil)
handler := createDefaultHandler(mockedClient)
rr := httptest.NewRecorder()
@@ -49,9 +53,11 @@ func Test_handler_streamLogs_happy_with_id(t *testing.T) {
require.NoError(t, err, "NewRequest should not return an error.")
mockedClient := new(MockedClient)
reader := io.NopCloser(strings.NewReader("OUT2020-05-13T18:55:37.772853839Z INFO Testing logs..."))
data := makeMessage("2020-05-13T18:55:37.772853839Z INFO Testing logs...", docker.STDOUT)
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, "", docker.STDALL).Return(reader, nil)
mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, "", docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil)
handler := createDefaultHandler(mockedClient)
rr := httptest.NewRecorder()
@@ -139,7 +145,8 @@ func Test_handler_streamLogs_error_std(t *testing.T) {
// for /api/logs
func Test_handler_between_dates(t *testing.T) {
req, err := http.NewRequest("GET", "/api/logs/localhost/123456", nil)
id := "123456"
req, err := http.NewRequest("GET", "/api/logs/localhost/"+id, nil)
require.NoError(t, err, "NewRequest should not return an error.")
from, _ := time.Parse(time.RFC3339, "2018-01-01T00:00:00Z")
@@ -154,8 +161,13 @@ func Test_handler_between_dates(t *testing.T) {
req.URL.RawQuery = q.Encode()
mockedClient := new(MockedClient)
reader := io.NopCloser(strings.NewReader("OUT2020-05-13T18:55:37.772853839Z INFO Testing logs...\nERR2020-05-13T18:55:37.772853839Z INFO Testing logs...\n"))
mockedClient.On("ContainerLogsBetweenDates", mock.Anything, "123456", from, to, docker.STDALL).Return(reader, nil)
first := makeMessage("2020-05-13T18:55:37.772853839Z INFO Testing stdout logs...\n", docker.STDOUT)
second := makeMessage("2020-05-13T18:56:37.772853839Z INFO Testing stderr logs...\n", docker.STDERR)
data := append(first, second...)
mockedClient.On("ContainerLogsBetweenDates", mock.Anything, id, from, to, docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil)
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
handler := createDefaultHandler(mockedClient)
rr := httptest.NewRecorder()
@@ -163,3 +175,12 @@ func Test_handler_between_dates(t *testing.T) {
abide.AssertHTTPResponse(t, t.Name(), rr.Result())
mockedClient.AssertExpectations(t)
}
func makeMessage(message string, stream docker.StdType) []byte {
data := make([]byte, 8)
binary.BigEndian.PutUint32(data[4:], uint32(len(message)))
data[0] = byte(stream / 2)
data = append(data, []byte(message)...)
return data
}