1
0
mirror of https://github.com/amir20/dozzle.git synced 2025-12-21 13:23:07 +01:00

feat: supports k8s cluster (#3599)

This commit is contained in:
Amir Raminfar
2025-02-10 09:29:39 -08:00
committed by GitHub
parent 209ce312d4
commit 13da2a4222
50 changed files with 1591 additions and 734 deletions

View File

@@ -29,7 +29,7 @@ func (s StdType) String() string {
}
type Client interface {
ListContainers(context.Context, ContainerFilter) ([]Container, error)
ListContainers(context.Context, ContainerLabels) ([]Container, error)
FindContainer(context.Context, string) (Container, error)
ContainerLogs(context.Context, string, time.Time, StdType) (io.ReadCloser, error)
ContainerEvents(context.Context, chan<- ContainerEvent) error
@@ -38,5 +38,4 @@ type Client interface {
Ping(context.Context) error
Host() Host
ContainerActions(ctx context.Context, action ContainerAction, containerID string) error
IsSwarmMode() bool
}

View File

@@ -13,34 +13,40 @@ import (
"golang.org/x/sync/semaphore"
)
type StatsCollector interface {
Start(parentCtx context.Context) bool
Subscribe(ctx context.Context, stats chan<- ContainerStat)
Stop()
}
type ContainerStore struct {
containers *xsync.MapOf[string, *Container]
subscribers *xsync.MapOf[context.Context, chan<- ContainerEvent]
newContainerSubscribers *xsync.MapOf[context.Context, chan<- Container]
client Client
statsCollector *StatsCollector
statsCollector StatsCollector
wg sync.WaitGroup
connected atomic.Bool
events chan ContainerEvent
ctx context.Context
filter ContainerFilter
labels ContainerLabels
}
const defaultTimeout = 10 * time.Second
func NewContainerStore(ctx context.Context, client Client, filter ContainerFilter) *ContainerStore {
log.Debug().Str("host", client.Host().Name).Interface("filter", filter).Msg("initializing container store")
func NewContainerStore(ctx context.Context, client Client, statsCollect StatsCollector, labels ContainerLabels) *ContainerStore {
log.Debug().Str("host", client.Host().Name).Interface("labels", labels).Msg("initializing container store")
s := &ContainerStore{
containers: xsync.NewMapOf[string, *Container](),
client: client,
subscribers: xsync.NewMapOf[context.Context, chan<- ContainerEvent](),
newContainerSubscribers: xsync.NewMapOf[context.Context, chan<- Container](),
statsCollector: NewStatsCollector(client, filter),
statsCollector: statsCollect,
wg: sync.WaitGroup{},
events: make(chan ContainerEvent),
ctx: ctx,
filter: filter,
labels: labels,
}
s.wg.Add(1)
@@ -68,7 +74,7 @@ func (s *ContainerStore) checkConnectivity() error {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
if containers, err := s.client.ListContainers(ctx, s.filter); err != nil {
if containers, err := s.client.ListContainers(ctx, s.labels); err != nil {
return err
} else {
s.containers.Clear()
@@ -109,7 +115,7 @@ func (s *ContainerStore) checkConnectivity() error {
return nil
}
func (s *ContainerStore) ListContainers(filter ContainerFilter) ([]Container, error) {
func (s *ContainerStore) ListContainers(labels ContainerLabels) ([]Container, error) {
s.wg.Wait()
if err := s.checkConnectivity(); err != nil {
@@ -117,8 +123,8 @@ func (s *ContainerStore) ListContainers(filter ContainerFilter) ([]Container, er
}
containers := make([]Container, 0)
if filter.Exists() {
validContainers, err := s.client.ListContainers(s.ctx, filter)
if labels.Exists() {
validContainers, err := s.client.ListContainers(s.ctx, labels)
if err != nil {
return nil, err
}
@@ -143,11 +149,10 @@ func (s *ContainerStore) ListContainers(filter ContainerFilter) ([]Container, er
return containers, nil
}
func (s *ContainerStore) FindContainer(id string, filter ContainerFilter) (Container, error) {
func (s *ContainerStore) FindContainer(id string, labels ContainerLabels) (Container, error) {
s.wg.Wait()
if filter.Exists() {
validContainers, err := s.client.ListContainers(s.ctx, filter)
if labels.Exists() {
validContainers, err := s.client.ListContainers(s.ctx, labels)
if err != nil {
return Container{}, err
}
@@ -245,11 +250,36 @@ func (s *ContainerStore) init() {
case event := <-s.events:
log.Trace().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event")
switch event.Name {
case "create":
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
if container, err := s.client.FindContainer(ctx, event.ActorID); err == nil {
list, _ := s.client.ListContainers(ctx, s.labels)
// make sure the container is in the list of containers when using filter
valid := lo.ContainsBy(list, func(item Container) bool {
return item.ID == container.ID
})
if valid {
log.Debug().Str("id", container.ID).Msg("container started")
s.containers.Store(container.ID, &container)
s.newContainerSubscribers.Range(func(c context.Context, containers chan<- Container) bool {
select {
case containers <- container:
case <-c.Done():
}
return true
})
}
}
cancel()
case "start":
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
if container, err := s.client.FindContainer(ctx, event.ActorID); err == nil {
list, _ := s.client.ListContainers(ctx, s.filter)
list, _ := s.client.ListContainers(ctx, s.labels)
// make sure the container is in the list of containers when using filter
valid := lo.ContainsBy(list, func(item Container) bool {
@@ -273,6 +303,43 @@ func (s *ContainerStore) init() {
log.Debug().Str("id", event.ActorID).Msg("container destroyed")
s.containers.Delete(event.ActorID)
case "update":
s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) {
if loaded {
log.Debug().Str("id", c.ID).Msg("container updated")
started := false
if newContainer, err := s.client.FindContainer(context.Background(), c.ID); err == nil {
if newContainer.State == "running" && c.State != "running" {
started = true
}
c.Name = newContainer.Name
c.State = newContainer.State
c.Labels = newContainer.Labels
c.StartedAt = newContainer.StartedAt
c.FinishedAt = newContainer.FinishedAt
c.Created = newContainer.Created
} else {
log.Error().Err(err).Str("id", c.ID).Msg("failed to update container")
}
if started {
s.subscribers.Range(func(ctx context.Context, events chan<- ContainerEvent) bool {
select {
case events <- ContainerEvent{
Name: "start",
ActorID: c.ID,
}:
case <-ctx.Done():
s.subscribers.Delete(ctx)
}
return true
})
}
return c, false
} else {
return c, true
}
})
case "die":
s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) {
if loaded {

View File

@@ -14,7 +14,7 @@ type mockedClient struct {
Client
}
func (m *mockedClient) ListContainers(ctx context.Context, filter ContainerFilter) ([]Container, error) {
func (m *mockedClient) ListContainers(ctx context.Context, filter ContainerLabels) ([]Container, error) {
args := m.Called(ctx, filter)
return args.Get(0).([]Container), args.Error(1)
}
@@ -65,58 +65,15 @@ func TestContainerStore_List(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
store := NewContainerStore(ctx, client, ContainerFilter{})
containers, _ := store.ListContainers(ContainerFilter{})
collector := &fakeStatsCollector{}
store := NewContainerStore(ctx, client, collector, ContainerLabels{})
containers, _ := store.ListContainers(ContainerLabels{})
assert.Equal(t, containers[0].ID, "1234")
}
//TODO fix this test
// func TestContainerStore_die(t *testing.T) {
// client := new(mockedClient)
// client.On("ListContainers", mock.Anything, mock.Anything).Return([]Container{
// {
// ID: "1234",
// Name: "test",
// State: "running",
// Stats: utils.NewRingBuffer[ContainerStat](300),
// },
// }, nil)
type fakeStatsCollector struct{}
// client.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- container.ContainerEvent")).Return(nil).
// Run(func(args mock.Arguments) {
// ctx := args.Get(0).(context.Context)
// events := args.Get(1).(chan<- ContainerEvent)
// events <- ContainerEvent{
// Name: "die",
// ActorID: "1234",
// Host: "localhost",
// }
// <-ctx.Done()
// })
// client.On("Host").Return(Host{
// ID: "localhost",
// })
// client.On("ContainerStats", mock.Anything, "1234", mock.AnythingOfType("chan<- container.ContainerStat")).Return(nil)
// client.On("FindContainer", mock.Anything, "1234").Return(Container{
// ID: "1234",
// Name: "test",
// Image: "test",
// Stats: utils.NewRingBuffer[ContainerStat](300),
// }, nil)
// ctx, cancel := context.WithCancel(context.Background())
// t.Cleanup(cancel)
// store := NewContainerStore(ctx, client, ContainerFilter{})
// // Wait until we get the event
// events := make(chan ContainerEvent)
// store.SubscribeEvents(ctx, events)
// <-events
// containers, _ := store.ListContainers(ContainerFilter{})
// assert.Equal(t, containers[0].State, "exited")
// }
func (f *fakeStatsCollector) Subscribe(_ context.Context, _ chan<- ContainerStat) {}
func (f *fakeStatsCollector) Start(_ context.Context) bool { return true }
func (f *fakeStatsCollector) Stop() {}

View File

@@ -1,14 +1,10 @@
package container
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"hash/fnv"
"io"
"strings"
"sync"
"time"
@@ -23,30 +19,26 @@ import (
type EventGenerator struct {
Events chan *LogEvent
Errors chan error
reader *bufio.Reader
reader LogReader
next *LogEvent
buffer chan *LogEvent
tty bool
wg sync.WaitGroup
containerID string
ctx context.Context
}
var bufPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
var ErrBadHeader = fmt.Errorf("dozzle/docker: unable to read header")
func NewEventGenerator(ctx context.Context, reader io.Reader, container Container) *EventGenerator {
type LogReader interface {
Read() (string, StdType, error)
}
func NewEventGenerator(ctx context.Context, reader LogReader, container Container) *EventGenerator {
generator := &EventGenerator{
reader: bufio.NewReader(reader),
reader: reader,
buffer: make(chan *LogEvent, 100),
Errors: make(chan error, 1),
Events: make(chan *LogEvent),
tty: container.Tty,
containerID: container.ID,
ctx: ctx,
}
@@ -90,7 +82,7 @@ loop:
func (g *EventGenerator) consumeReader() {
for {
message, streamType, readerError := readEvent(g.reader, g.tty)
message, streamType, readerError := g.reader.Read()
if message != "" {
logEvent := createEvent(message, streamType)
logEvent.ContainerID = g.containerID
@@ -123,50 +115,6 @@ func (g *EventGenerator) peek() *LogEvent {
}
}
func readEvent(reader *bufio.Reader, tty bool) (string, StdType, error) {
header := []byte{0, 0, 0, 0, 0, 0, 0, 0}
buffer := bufPool.Get().(*bytes.Buffer)
buffer.Reset()
defer bufPool.Put(buffer)
var streamType StdType = STDOUT
if tty {
message, err := reader.ReadString('\n')
if err != nil {
return message, streamType, err
}
return message, streamType, nil
} else {
n, err := io.ReadFull(reader, header)
if err != nil {
return "", streamType, err
}
if n != 8 {
log.Warn().Bytes("header", header).Msg("short read")
message, _ := reader.ReadString('\n')
return message, streamType, ErrBadHeader
}
switch header[0] {
case 1:
streamType = STDOUT
case 2:
streamType = STDERR
default:
log.Warn().Bytes("header", header).Msg("unknown stream type")
}
count := binary.BigEndian.Uint32(header[4:])
if count == 0 {
return "", streamType, nil
}
_, err = io.CopyN(buffer, reader, int64(count))
if err != nil {
return "", streamType, err
}
return buffer.String(), streamType, nil
}
}
func createEvent(message string, streamType StdType) *LogEvent {
h := fnv.New32a()
h.Write([]byte(message))

View File

@@ -1,12 +1,9 @@
package container
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"io"
"reflect"
"strings"
"sync"
"testing"
"time"
@@ -18,9 +15,8 @@ import (
func TestEventGenerator_Events_tty(t *testing.T) {
input := "example input"
reader := bufio.NewReader(strings.NewReader(input))
g := NewEventGenerator(context.Background(), reader, Container{Tty: true})
g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: true})
event := <-g.Events
require.NotNil(t, event, "Expected event to not be nil, but got nil")
@@ -29,9 +25,8 @@ func TestEventGenerator_Events_tty(t *testing.T) {
func TestEventGenerator_Events_non_tty(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))
g := NewEventGenerator(context.Background(), reader, Container{Tty: false})
g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false})
event := <-g.Events
require.NotNil(t, event, "Expected event to not be nil, but got nil")
@@ -40,9 +35,8 @@ func TestEventGenerator_Events_non_tty(t *testing.T) {
func TestEventGenerator_Events_non_tty_close_channel(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))
g := NewEventGenerator(context.Background(), reader, Container{Tty: false})
g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false})
<-g.Events
_, ok := <-g.Events
@@ -51,20 +45,31 @@ func TestEventGenerator_Events_non_tty_close_channel(t *testing.T) {
func TestEventGenerator_Events_routines_done(t *testing.T) {
input := "example input"
reader := bytes.NewReader(makeMessage(input, STDOUT))
g := NewEventGenerator(context.Background(), reader, Container{Tty: false})
g := NewEventGenerator(context.Background(), makeFakeReader(input, STDOUT), Container{Tty: false})
<-g.Events
assert.False(t, waitTimeout(&g.wg, 1*time.Second), "Expected routines to be done")
}
func makeMessage(message string, stream StdType) []byte {
data := make([]byte, 8)
binary.BigEndian.PutUint32(data[4:], uint32(len(message)))
data[0] = byte(stream / 2)
data = append(data, []byte(message)...)
type mockLogReader struct {
messages []string
types []StdType
i int
}
return data
func (m *mockLogReader) Read() (string, StdType, error) {
if m.i >= len(m.messages) {
return "", 0, io.EOF
}
m.i++
return m.messages[m.i-1], m.types[m.i-1], nil
}
func makeFakeReader(message string, stream StdType) LogReader {
return &mockLogReader{
messages: []string{message},
types: []StdType{stream},
}
}
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
@@ -156,23 +161,3 @@ func Test_createEvent(t *testing.T) {
})
}
}
type mockReadCloser struct {
bytes []byte
}
func (m mockReadCloser) Read(p []byte) (int, error) {
return copy(p, m.bytes), nil
}
func Benchmark_readEvent(b *testing.B) {
b.ReportAllocs()
data := makeMessage("2020-05-13T18:55:37.772853839Z {\"key\": \"value\"}\n", STDOUT)
reader := bufio.NewReader(mockReadCloser{bytes: data})
for i := 0; i < b.N; i++ {
readEvent(reader, true)
}
}

View File

@@ -1,155 +0,0 @@
package container
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog/log"
)
type StatsCollector struct {
stream chan ContainerStat
subscribers *xsync.MapOf[context.Context, chan<- ContainerStat]
client Client
cancelers *xsync.MapOf[string, context.CancelFunc]
stopper context.CancelFunc
timer *time.Timer
mu sync.Mutex
totalStarted atomic.Int32
filter ContainerFilter
}
var timeToStop = 6 * time.Hour
func NewStatsCollector(client Client, filter ContainerFilter) *StatsCollector {
return &StatsCollector{
stream: make(chan ContainerStat),
subscribers: xsync.NewMapOf[context.Context, chan<- ContainerStat](),
client: client,
cancelers: xsync.NewMapOf[string, context.CancelFunc](),
filter: filter,
}
}
func (c *StatsCollector) Subscribe(ctx context.Context, stats chan<- ContainerStat) {
c.subscribers.Store(ctx, stats)
go func() {
<-ctx.Done()
c.subscribers.Delete(ctx)
}()
}
func (c *StatsCollector) forceStop() {
c.mu.Lock()
defer c.mu.Unlock()
if c.stopper != nil {
c.stopper()
c.stopper = nil
log.Debug().Str("host", c.client.Host().ID).Msg("stopped container stats collector")
}
}
func (c *StatsCollector) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
if c.totalStarted.Add(-1) == 0 {
c.timer = time.AfterFunc(timeToStop, func() {
c.forceStop()
})
}
}
func (c *StatsCollector) reset() {
c.mu.Lock()
defer c.mu.Unlock()
if c.timer != nil {
c.timer.Stop()
}
c.timer = nil
}
func streamStats(parent context.Context, sc *StatsCollector, id string) {
ctx, cancel := context.WithCancel(parent)
sc.cancelers.Store(id, cancel)
log.Debug().Str("container", id).Str("host", sc.client.Host().Name).Msg("starting to stream stats")
if err := sc.client.ContainerStats(ctx, id, sc.stream); err != nil {
log.Debug().Str("container", id).Str("host", sc.client.Host().Name).Err(err).Msg("stopping to stream stats")
if !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
log.Error().Str("container", id).Str("host", sc.client.Host().Name).Err(err).Msg("unexpected error while streaming stats")
}
}
}
// Start starts the stats collector and blocks until it's stopped. It returns true if the collector was stopped, false if it was already running
func (sc *StatsCollector) Start(parentCtx context.Context) bool {
sc.reset()
sc.totalStarted.Add(1)
sc.mu.Lock()
if sc.stopper != nil {
sc.mu.Unlock()
return false
}
var ctx context.Context
ctx, sc.stopper = context.WithCancel(parentCtx)
sc.mu.Unlock()
timeoutCtx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
if containers, err := sc.client.ListContainers(timeoutCtx, sc.filter); err == nil {
for _, c := range containers {
if c.State == "running" {
go streamStats(ctx, sc, c.ID)
}
}
} else {
log.Error().Str("host", sc.client.Host().Name).Err(err).Msg("failed to list containers")
}
cancel()
events := make(chan ContainerEvent)
go func() {
log.Debug().Str("host", sc.client.Host().Name).Msg("starting to listen to docker events")
err := sc.client.ContainerEvents(context.Background(), events)
if !errors.Is(err, context.Canceled) {
log.Error().Str("host", sc.client.Host().Name).Err(err).Msg("unexpected error while listening to docker events")
}
sc.forceStop()
}()
go func() {
for event := range events {
switch event.Name {
case "start":
go streamStats(ctx, sc, event.ActorID)
case "die":
if cancel, ok := sc.cancelers.LoadAndDelete(event.ActorID); ok {
cancel()
}
}
}
}()
for {
select {
case <-ctx.Done():
log.Info().Str("host", sc.client.Host().Name).Msg("stopped container stats collector")
return true
case stat := <-sc.stream:
sc.subscribers.Range(func(c context.Context, stats chan<- ContainerStat) bool {
select {
case stats <- stat:
case <-c.Done():
sc.subscribers.Delete(c)
}
return true
})
}
}
}

View File

@@ -1,84 +0,0 @@
package container
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func startedCollector(ctx context.Context) *StatsCollector {
client := new(mockedClient)
client.On("ListContainers", mock.Anything, mock.Anything).Return([]Container{
{
ID: "1234",
Name: "test",
State: "running",
},
}, nil)
client.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- container.ContainerEvent")).
Return(nil).
Run(func(args mock.Arguments) {
ctx := args.Get(0).(context.Context)
<-ctx.Done()
})
client.On("ContainerStats", mock.Anything, mock.Anything, mock.AnythingOfType("chan<- container.ContainerStat")).
Return(nil).
Run(func(args mock.Arguments) {
stats := args.Get(2).(chan<- ContainerStat)
stats <- ContainerStat{
ID: "1234",
}
})
client.On("Host").Return(Host{
ID: "localhost",
})
collector := NewStatsCollector(client, ContainerFilter{})
stats := make(chan ContainerStat)
collector.Subscribe(ctx, stats)
go collector.Start(ctx)
<-stats
return collector
}
func TestCancelers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
collector := startedCollector(ctx)
_, ok := collector.cancelers.Load("1234")
assert.True(t, ok, "canceler should be stored")
assert.False(t, collector.Start(ctx), "second start should return false")
assert.Equal(t, int32(2), collector.totalStarted.Load(), "total started should be 2")
collector.Stop()
assert.Equal(t, int32(1), collector.totalStarted.Load(), "total started should be 1")
}
func TestSecondStart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
collector := startedCollector(ctx)
assert.False(t, collector.Start(ctx), "second start should return false")
assert.Equal(t, int32(2), collector.totalStarted.Load(), "total started should be 2")
collector.Stop()
assert.Equal(t, int32(1), collector.totalStarted.Load(), "total started should be 1")
}
func TestStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
collector := startedCollector(ctx)
collector.Stop()
assert.Equal(t, int32(0), collector.totalStarted.Load(), "total started should be 1")
}

View File

@@ -44,10 +44,10 @@ type ContainerEvent struct {
Time time.Time `json:"time"`
}
type ContainerFilter map[string][]string
type ContainerLabels map[string][]string
func ParseContainerFilter(commaValues string) (ContainerFilter, error) {
filter := make(ContainerFilter)
func ParseContainerFilter(commaValues string) (ContainerLabels, error) {
filter := make(ContainerLabels)
if commaValues == "" {
return filter, nil
}
@@ -65,7 +65,7 @@ func ParseContainerFilter(commaValues string) (ContainerFilter, error) {
return filter, nil
}
func (f ContainerFilter) Exists() bool {
func (f ContainerLabels) Exists() bool {
return len(f) > 0
}