From 1bb1081f1a5914fa925bd4d99e749fdd13f30747 Mon Sep 17 00:00:00 2001 From: Amir Raminfar Date: Tue, 11 Jul 2023 09:52:30 -0700 Subject: [PATCH] fix(performance): improves streaming of logs by using the Docker library to demultiplex (#2295) * fix(performance): improves streaming of logs by using the Docker library to demultiplex * fixes go tests * changes channel to unbuffered * fixes golang race test * fixes stopped containers * wip * uses different strategy * cleans tests * cleans tests * adds more tests * fixes possible bug * fixes download * adds more tests for download * uses pool * adds more tests --- .reflex | 2 +- docker/client.go | 47 ++----- docker/client_test.go | 33 +---- docker/event_generator.go | 234 +++++++++++++++++++++++++++++++++ docker/event_generator_test.go | 79 +++++++++++ docker/log_iterator.go | 173 ------------------------ docker/log_iterator_test.go | 56 -------- docker/reader.go | 61 --------- docker/types.go | 1 + web/__snapshots__/web.snapshot | 27 ++-- web/logs.go | 115 ++++++++-------- web/routes_download_test.go | 36 +++++ web/routes_logs_test.go | 37 ++++-- 13 files changed, 468 insertions(+), 433 deletions(-) create mode 100644 docker/event_generator.go create mode 100644 docker/event_generator_test.go delete mode 100644 docker/log_iterator.go delete mode 100644 docker/log_iterator_test.go delete mode 100644 docker/reader.go create mode 100644 web/routes_download_test.go diff --git a/.reflex b/.reflex index a45fb2ad..2a9540f2 100644 --- a/.reflex +++ b/.reflex @@ -1 +1 @@ --r '\.(go)$' -R 'node_modules' -G '*_test.go' -s -- go run main.go --level debug --remote-host tcp://64.225.88.189:2376|clashleaders.com +-r '\.(go)$' -R 'node_modules' -G '\*\_test.go' -s -- go run main.go --level debug --remote-host tcp://64.225.88.189:2376|clashleaders.com diff --git a/docker/client.go b/docker/client.go index 1c3a6ae7..965f6e82 100644 --- a/docker/client.go +++ b/docker/client.go @@ -62,7 +62,6 @@ type Client interface { ListContainers() ([]Container, error) FindContainer(string) (Container, error) ContainerLogs(context.Context, string, string, StdType) (io.ReadCloser, error) - ContainerLogReader(context.Context, string) (io.ReadCloser, error) Events(context.Context, chan<- ContainerEvent) <-chan error ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time, StdType) (io.ReadCloser, error) ContainerStats(context.Context, string, chan<- ContainerStat) error @@ -145,6 +144,12 @@ func (d *dockerClient) FindContainer(id string) (Container, error) { return container, fmt.Errorf("unable to find container with id: %s", id) } + if json, err := d.cli.ContainerInspect(context.Background(), container.ID); err == nil { + container.Tty = json.Config.Tty + } else { + return container, err + } + return container, nil } @@ -255,18 +260,12 @@ func (d *dockerClient) ContainerLogs(ctx context.Context, id string, since strin Since: since, } - log.Debugf("streaming logs from Docker with option: %+v", options) reader, err := d.cli.ContainerLogs(ctx, id, options) if err != nil { return nil, err } - containerJSON, err := d.cli.ContainerInspect(ctx, id) - if err != nil { - return nil, err - } - - return newLogReader(reader, containerJSON.Config.Tty, true), nil + return reader, nil } func (d *dockerClient) Events(ctx context.Context, messages chan<- ContainerEvent) <-chan error { @@ -297,30 +296,6 @@ func (d *dockerClient) Events(ctx context.Context, messages chan<- ContainerEven return errors } -func (d *dockerClient) ContainerLogReader(ctx context.Context, id string) (io.ReadCloser, error) { - options := types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Timestamps: true, - Since: time.Unix(0, 0).Format(time.RFC3339), - Until: time.Now().Format(time.RFC3339), - } - - reader, err := d.cli.ContainerLogs(ctx, id, options) - - if err != nil { - return nil, err - } - - containerJSON, err := d.cli.ContainerInspect(ctx, id) - if err != nil { - return nil, err - } - - return newLogReader(reader, containerJSON.Config.Tty, false), nil - -} - func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time, stdType StdType) (io.ReadCloser, error) { options := types.ContainerLogsOptions{ ShowStdout: stdType&STDOUT != 0, @@ -333,17 +308,11 @@ func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, log.Debugf("fetching logs from Docker with option: %+v", options) reader, err := d.cli.ContainerLogs(ctx, id, options) - if err != nil { return nil, err } - containerJSON, err := d.cli.ContainerInspect(ctx, id) - if err != nil { - return nil, err - } - - return newLogReader(reader, containerJSON.Config.Tty, true), nil + return reader, nil } func (d *dockerClient) Ping(ctx context.Context) (types.Ping, error) { diff --git a/docker/client_test.go b/docker/client_test.go index b1521e63..5e22f1ff 100644 --- a/docker/client_test.go +++ b/docker/client_test.go @@ -7,7 +7,6 @@ import ( "errors" "io" - "strings" "testing" "github.com/docker/docker/api/types" @@ -126,36 +125,11 @@ func Test_dockerClient_ContainerLogs_happy(t *testing.T) { options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true, Since: "since"} proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil) - json := types.ContainerJSON{Config: &container.Config{Tty: false}} - proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil) - client := &dockerClient{proxy, filters.NewArgs(), &Host{ID: "localhost"}} logReader, _ := client.ContainerLogs(context.Background(), id, "since", STDALL) actual, _ := io.ReadAll(logReader) - assert.Equal(t, expected, string(actual), "message doesn't match expected") - proxy.AssertExpectations(t) -} - -func Test_dockerClient_ContainerLogs_happy_with_tty(t *testing.T) { - id := "123456" - - proxy := new(mockedProxy) - expected := "INFO Testing logs..." - - reader := io.NopCloser(strings.NewReader(expected)) - options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true} - proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil) - - json := types.ContainerJSON{Config: &container.Config{Tty: true}} - proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil) - - client := &dockerClient{proxy, filters.NewArgs(), &Host{ID: "localhost"}} - logReader, _ := client.ContainerLogs(context.Background(), id, "", STDALL) - - actual, _ := io.ReadAll(logReader) - assert.Equal(t, expected, string(actual), "message doesn't match expected") - + assert.Equal(t, string(b), string(actual), "message doesn't match expected") proxy.AssertExpectations(t) } @@ -188,6 +162,10 @@ func Test_dockerClient_FindContainer_happy(t *testing.T) { proxy := new(mockedProxy) proxy.On("ContainerList", mock.Anything, mock.Anything).Return(containers, nil) + + json := types.ContainerJSON{Config: &container.Config{Tty: false}} + proxy.On("ContainerInspect", mock.Anything, "abcdefghijkl").Return(json, nil) + client := &dockerClient{proxy, filters.NewArgs(), &Host{ID: "localhost"}} container, err := client.FindContainer("abcdefghijkl") @@ -198,6 +176,7 @@ func Test_dockerClient_FindContainer_happy(t *testing.T) { Name: "z_test_container", Names: []string{"/z_test_container"}, Host: "localhost", + Tty: false, }) proxy.AssertExpectations(t) diff --git a/docker/event_generator.go b/docker/event_generator.go new file mode 100644 index 00000000..a04dfaf5 --- /dev/null +++ b/docker/event_generator.go @@ -0,0 +1,234 @@ +package docker + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "hash/fnv" + "io" + "regexp" + "strings" + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +type EventGenerator struct { + Events chan *LogEvent + Errors chan error + reader *bufio.Reader + next *LogEvent + buffer chan *LogEvent + tty bool + wg sync.WaitGroup +} + +var bufPool = sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, +} + +var BadHeaderErr = fmt.Errorf("dozzle/docker: unable to read header") + +func NewEventGenerator(reader io.Reader, tty bool) *EventGenerator { + generator := &EventGenerator{ + reader: bufio.NewReader(reader), + buffer: make(chan *LogEvent, 100), + Errors: make(chan error, 1), + Events: make(chan *LogEvent), + tty: tty, + } + generator.wg.Add(2) + go generator.consumeReader(&generator.wg) + go generator.processBuffer(&generator.wg) + return generator +} + +func (g *EventGenerator) processBuffer(wg *sync.WaitGroup) { + var current, next *LogEvent + + for { + if g.next != nil { + current = g.next + g.next = nil + next = g.peek() + } else { + event, ok := <-g.buffer + if !ok { + close(g.Events) + break + } + + current = event + next = g.peek() + } + + checkPosition(current, next) + + g.Events <- current + } + wg.Done() +} + +func (g *EventGenerator) consumeReader(wg *sync.WaitGroup) { + for { + message, streamType, readerError := readEvent(g.reader, g.tty) + if message != "" { + logEvent := createEvent(message, streamType) + logEvent.Level = guessLogLevel(logEvent) + g.buffer <- logEvent + } + + if readerError != nil { + if readerError != BadHeaderErr { + g.Errors <- readerError + } + close(g.buffer) + break + } + } + wg.Done() +} + +func (g *EventGenerator) peek() *LogEvent { + if g.next != nil { + return g.next + } + select { + case event := <-g.buffer: + g.next = event + return g.next + case <-time.After(50 * time.Millisecond): + return nil + } +} + +func readEvent(reader *bufio.Reader, tty bool) (string, StdType, error) { + header := []byte{0, 0, 0, 0, 0, 0, 0, 0} + buffer := bufPool.Get().(*bytes.Buffer) + buffer.Reset() + defer bufPool.Put(buffer) + var streamType StdType = STDOUT + if tty { + message, err := reader.ReadString('\n') + if err != nil { + return message, streamType, err + } + return message, streamType, nil + } else { + n, err := reader.Read(header) + if err != nil { + return "", streamType, err + } + if n != 8 { + return "", streamType, BadHeaderErr + } + + switch header[0] { + case 1: + streamType = STDOUT + case 2: + streamType = STDERR + default: + log.Warnf("unknown stream type %d", header[0]) + } + + count := binary.BigEndian.Uint32(header[4:]) + if count == 0 { + return "", streamType, nil + } + _, err = io.CopyN(buffer, reader, int64(count)) + if err != nil { + return "", streamType, err + } + return buffer.String(), streamType, nil + } +} + +func createEvent(message string, streamType StdType) *LogEvent { + h := fnv.New32a() + h.Write([]byte(message)) + logEvent := &LogEvent{Id: h.Sum32(), Message: message, Stream: streamType.String()} + if index := strings.IndexAny(message, " "); index != -1 { + logId := message[:index] + if timestamp, err := time.Parse(time.RFC3339Nano, logId); err == nil { + logEvent.Timestamp = timestamp.UnixMilli() + message = strings.TrimSuffix(message[index+1:], "\n") + logEvent.Message = message + if strings.HasPrefix(message, "{") && strings.HasSuffix(message, "}") { + var data map[string]interface{} + if err := json.Unmarshal([]byte(message), &data); err != nil { + log.Warnf("unable to parse json logs - error was \"%v\" while trying unmarshal \"%v\"", err.Error(), message) + } else { + logEvent.Message = data + } + } + } + } + return logEvent +} + +func checkPosition(currentEvent *LogEvent, nextEvent *LogEvent) { + currentLevel := guessLogLevel(currentEvent) + if nextEvent != nil { + if currentEvent.IsCloseToTime(nextEvent) && currentLevel != "" && !nextEvent.HasLevel() { + currentEvent.Position = START + nextEvent.Position = MIDDLE + } + + // If next item is not close to current item or has level, set current item position to end + if currentEvent.Position == MIDDLE && (nextEvent.HasLevel() || !currentEvent.IsCloseToTime(nextEvent)) { + currentEvent.Position = END + } + + // If next item is close to current item and has no level, set next item position to middle + if currentEvent.Position == MIDDLE && !nextEvent.HasLevel() && currentEvent.IsCloseToTime(nextEvent) { + nextEvent.Position = MIDDLE + } + // Set next item level to current item level + if currentEvent.Position == START || currentEvent.Position == MIDDLE { + nextEvent.Level = currentEvent.Level + } + } else if currentEvent.Position == MIDDLE { + currentEvent.Position = END + } +} + +var KEY_VALUE_REGEX = regexp.MustCompile(`level=(\w+)`) +var ANSI_COLOR_REGEX = regexp.MustCompile(`\x1b\[[0-9;]*m`) + +func guessLogLevel(logEvent *LogEvent) string { + switch value := logEvent.Message.(type) { + case string: + levels := []string{"error", "warn", "warning", "info", "debug", "trace", "fatal"} + stripped := ANSI_COLOR_REGEX.ReplaceAllString(value, "") // remove ansi color codes + for _, level := range levels { + if match, _ := regexp.MatchString("(?i)^"+level+"[^a-z]", stripped); match { + return level + } + + if strings.Contains(value, "["+strings.ToUpper(level)+"]") { + return level + } + + if strings.Contains(value, " "+strings.ToUpper(level)+" ") { + return level + } + } + + if matches := KEY_VALUE_REGEX.FindStringSubmatch(value); matches != nil { + return matches[1] + } + + case map[string]interface{}: + if level, ok := value["level"].(string); ok { + return level + } + } + + return "" +} diff --git a/docker/event_generator_test.go b/docker/event_generator_test.go new file mode 100644 index 00000000..7ddb3034 --- /dev/null +++ b/docker/event_generator_test.go @@ -0,0 +1,79 @@ +package docker + +import ( + "bufio" + "bytes" + "encoding/binary" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventGenerator_Events_tty(t *testing.T) { + input := "example input" + reader := bufio.NewReader(strings.NewReader(input)) + + g := NewEventGenerator(reader, true) + event := <-g.Events + + require.NotNil(t, event, "Expected event to not be nil, but got nil") + assert.Equal(t, input, event.Message) +} + +func TestEventGenerator_Events_non_tty(t *testing.T) { + input := "example input" + reader := bytes.NewReader(makeMessage(input, STDOUT)) + + g := NewEventGenerator(reader, false) + event := <-g.Events + + require.NotNil(t, event, "Expected event to not be nil, but got nil") + assert.Equal(t, input, event.Message) +} + +func TestEventGenerator_Events_non_tty_close_channel(t *testing.T) { + input := "example input" + reader := bytes.NewReader(makeMessage(input, STDOUT)) + + g := NewEventGenerator(reader, false) + <-g.Events + _, ok := <-g.Events + + assert.False(t, ok, "Expected channel to be closed") +} + +func TestEventGenerator_Events_routines_done(t *testing.T) { + input := "example input" + reader := bytes.NewReader(makeMessage(input, STDOUT)) + + g := NewEventGenerator(reader, false) + <-g.Events + assert.False(t, waitTimeout(&g.wg, 1*time.Second), "Expected routines to be done") +} + +func makeMessage(message string, stream StdType) []byte { + data := make([]byte, 8) + binary.BigEndian.PutUint32(data[4:], uint32(len(message))) + data[0] = byte(stream / 2) + data = append(data, []byte(message)...) + + return data +} + +func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false // completed normally + case <-time.After(timeout): + return true // timed out + } +} diff --git a/docker/log_iterator.go b/docker/log_iterator.go deleted file mode 100644 index f233bf7d..00000000 --- a/docker/log_iterator.go +++ /dev/null @@ -1,173 +0,0 @@ -package docker - -import ( - "bufio" - "encoding/json" - "hash/fnv" - "regexp" - "strings" - "time" - - log "github.com/sirupsen/logrus" -) - -type eventGenerator struct { - reader *bufio.Reader - channel chan *LogEvent - next *LogEvent - lastError error -} - -func NewEventIterator(reader *bufio.Reader) *eventGenerator { - generator := &eventGenerator{reader: reader, channel: make(chan *LogEvent, 100)} - go generator.consume() - return generator -} - -func (g *eventGenerator) Next() (*LogEvent, error) { - var currentEvent *LogEvent - var nextEvent *LogEvent - if g.next != nil { - currentEvent = g.next - g.next = nil - nextEvent = g.Peek() - } else { - event, ok := <-g.channel - if !ok { - return nil, g.lastError - } - - currentEvent = event - nextEvent = g.Peek() - } - - currentLevel := guessLogLevel(currentEvent) - - if nextEvent != nil { - if currentEvent.IsCloseToTime(nextEvent) && currentLevel != "" && !nextEvent.HasLevel() { - currentEvent.Position = START - nextEvent.Position = MIDDLE - } - - // If next item is not close to current item or has level, set current item position to end - if currentEvent.Position == MIDDLE && (nextEvent.HasLevel() || !currentEvent.IsCloseToTime(nextEvent)) { - currentEvent.Position = END - } - - // If next item is close to current item and has no level, set next item position to middle - if currentEvent.Position == MIDDLE && !nextEvent.HasLevel() && currentEvent.IsCloseToTime(nextEvent) { - nextEvent.Position = MIDDLE - } - // Set next item level to current item level - if currentEvent.Position == START || currentEvent.Position == MIDDLE { - nextEvent.Level = currentEvent.Level - } - } else if currentEvent.Position == MIDDLE { - currentEvent.Position = END - } - - return currentEvent, nil -} - -func (g *eventGenerator) LastError() error { - return g.lastError -} - -func (g *eventGenerator) Peek() *LogEvent { - if g.next != nil { - return g.next - } - select { - case event := <-g.channel: - g.next = event - return g.next - case <-time.After(50 * time.Millisecond): - return nil - } -} - -func (g *eventGenerator) consume() { - for { - message, readerError := g.reader.ReadString('\n') - - if message != "" { - h := fnv.New32a() - h.Write([]byte(message)) - std := message[:3] - var stdType StdType - switch std { - case "OUT": - stdType = STDOUT - message = message[3:] - case "ERR": - stdType = STDERR - message = message[3:] - default: - log.Debugf("unknown std type [%s] with message [%s]", std, message) - stdType = UNKNOWN - } - - logEvent := &LogEvent{Id: h.Sum32(), Message: message, Stream: stdType.String()} - - if index := strings.IndexAny(message, " "); index != -1 { - logId := message[:index] - if timestamp, err := time.Parse(time.RFC3339Nano, logId); err == nil { - logEvent.Timestamp = timestamp.UnixMilli() - message = strings.TrimSuffix(message[index+1:], "\n") - logEvent.Message = message - if strings.HasPrefix(message, "{") && strings.HasSuffix(message, "}") { - var data map[string]interface{} - if err := json.Unmarshal([]byte(message), &data); err != nil { - log.Warnf("unable to parse json logs - error was \"%v\" while trying unmarshal \"%v\"", err.Error(), message) - } else { - logEvent.Message = data - } - } - } - } - logEvent.Level = guessLogLevel(logEvent) - g.channel <- logEvent - } - - if readerError != nil { - g.lastError = readerError - close(g.channel) - return - } - } -} - -var KEY_VALUE_REGEX = regexp.MustCompile(`level=(\w+)`) -var ANSI_COLOR_REGEX = regexp.MustCompile(`\x1b\[[0-9;]*m`) - -func guessLogLevel(logEvent *LogEvent) string { - switch value := logEvent.Message.(type) { - case string: - levels := []string{"error", "warn", "warning", "info", "debug", "trace", "fatal"} - stripped := ANSI_COLOR_REGEX.ReplaceAllString(value, "") // remove ansi color codes - for _, level := range levels { - if match, _ := regexp.MatchString("(?i)^"+level+"[^a-z]", stripped); match { - return level - } - - if strings.Contains(value, "["+strings.ToUpper(level)+"]") { - return level - } - - if strings.Contains(value, " "+strings.ToUpper(level)+" ") { - return level - } - } - - if matches := KEY_VALUE_REGEX.FindStringSubmatch(value); matches != nil { - return matches[1] - } - - case map[string]interface{}: - if level, ok := value["level"].(string); ok { - return level - } - } - - return "" -} diff --git a/docker/log_iterator_test.go b/docker/log_iterator_test.go deleted file mode 100644 index db2b922e..00000000 --- a/docker/log_iterator_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package docker - -import ( - "bufio" - "io" - "strings" - "testing" - - "github.com/magiconair/properties/assert" - "github.com/stretchr/testify/require" -) - -func TestNewEventIterator(t *testing.T) { - input := "example input" - reader := bufio.NewReader(strings.NewReader("OUT" + input)) - - generator := NewEventIterator(reader) - require.NotNil(t, generator, "Expected generator to not be nil, but got nil") -} - -func TestEventGenerator_Next(t *testing.T) { - input := "example input" - reader := bufio.NewReader(strings.NewReader("OUT" + input)) - - generator := NewEventIterator(reader) - - event, err := generator.Next() - require.NoError(t, err, "Expected no error, but got: %v", err) - require.NotNil(t, event, "Expected event to not be nil, but got nil") -} - -func TestEventGenerator_LastError(t *testing.T) { - input := "example input" - reader := bufio.NewReader(strings.NewReader("OUT" + input)) - - generator := NewEventIterator(reader) - - require.Nil(t, generator.LastError(), "Expected LastError to return nil, but got: %v", generator.LastError()) - - generator.Next() - - // expert error to be EOF - assert.Equal(t, generator.LastError(), io.EOF, "Expected LastError to return EOF, but got: %v", generator.LastError().Error()) -} - -func TestEventGenerator_Peek(t *testing.T) { - input := "example input" - reader := bufio.NewReader(strings.NewReader("OUT" + input)) - - generator := NewEventIterator(reader) - - event := generator.Peek() - - require.NotNil(t, event, "Expected event to not be nil, but got nil") - assert.Equal(t, event.Message, input, "Expected event message to be %s, but got: %s", input, event.Message.(string)) -} diff --git a/docker/reader.go b/docker/reader.go deleted file mode 100644 index ee4eb457..00000000 --- a/docker/reader.go +++ /dev/null @@ -1,61 +0,0 @@ -package docker - -import ( - "bytes" - "encoding/binary" - "io" -) - -type logReader struct { - readerCloser io.ReadCloser - tty bool - lastHeader []byte - buffer bytes.Buffer - label bool -} - -func newLogReader(reader io.ReadCloser, tty bool, labelStd bool) io.ReadCloser { - return &logReader{ - reader, - tty, - make([]byte, 8), - bytes.Buffer{}, - labelStd, - } -} - -func (r *logReader) Read(p []byte) (n int, err error) { - if r.tty { - return r.readerCloser.Read(p) - } else { - if r.buffer.Len() > 0 { - return r.buffer.Read(p) - } else { - r.buffer.Reset() - _, err := r.readerCloser.Read(r.lastHeader) - if err != nil { - return 0, err - } - if r.label { - std := r.lastHeader[0] // https://github.com/rancher/docker/blob/master/pkg/stdcopy/stdcopy.go#L94 - - if std == 1 { - r.buffer.WriteString("OUT") - } - if std == 2 { - r.buffer.WriteString("ERR") - } - } - count := binary.BigEndian.Uint32(r.lastHeader[4:]) - _, err = io.CopyN(&r.buffer, r.readerCloser, int64(count)) - if err != nil { - return 0, err - } - return r.buffer.Read(p) - } - } -} - -func (r *logReader) Close() error { - return r.readerCloser.Close() -} diff --git a/docker/types.go b/docker/types.go index eb022cc8..9f21f5e6 100644 --- a/docker/types.go +++ b/docker/types.go @@ -17,6 +17,7 @@ type Container struct { Status string `json:"status"` Health string `json:"health,omitempty"` Host string `json:"host,omitempty"` + Tty bool `json:"-"` } // ContainerStat represent stats instant for a container diff --git a/web/__snapshots__/web.snapshot b/web/__snapshots__/web.snapshot index e7fad160..5b06fc77 100644 --- a/web/__snapshots__/web.snapshot +++ b/web/__snapshots__/web.snapshot @@ -77,18 +77,21 @@ Connection: close Content-Security-Policy: default-src 'none'; script-src 'self'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; manifest-src 'self'; connect-src 'self' api.github.com; Content-Type: application/ld+json; charset=UTF-8 -{"m":"INFO Testing logs...","ts":1589396137772,"id":1122614848,"l":"info","s":"stdout"} -{"m":"INFO Testing logs...","ts":1589396137772,"id":1543246723,"l":"info","s":"stderr"} +{"m":"INFO Testing stdout logs...","ts":1589396137772,"id":466600245,"l":"info","s":"stdout"} +{"m":"INFO Testing stderr logs...","ts":1589396197772,"id":1101501603,"l":"info","s":"stderr"} + +/* snapshot: Test_handler_download_logs */ +INFO Testing logs... /* snapshot: Test_handler_streamEvents_error */ -HTTP/1.1 200 OK -Connection: close -Cache-Control: no-transform -Cache-Control: no-cache -Connection: keep-alive -Content-Type: text/event-stream -X-Accel-Buffering: no - +HTTP/1.1 200 OK +Connection: close +Cache-Control: no-transform +Cache-Control: no-cache +Connection: keep-alive +Content-Type: text/event-stream +X-Accel-Buffering: no + event: containers-changed data: [] @@ -167,7 +170,7 @@ Content-Security-Policy: default-src 'none'; script-src 'self'; style-src 'self' Content-Type: text/event-stream X-Accel-Buffering: no -data: {"m":"INFO Testing logs...","ts":0,"id":852638900,"l":"info","s":"stdout"} +data: {"m":"INFO Testing logs...","ts":0,"id":4256192898,"l":"info","s":"stdout"} event: container-stopped data: end of stream @@ -195,7 +198,7 @@ Content-Security-Policy: default-src 'none'; script-src 'self'; style-src 'self' Content-Type: text/event-stream X-Accel-Buffering: no -data: {"m":"INFO Testing logs...","ts":1589396137772,"id":3373215946,"l":"info","s":"stdout"} +data: {"m":"INFO Testing logs...","ts":1589396137772,"id":1469707724,"l":"info","s":"stdout"} id: 1589396137772 event: container-stopped diff --git a/web/logs.go b/web/logs.go index a4a7c711..be3f461f 100644 --- a/web/logs.go +++ b/web/logs.go @@ -1,7 +1,6 @@ package web import ( - "bufio" "compress/gzip" "context" "encoding/json" @@ -14,6 +13,7 @@ import ( "time" "github.com/amir20/dozzle/docker" + "github.com/docker/docker/pkg/stdcopy" "github.com/dustin/go-humanize" "github.com/go-chi/chi/v5" @@ -38,12 +38,16 @@ func (h *handler) downloadLogs(w http.ResponseWriter, r *http.Request) { zw.Comment = "Logs generated by Dozzle" zw.ModTime = now - reader, err := h.clientFromRequest(r).ContainerLogReader(r.Context(), container.ID) + reader, err := h.clientFromRequest(r).ContainerLogsBetweenDates(r.Context(), id, time.Time{}, now, docker.STDALL) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - io.Copy(zw, reader) + if container.Tty { + io.Copy(zw, reader) + } else { + stdcopy.StdCopy(zw, zw, reader) + } } func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request) { @@ -66,25 +70,30 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request) return } - reader, err := h.clientFromRequest(r).ContainerLogsBetweenDates(r.Context(), id, from, to, stdTypes) - defer reader.Close() + container, err := h.clientFromRequest(r).FindContainer(id) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + reader, err := h.clientFromRequest(r).ContainerLogsBetweenDates(r.Context(), container.ID, from, to, stdTypes) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - buffered := bufio.NewReader(reader) - iterator := docker.NewEventIterator(buffered) + g := docker.NewEventGenerator(reader, container.Tty) +loop: for { - logEvent, readerError := iterator.Next() - if readerError != nil { - break - } - - if err := json.NewEncoder(w).Encode(logEvent); err != nil { - log.Errorf("json encoding error while streaming %v", err.Error()) + select { + case event, ok := <-g.Events: + if !ok { + break loop + } + if err := json.NewEncoder(w).Encode(event); err != nil { + log.Errorf("json encoding error while streaming %v", err.Error()) + } } } } @@ -138,57 +147,50 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) { } return } - defer reader.Close() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - go func() { - for { - select { - case <-r.Context().Done(): - return - case <-ticker.C: - fmt.Fprintf(w, ":ping \n\n") + g := docker.NewEventGenerator(reader, container.Tty) + +loop: + for { + select { + case event, ok := <-g.Events: + if !ok { + log.WithFields(log.Fields{"id": id}).Debug("stream closed") + break loop + } + if buf, err := json.Marshal(event); err != nil { + log.Errorf("json encoding error while streaming %v", err.Error()) + } else { + fmt.Fprintf(w, "data: %s\n", buf) + } + if event.Timestamp > 0 { + fmt.Fprintf(w, "id: %d\n", event.Timestamp) + } + fmt.Fprintf(w, "\n") + f.Flush() + case <-ticker.C: + fmt.Fprintf(w, ":ping \n\n") + f.Flush() + } + } + + select { + case err := <-g.Errors: + if err != nil { + if err == io.EOF { + log.Debugf("container stopped: %v", container.ID) + fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n") f.Flush() + } else if err != context.Canceled { + log.Errorf("unknown error while streaming %v", err.Error()) } } - }() - - buffered := bufio.NewReader(reader) - iterator := docker.NewEventIterator(buffered) - - for { - - logEvent, err := iterator.Next() - if err != nil { - break - } - - if buf, err := json.Marshal(logEvent); err != nil { - log.Errorf("json encoding error while streaming %v", err.Error()) - } else { - fmt.Fprintf(w, "data: %s\n", buf) - } - if logEvent.Timestamp > 0 { - fmt.Fprintf(w, "id: %d\n", logEvent.Timestamp) - } - fmt.Fprintf(w, "\n") - f.Flush() + default: } - log.Debugf("streaming stopped: %v", container.ID) - - if iterator.LastError() == io.EOF { - log.Debugf("container stopped: %v", container.ID) - fmt.Fprintf(w, "event: container-stopped\ndata: end of stream\n\n") - f.Flush() - } else if iterator.LastError() != context.Canceled { - log.Errorf("unknown error while streaming %v", iterator.LastError().Error()) - } - - log.WithField("routines", runtime.NumGoroutine()).Debug("runtime goroutine stats") - if log.IsLevelEnabled(log.DebugLevel) { var m runtime.MemStats runtime.ReadMemStats(&m) @@ -197,6 +199,7 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) { "allocated": humanize.Bytes(m.Alloc), "totalAllocated": humanize.Bytes(m.TotalAlloc), "system": humanize.Bytes(m.Sys), + "routines": runtime.NumGoroutine(), }).Debug("runtime mem stats") } } diff --git a/web/routes_download_test.go b/web/routes_download_test.go new file mode 100644 index 00000000..bf02471d --- /dev/null +++ b/web/routes_download_test.go @@ -0,0 +1,36 @@ +package web + +import ( + "bytes" + "compress/gzip" + "io" + + "net/http" + "net/http/httptest" + "testing" + + "github.com/amir20/dozzle/docker" + "github.com/beme/abide" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func Test_handler_download_logs(t *testing.T) { + id := "123456" + req, err := http.NewRequest("GET", "/api/logs/download/localhost/"+id, nil) + require.NoError(t, err, "NewRequest should not return an error.") + + mockedClient := new(MockedClient) + + data := makeMessage("INFO Testing logs...", docker.STDOUT) + + mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Tty: false}, nil) + mockedClient.On("ContainerLogsBetweenDates", mock.Anything, id, mock.Anything, mock.Anything, docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil) + + handler := createDefaultHandler(mockedClient) + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + reader, _ := gzip.NewReader(rr.Body) + abide.AssertReader(t, t.Name(), reader) + mockedClient.AssertExpectations(t) +} diff --git a/web/routes_logs_test.go b/web/routes_logs_test.go index 794f0a92..083ec41a 100644 --- a/web/routes_logs_test.go +++ b/web/routes_logs_test.go @@ -1,6 +1,8 @@ package web import ( + "bytes" + "encoding/binary" "errors" "io" "time" @@ -27,9 +29,11 @@ func Test_handler_streamLogs_happy(t *testing.T) { require.NoError(t, err, "NewRequest should not return an error.") mockedClient := new(MockedClient) - reader := io.NopCloser(strings.NewReader("OUTINFO Testing logs...")) - mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) - mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, "", docker.STDALL).Return(reader, nil) + + data := makeMessage("INFO Testing logs...", docker.STDOUT) + + mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Tty: false}, nil) + mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, "", docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil) handler := createDefaultHandler(mockedClient) rr := httptest.NewRecorder() @@ -49,9 +53,11 @@ func Test_handler_streamLogs_happy_with_id(t *testing.T) { require.NoError(t, err, "NewRequest should not return an error.") mockedClient := new(MockedClient) - reader := io.NopCloser(strings.NewReader("OUT2020-05-13T18:55:37.772853839Z INFO Testing logs...")) + + data := makeMessage("2020-05-13T18:55:37.772853839Z INFO Testing logs...", docker.STDOUT) + mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) - mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, "", docker.STDALL).Return(reader, nil) + mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, "", docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil) handler := createDefaultHandler(mockedClient) rr := httptest.NewRecorder() @@ -139,7 +145,8 @@ func Test_handler_streamLogs_error_std(t *testing.T) { // for /api/logs func Test_handler_between_dates(t *testing.T) { - req, err := http.NewRequest("GET", "/api/logs/localhost/123456", nil) + id := "123456" + req, err := http.NewRequest("GET", "/api/logs/localhost/"+id, nil) require.NoError(t, err, "NewRequest should not return an error.") from, _ := time.Parse(time.RFC3339, "2018-01-01T00:00:00Z") @@ -154,8 +161,13 @@ func Test_handler_between_dates(t *testing.T) { req.URL.RawQuery = q.Encode() mockedClient := new(MockedClient) - reader := io.NopCloser(strings.NewReader("OUT2020-05-13T18:55:37.772853839Z INFO Testing logs...\nERR2020-05-13T18:55:37.772853839Z INFO Testing logs...\n")) - mockedClient.On("ContainerLogsBetweenDates", mock.Anything, "123456", from, to, docker.STDALL).Return(reader, nil) + + first := makeMessage("2020-05-13T18:55:37.772853839Z INFO Testing stdout logs...\n", docker.STDOUT) + second := makeMessage("2020-05-13T18:56:37.772853839Z INFO Testing stderr logs...\n", docker.STDERR) + data := append(first, second...) + + mockedClient.On("ContainerLogsBetweenDates", mock.Anything, id, from, to, docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil) + mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil) handler := createDefaultHandler(mockedClient) rr := httptest.NewRecorder() @@ -163,3 +175,12 @@ func Test_handler_between_dates(t *testing.T) { abide.AssertHTTPResponse(t, t.Name(), rr.Result()) mockedClient.AssertExpectations(t) } + +func makeMessage(message string, stream docker.StdType) []byte { + data := make([]byte, 8) + binary.BigEndian.PutUint32(data[4:], uint32(len(message))) + data[0] = byte(stream / 2) + data = append(data, []byte(message)...) + + return data +}