1
0
mirror of https://github.com/amir20/dozzle.git synced 2025-12-21 13:23:07 +01:00

feat: collects all stats like cpu and mem in background for up to 5 minutes (#2740)

This commit is contained in:
Amir Raminfar
2024-02-01 12:43:44 -08:00
committed by GitHub
parent 2c398cc227
commit 8677a34087
30 changed files with 518 additions and 254 deletions

View File

@@ -290,6 +290,7 @@ declare global {
const useSeoMeta: typeof import('@vueuse/head')['useSeoMeta']
const useSessionStorage: typeof import('@vueuse/core')['useSessionStorage']
const useShare: typeof import('@vueuse/core')['useShare']
const useSimpleRefHistory: typeof import('./utils/index')['useSimpleRefHistory']
const useSlots: typeof import('vue')['useSlots']
const useSorted: typeof import('@vueuse/core')['useSorted']
const useSpeechRecognition: typeof import('@vueuse/core')['useSpeechRecognition']
@@ -646,6 +647,7 @@ declare module 'vue' {
readonly useSeoMeta: UnwrapRef<typeof import('@vueuse/head')['useSeoMeta']>
readonly useSessionStorage: UnwrapRef<typeof import('@vueuse/core')['useSessionStorage']>
readonly useShare: UnwrapRef<typeof import('@vueuse/core')['useShare']>
readonly useSimpleRefHistory: UnwrapRef<typeof import('./utils/index')['useSimpleRefHistory']>
readonly useSlots: UnwrapRef<typeof import('vue')['useSlots']>
readonly useSorted: UnwrapRef<typeof import('@vueuse/core')['useSorted']>
readonly useSpeechRecognition: UnwrapRef<typeof import('@vueuse/core')['useSpeechRecognition']>
@@ -995,6 +997,7 @@ declare module '@vue/runtime-core' {
readonly useSeoMeta: UnwrapRef<typeof import('@vueuse/head')['useSeoMeta']>
readonly useSessionStorage: UnwrapRef<typeof import('@vueuse/core')['useSessionStorage']>
readonly useShare: UnwrapRef<typeof import('@vueuse/core')['useShare']>
readonly useSimpleRefHistory: UnwrapRef<typeof import('./utils/index')['useSimpleRefHistory']>
readonly useSlots: UnwrapRef<typeof import('vue')['useSlots']>
readonly useSorted: UnwrapRef<typeof import('@vueuse/core')['useSorted']>
readonly useSpeechRecognition: UnwrapRef<typeof import('@vueuse/core')['useSpeechRecognition']>

View File

@@ -31,9 +31,9 @@ function createFuzzySearchModal() {
initialState: {
container: {
containers: [
new Container("123", new Date(), "image", "test", "command", "host", {}, "status", "running"),
new Container("345", new Date(), "image", "foo bar", "command", "host", {}, "status", "running"),
new Container("567", new Date(), "image", "baz", "command", "host", {}, "status", "exited"),
new Container("123", new Date(), "image", "test", "command", "host", {}, "status", "running", []),
new Container("345", new Date(), "image", "foo bar", "command", "host", {}, "status", "running", []),
new Container("567", new Date(), "image", "baz", "command", "host", {}, "status", "exited", []),
],
},
},

View File

@@ -11,11 +11,11 @@ const { container } = useContainerContext();
const cpuData = computedWithControl(
() => container.value.stat,
() => {
const history = container.value.statHistory;
const history = container.value.statsHistory;
const points: Point<unknown>[] = history.map((stat, i) => ({
x: i,
y: Math.max(0, stat.snapshot.cpu),
value: Math.max(0, stat.snapshot.cpu).toFixed(2) + "%",
y: Math.max(0, stat.cpu),
value: Math.max(0, stat.cpu).toFixed(2) + "%",
}));
return points;
},
@@ -24,11 +24,11 @@ const cpuData = computedWithControl(
const memoryData = computedWithControl(
() => container.value.stat,
() => {
const history = container.value.statHistory;
const history = container.value.statsHistory;
const points: Point<string>[] = history.map((stat, i) => ({
x: i,
y: stat.snapshot.memory,
value: formatBytes(stat.snapshot.memoryUsage),
y: stat.memory,
value: formatBytes(stat.memoryUsage),
}));
return points;
},

View File

@@ -12,7 +12,7 @@ import { area, curveStep } from "d3-shape";
const d3 = { extent, scaleLinear, area, curveStep };
const { data, width = 150, height = 30 } = defineProps<{ data: Point<unknown>[]; width?: number; height?: number }>();
const x = d3.scaleLinear().range([width, 0]);
const x = d3.scaleLinear().range([0, width]);
const y = d3.scaleLinear().range([height, 0]);
const selectedPoint = defineEmit<[value: Point<unknown>]>();

View File

@@ -18,7 +18,7 @@ describe("Container", () => {
];
test.each(names)("name %s should be %s and %s", (name, expectedName, expectedSwarmId) => {
const c = new Container("id", new Date(), "image", name!, "command", "host", {}, "status", "created");
const c = new Container("id", new Date(), "image", name!, "command", "host", {}, "status", "created", []);
expect(c.name).toBe(expectedName);
expect(c.swarmId).toBe(expectedSwarmId);
});

View File

@@ -1,6 +1,5 @@
import type { ContainerHealth, ContainerStat, ContainerState } from "@/types/Container";
import type { UseThrottledRefHistoryReturn } from "@vueuse/core";
import { useExponentialMovingAverage } from "@/utils";
import { useExponentialMovingAverage, useSimpleRefHistory } from "@/utils";
import { Ref } from "vue";
type Stat = Omit<ContainerStat, "id">;
@@ -19,7 +18,7 @@ const hosts = computed(() =>
export class Container {
private _stat: Ref<Stat>;
private readonly throttledStatHistory: UseThrottledRefHistoryReturn<Stat, Stat>;
private readonly _statsHistory: Ref<Stat[]>;
public readonly swarmId: string | null = null;
public readonly isSwarm: boolean = false;
private readonly movingAverageStat: Ref<Stat>;
@@ -34,10 +33,11 @@ export class Container {
public readonly labels = {} as Record<string, string>,
public status: string,
public state: ContainerState,
stats: Stat[],
public health?: ContainerHealth,
) {
this._stat = ref({ cpu: 0, memory: 0, memoryUsage: 0 });
this.throttledStatHistory = useThrottledRefHistory(this._stat, { capacity: 300, deep: true, throttle: 1000 });
this._statsHistory = useSimpleRefHistory(this._stat, { capacity: 300, deep: true, initial: stats });
this.movingAverageStat = useExponentialMovingAverage(this._stat, 0.2);
const match = name.match(SWARM_ID_REGEX);
@@ -48,8 +48,8 @@ export class Container {
}
}
get statHistory() {
return unref(this.throttledStatHistory.history);
get statsHistory() {
return unref(this._statsHistory);
}
get movingAverage() {

View File

@@ -64,7 +64,7 @@ export const useContainerStore = defineStore("container", () => {
const event = JSON.parse((e as MessageEvent).data) as { actorId: string };
const container = allContainersById.value[event.actorId];
if (container) {
container.state = "dead";
container.state = "exited";
}
});
@@ -127,6 +127,7 @@ export const useContainerStore = defineStore("container", () => {
c.labels,
c.status,
c.state,
c.stats,
c.health,
);
}),

View File

@@ -15,6 +15,7 @@ export type ContainerJson = {
readonly state: ContainerState;
readonly host: string;
readonly labels: Record<string, string>;
readonly stats: ContainerStat[];
readonly health?: ContainerHealth;
};

View File

@@ -51,3 +51,27 @@ export function useExponentialMovingAverage<T extends Record<string, number>>(so
return ema;
}
interface UseSimpleRefHistoryOptions<T> {
capacity: number;
deep?: boolean;
initial?: T[];
}
export function useSimpleRefHistory<T>(source: Ref<T>, options: UseSimpleRefHistoryOptions<T>) {
const { capacity, deep = true, initial = [] as T[] } = options;
const history = ref<T[]>(initial) as Ref<T[]>;
watch(
source,
(value) => {
history.value.push(value);
if (history.value.length > capacity) {
history.value.shift();
}
},
{ deep },
);
return history;
}

View File

@@ -12,6 +12,7 @@ import (
"strings"
"time"
"github.com/amir20/dozzle/internal/utils"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
@@ -55,18 +56,30 @@ type DockerCLI interface {
ContainerRestart(ctx context.Context, containerID string, options container.StopOptions) error
}
type Client struct {
type Client interface {
ListContainers() ([]Container, error)
FindContainer(string) (Container, error)
ContainerLogs(context.Context, string, string, StdType) (io.ReadCloser, error)
Events(context.Context, chan<- ContainerEvent) <-chan error
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time, StdType) (io.ReadCloser, error)
ContainerStats(context.Context, string, chan<- ContainerStat) error
Ping(context.Context) (types.Ping, error)
Host() *Host
ContainerActions(action string, containerID string) error
}
type _client struct {
cli DockerCLI
filters filters.Args
host *Host
}
func NewClient(cli DockerCLI, filters filters.Args, host *Host) *Client {
return &Client{cli, filters, host}
func NewClient(cli DockerCLI, filters filters.Args, host *Host) Client {
return &_client{cli, filters, host}
}
// NewClientWithFilters creates a new instance of Client with docker filters
func NewClientWithFilters(f map[string][]string) (*Client, error) {
func NewClientWithFilters(f map[string][]string) (Client, error) {
filterArgs := filters.NewArgs()
for key, values := range f {
for _, value := range values {
@@ -85,7 +98,7 @@ func NewClientWithFilters(f map[string][]string) (*Client, error) {
return NewClient(cli, filterArgs, &Host{Name: "localhost", ID: "localhost"}), nil
}
func NewClientWithTlsAndFilter(f map[string][]string, host Host) (*Client, error) {
func NewClientWithTlsAndFilter(f map[string][]string, host Host) (Client, error) {
filterArgs := filters.NewArgs()
for key, values := range f {
for _, value := range values {
@@ -121,7 +134,7 @@ func NewClientWithTlsAndFilter(f map[string][]string, host Host) (*Client, error
return NewClient(cli, filterArgs, &host), nil
}
func (d *Client) FindContainer(id string) (Container, error) {
func (d *_client) FindContainer(id string) (Container, error) {
var container Container
containers, err := d.ListContainers()
if err != nil {
@@ -149,7 +162,7 @@ func (d *Client) FindContainer(id string) (Container, error) {
return container, nil
}
func (d *Client) ContainerActions(action string, containerID string) error {
func (d *_client) ContainerActions(action string, containerID string) error {
switch action {
case "start":
return d.cli.ContainerStart(context.Background(), containerID, container.StartOptions{})
@@ -162,7 +175,7 @@ func (d *Client) ContainerActions(action string, containerID string) error {
}
}
func (d *Client) ListContainers() ([]Container, error) {
func (d *_client) ListContainers() ([]Container, error) {
containerListOptions := container.ListOptions{
Filters: d.filters,
All: true,
@@ -191,6 +204,7 @@ func (d *Client) ListContainers() ([]Container, error) {
Host: d.host.ID,
Health: findBetweenParentheses(c.Status),
Labels: c.Labels,
Stats: utils.NewRingBuffer[ContainerStat](300), // 300 seconds of stats
}
containers = append(containers, container)
}
@@ -202,68 +216,60 @@ func (d *Client) ListContainers() ([]Container, error) {
return containers, nil
}
func (d *Client) ContainerStats(ctx context.Context, id string, stats chan<- ContainerStat) error {
func (d *_client) ContainerStats(ctx context.Context, id string, stats chan<- ContainerStat) error {
response, err := d.cli.ContainerStats(ctx, id, true)
if err != nil {
return err
}
go func() {
log.Debugf("starting to stream stats for: %s", id)
defer response.Body.Close()
decoder := json.NewDecoder(response.Body)
var v *types.StatsJSON
for {
if err := decoder.Decode(&v); err != nil {
if err == context.Canceled || err == io.EOF {
log.Debugf("stopping stats streaming for container %s", id)
return
}
log.Errorf("decoder for stats api returned an unknown error %v", err)
}
log.Debugf("starting to stream stats for: %s", id)
defer response.Body.Close()
decoder := json.NewDecoder(response.Body)
var v *types.StatsJSON
for {
if err := decoder.Decode(&v); err != nil {
return err
}
var (
memPercent, cpuPercent float64
mem, memLimit float64
previousCPU uint64
previousSystem uint64
)
daemonOSType := response.OSType
var (
memPercent, cpuPercent float64
mem, memLimit float64
previousCPU uint64
previousSystem uint64
)
daemonOSType := response.OSType
if daemonOSType != "windows" {
previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
previousSystem = v.PreCPUStats.SystemUsage
cpuPercent = calculateCPUPercentUnix(previousCPU, previousSystem, v)
mem = calculateMemUsageUnixNoCache(v.MemoryStats)
memLimit = float64(v.MemoryStats.Limit)
memPercent = calculateMemPercentUnixNoCache(memLimit, mem)
} else {
cpuPercent = calculateCPUPercentWindows(v)
mem = float64(v.MemoryStats.PrivateWorkingSet)
}
if daemonOSType != "windows" {
previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
previousSystem = v.PreCPUStats.SystemUsage
cpuPercent = calculateCPUPercentUnix(previousCPU, previousSystem, v)
mem = calculateMemUsageUnixNoCache(v.MemoryStats)
memLimit = float64(v.MemoryStats.Limit)
memPercent = calculateMemPercentUnixNoCache(memLimit, mem)
} else {
cpuPercent = calculateCPUPercentWindows(v)
mem = float64(v.MemoryStats.PrivateWorkingSet)
}
log.Tracef("containerId = %s, cpuPercent = %f, memPercent = %f, memUsage = %f, daemonOSType = %s", id, cpuPercent, memPercent, mem, daemonOSType)
log.Tracef("containerId = %s, cpuPercent = %f, memPercent = %f, memUsage = %f, daemonOSType = %s", id, cpuPercent, memPercent, mem, daemonOSType)
if cpuPercent > 0 || mem > 0 {
select {
case <-ctx.Done():
return
case stats <- ContainerStat{
ID: id,
CPUPercent: cpuPercent,
MemoryPercent: memPercent,
MemoryUsage: mem,
}:
}
if cpuPercent > 0 || mem > 0 {
select {
case <-ctx.Done():
return nil
case stats <- ContainerStat{
ID: id,
CPUPercent: cpuPercent,
MemoryPercent: memPercent,
MemoryUsage: mem,
}:
}
}
}()
return nil
}
}
func (d *Client) ContainerLogs(ctx context.Context, id string, since string, stdType StdType) (io.ReadCloser, error) {
func (d *_client) ContainerLogs(ctx context.Context, id string, since string, stdType StdType) (io.ReadCloser, error) {
log.WithField("id", id).WithField("since", since).WithField("stdType", stdType).Debug("streaming logs for container")
if since != "" {
@@ -291,7 +297,7 @@ func (d *Client) ContainerLogs(ctx context.Context, id string, since string, std
return reader, nil
}
func (d *Client) Events(ctx context.Context, messages chan<- ContainerEvent) <-chan error {
func (d *_client) Events(ctx context.Context, messages chan<- ContainerEvent) <-chan error {
dockerMessages, errors := d.cli.Events(ctx, types.EventsOptions{})
go func() {
@@ -319,7 +325,7 @@ func (d *Client) Events(ctx context.Context, messages chan<- ContainerEvent) <-c
return errors
}
func (d *Client) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time, stdType StdType) (io.ReadCloser, error) {
func (d *_client) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time, stdType StdType) (io.ReadCloser, error) {
options := container.LogsOptions{
ShowStdout: stdType&STDOUT != 0,
ShowStderr: stdType&STDERR != 0,
@@ -338,11 +344,11 @@ func (d *Client) ContainerLogsBetweenDates(ctx context.Context, id string, from
return reader, nil
}
func (d *Client) Ping(ctx context.Context) (types.Ping, error) {
func (d *_client) Ping(ctx context.Context) (types.Ping, error) {
return d.cli.Ping(ctx)
}
func (d *Client) Host() *Host {
func (d *_client) Host() *Host {
return d.host
}

View File

@@ -89,7 +89,7 @@ func (m *mockedProxy) ContainerRestart(ctx context.Context, containerID string,
func Test_dockerClient_ListContainers_null(t *testing.T) {
proxy := new(mockedProxy)
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(nil, nil)
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
list, err := client.ListContainers()
assert.Empty(t, list, "list should be empty")
@@ -101,7 +101,7 @@ func Test_dockerClient_ListContainers_null(t *testing.T) {
func Test_dockerClient_ListContainers_error(t *testing.T) {
proxy := new(mockedProxy)
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(nil, errors.New("test"))
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
list, err := client.ListContainers()
assert.Nil(t, list, "list should be nil")
@@ -124,25 +124,15 @@ func Test_dockerClient_ListContainers_happy(t *testing.T) {
proxy := new(mockedProxy)
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(containers, nil)
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
list, err := client.ListContainers()
require.NoError(t, err, "error should not return an error.")
assert.Equal(t, list, []Container{
{
ID: "1234567890_a",
Name: "a_test_container",
Names: []string{"/a_test_container"},
Host: "localhost",
},
{
ID: "abcdefghijkl",
Name: "z_test_container",
Names: []string{"/z_test_container"},
Host: "localhost",
},
})
Ids := []string{"1234567890_a", "abcdefghijkl"}
for i, container := range list {
assert.Equal(t, container.ID, Ids[i])
}
proxy.AssertExpectations(t)
}
@@ -161,7 +151,7 @@ func Test_dockerClient_ContainerLogs_happy(t *testing.T) {
options := container.LogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true, Since: "since"}
proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil)
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
logReader, _ := client.ContainerLogs(context.Background(), id, "since", STDALL)
actual, _ := io.ReadAll(logReader)
@@ -175,7 +165,7 @@ func Test_dockerClient_ContainerLogs_error(t *testing.T) {
proxy.On("ContainerLogs", mock.Anything, id, mock.Anything).Return(nil, errors.New("test"))
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
reader, err := client.ContainerLogs(context.Background(), id, "", STDALL)
@@ -202,18 +192,12 @@ func Test_dockerClient_FindContainer_happy(t *testing.T) {
json := types.ContainerJSON{Config: &container.Config{Tty: false}}
proxy.On("ContainerInspect", mock.Anything, "abcdefghijkl").Return(json, nil)
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
container, err := client.FindContainer("abcdefghijkl")
require.NoError(t, err, "error should not be thrown")
assert.Equal(t, container, Container{
ID: "abcdefghijkl",
Name: "z_test_container",
Names: []string{"/z_test_container"},
Host: "localhost",
Tty: false,
})
assert.Equal(t, container.ID, "abcdefghijkl")
proxy.AssertExpectations(t)
}
@@ -231,7 +215,7 @@ func Test_dockerClient_FindContainer_error(t *testing.T) {
proxy := new(mockedProxy)
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(containers, nil)
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
_, err := client.FindContainer("not_valid")
require.Error(t, err, "error should be thrown")
@@ -252,7 +236,7 @@ func Test_dockerClient_ContainerActions_happy(t *testing.T) {
}
proxy := new(mockedProxy)
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
json := types.ContainerJSON{Config: &container.Config{Tty: false}}
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(containers, nil)
proxy.On("ContainerInspect", mock.Anything, "abcdefghijkl").Return(json, nil)
@@ -263,13 +247,7 @@ func Test_dockerClient_ContainerActions_happy(t *testing.T) {
container, err := client.FindContainer("abcdefghijkl")
require.NoError(t, err, "error should not be thrown")
assert.Equal(t, container, Container{
ID: "abcdefghijkl",
Name: "z_test_container",
Names: []string{"/z_test_container"},
Host: "localhost",
Tty: false,
})
assert.Equal(t, container.ID, "abcdefghijkl")
actions := []string{"start", "stop", "restart"}
for _, action := range actions {
@@ -294,7 +272,7 @@ func Test_dockerClient_ContainerActions_error(t *testing.T) {
}
proxy := new(mockedProxy)
client := &Client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
client := &_client{proxy, filters.NewArgs(), &Host{ID: "localhost"}}
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(containers, nil)
proxy.On("ContainerStart", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test"))

View File

@@ -0,0 +1,121 @@
package docker
import (
"context"
log "github.com/sirupsen/logrus"
)
type ContainerStore struct {
containers map[string]*Container
client Client
statsCollector *StatsCollector
subscribers []chan ContainerEvent
}
func NewContainerStore(client Client) *ContainerStore {
s := &ContainerStore{
containers: make(map[string]*Container),
client: client,
statsCollector: NewStatsCollector(client),
}
go s.init(context.Background())
go s.statsCollector.StartCollecting(context.Background())
return s
}
func (s *ContainerStore) List() []Container {
containers := make([]Container, 0, len(s.containers))
for _, c := range s.containers {
containers = append(containers, *c)
}
return containers
}
func (s *ContainerStore) Client() Client {
return s.client
}
func (s *ContainerStore) Subscribe(events chan ContainerEvent) {
s.subscribers = append(s.subscribers, events)
}
func (s *ContainerStore) Unsubscribe(toRemove chan ContainerEvent) {
for i, sub := range s.subscribers {
if sub == toRemove {
s.subscribers = append(s.subscribers[:i], s.subscribers[i+1:]...)
break
}
}
}
func (s *ContainerStore) SubscribeStats(stats chan ContainerStat) {
s.statsCollector.Subscribe(stats)
}
func (s *ContainerStore) UnsubscribeStats(toRemove chan ContainerStat) {
s.statsCollector.Unsubscribe(toRemove)
}
func (s *ContainerStore) init(ctx context.Context) {
containers, err := s.client.ListContainers()
if err != nil {
log.Fatalf("error while listing containers: %v", err)
}
for _, c := range containers {
c := c // create a new variable to avoid capturing the loop variable
s.containers[c.ID] = &c
}
events := make(chan ContainerEvent)
s.client.Events(ctx, events)
stats := make(chan ContainerStat)
s.statsCollector.Subscribe(stats)
defer s.statsCollector.Unsubscribe(stats)
for {
select {
case event := <-events:
log.Debugf("received event: %+v", event)
switch event.Name {
case "start":
if container, err := s.client.FindContainer(event.ActorID); err == nil {
s.containers[container.ID] = &container
}
case "destroy":
log.Debugf("container %s destroyed", event.ActorID)
delete(s.containers, event.ActorID)
case "die":
if container, ok := s.containers[event.ActorID]; ok {
log.Debugf("container %s died", container.ID)
container.State = "exited"
}
case "health_status: healthy", "health_status: unhealthy":
healthy := "unhealthy"
if event.Name == "health_status: healthy" {
healthy = "healthy"
}
if container, ok := s.containers[event.ActorID]; ok {
log.Debugf("container %s is %s", container.ID, healthy)
container.Health = healthy
}
}
for _, sub := range s.subscribers {
sub <- event
}
case stat := <-stats:
if container, ok := s.containers[stat.ID]; ok {
container.Stats.Push(stat)
}
case <-ctx.Done():
return
}
}
}

View File

@@ -0,0 +1,92 @@
package docker
import (
"context"
"errors"
"io"
log "github.com/sirupsen/logrus"
)
type StatsCollector struct {
stream chan ContainerStat
subscribers []chan ContainerStat
client Client
cancelers map[string]context.CancelFunc
}
func NewStatsCollector(client Client) *StatsCollector {
return &StatsCollector{
stream: make(chan ContainerStat),
subscribers: []chan ContainerStat{},
client: client,
cancelers: make(map[string]context.CancelFunc),
}
}
func (c *StatsCollector) Subscribe(stats chan ContainerStat) {
c.subscribers = append(c.subscribers, stats)
}
func (c *StatsCollector) Unsubscribe(subscriber chan ContainerStat) {
for i, s := range c.subscribers {
if s == subscriber {
c.subscribers = append(c.subscribers[:i], c.subscribers[i+1:]...)
break
}
}
}
func (sc *StatsCollector) StartCollecting(ctx context.Context) {
if containers, err := sc.client.ListContainers(); err == nil {
for _, c := range containers {
if c.State == "running" {
go func(client Client, id string) {
ctx, cancel := context.WithCancel(ctx)
sc.cancelers[id] = cancel
if err := client.ContainerStats(ctx, id, sc.stream); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
log.Errorf("unexpected error when streaming container stats: %v", err)
}
}
}(sc.client, c.ID)
}
}
} else {
log.Errorf("error while listing containers: %v", err)
}
go func() {
events := make(chan ContainerEvent)
sc.client.Events(ctx, events)
for event := range events {
switch event.Name {
case "start":
go func(client Client, id string) {
if err := client.ContainerStats(ctx, id, sc.stream); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
log.Errorf("unexpected error when streaming container stats: %v", err)
}
}
}(sc.client, event.ActorID)
case "die":
if cancel, ok := sc.cancelers[event.ActorID]; ok {
cancel()
delete(sc.cancelers, event.ActorID)
}
}
}
}()
for {
select {
case <-ctx.Done():
return
case stat := <-sc.stream:
for _, subscriber := range sc.subscribers {
subscriber <- stat
}
}
}
}

View File

@@ -2,23 +2,26 @@ package docker
import (
"math"
"github.com/amir20/dozzle/internal/utils"
)
// Container represents an internal representation of docker containers
type Container struct {
ID string `json:"id"`
Names []string `json:"names"`
Name string `json:"name"`
Image string `json:"image"`
ImageID string `json:"imageId"`
Command string `json:"command"`
Created int64 `json:"created"`
State string `json:"state"`
Status string `json:"status"`
Health string `json:"health,omitempty"`
Host string `json:"host,omitempty"`
Tty bool `json:"-"`
Labels map[string]string `json:"labels,omitempty"`
ID string `json:"id"`
Names []string `json:"names"`
Name string `json:"name"`
Image string `json:"image"`
ImageID string `json:"imageId"`
Command string `json:"command"`
Created int64 `json:"created"`
State string `json:"state"`
Status string `json:"status"`
Health string `json:"health,omitempty"`
Host string `json:"host,omitempty"`
Tty bool `json:"-"`
Labels map[string]string `json:"labels,omitempty"`
Stats *utils.RingBuffer[ContainerStat] `json:"stats,omitempty"`
}
// ContainerStat represent stats instant for a container

View File

@@ -0,0 +1,45 @@
package utils
import "encoding/json"
type RingBuffer[T any] struct {
Size int
data []T
start int
}
func NewRingBuffer[T any](size int) *RingBuffer[T] {
return &RingBuffer[T]{
Size: size,
data: make([]T, 0, size),
}
}
func (r *RingBuffer[T]) Push(data T) {
if len(r.data) == r.Size {
r.data[r.start] = data
r.start = (r.start + 1) % r.Size
} else {
r.data = append(r.data, data)
}
}
func (r *RingBuffer[T]) Data() []T {
if len(r.data) == r.Size {
return append(r.data[r.start:], r.data[:r.start]...)
} else {
return r.data
}
}
func (r *RingBuffer[T]) Len() int {
return len(r.data)
}
func (r *RingBuffer[T]) Full() bool {
return len(r.data) == r.Size
}
func (r *RingBuffer[T]) MarshalJSON() ([]byte, error) {
return json.Marshal(r.Data())
}

View File

@@ -0,0 +1,39 @@
package utils
import (
"reflect"
"testing"
)
func TestRingBuffer(t *testing.T) {
rb := NewRingBuffer[int](3)
if rb.Len() != 0 {
t.Errorf("Expected length to be 0, got %d", rb.Len())
}
rb.Push(1)
rb.Push(2)
rb.Push(3)
if rb.Len() != 3 {
t.Errorf("Expected length to be 3, got %d", rb.Len())
}
if !rb.Full() {
t.Errorf("Expected buffer to be full")
}
data := rb.Data()
expectedData := []int{1, 2, 3}
if !reflect.DeepEqual(data, expectedData) {
t.Errorf("Expected data to be %v, got %v", expectedData, data)
}
rb.Push(4)
data = rb.Data()
expectedData = []int{2, 3, 4}
if !reflect.DeepEqual(data, expectedData) {
t.Errorf("Expected data to be %v, got %v", expectedData, data)
}
}

View File

@@ -132,7 +132,7 @@ data: []
event: containers-changed
data: []
data: [{"id":"1234","names":null,"name":"test","image":"test","imageId":"","command":"","created":0,"state":"","status":"","stats":[]}]
event: container-start

View File

@@ -1,12 +1,9 @@
package web
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"github.com/amir20/dozzle/internal/analytics"
"github.com/amir20/dozzle/internal/docker"
@@ -29,9 +26,6 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
events := make(chan docker.ContainerEvent)
stats := make(chan docker.ContainerStat)
b := analytics.BeaconEvent{
Name: "events",
Version: h.config.Version,
@@ -44,48 +38,29 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
HasActions: h.config.EnableActions,
}
{
wg := sync.WaitGroup{}
wg.Add(len(h.clients))
results := make(chan []docker.Container, len(h.clients))
allContainers := make([]docker.Container, 0)
events := make(chan docker.ContainerEvent)
stats := make(chan docker.ContainerStat)
for _, client := range h.clients {
client.Events(ctx, events)
go func(client DockerClient) {
defer wg.Done()
if containers, err := client.ListContainers(); err == nil {
results <- containers
go func(client DockerClient) {
for _, c := range containers {
if c.State == "running" {
if err := client.ContainerStats(ctx, c.ID, stats); err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("error while streaming container stats: %v", err)
}
}
}
}(client)
} else {
log.Errorf("error while listing containers: %v", err)
}
}(client)
}
wg.Wait()
close(results)
allContainers := []docker.Container{}
for containers := range results {
allContainers = append(allContainers, containers...)
}
if err := sendContainersJSON(allContainers, w); err != nil {
log.Errorf("error writing containers to event stream: %v", err)
}
b.RunningContainers = len(allContainers)
f.Flush()
for _, store := range h.stores {
allContainers = append(allContainers, store.List()...)
store.SubscribeStats(stats)
store.Subscribe(events)
}
defer func() {
for _, store := range h.stores {
store.UnsubscribeStats(stats)
store.Unsubscribe(events)
}
}()
if err := sendContainersJSON(allContainers, w); err != nil {
log.Errorf("error writing containers to event stream: %v", err)
}
b.RunningContainers = len(allContainers)
f.Flush()
if !h.config.NoAnalytics {
go func() {
if err := analytics.SendBeacon(b); err != nil {
@@ -109,17 +84,9 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
}
switch event.Name {
case "start", "die":
log.Debugf("triggering docker event: %v", event.Name)
if event.Name == "start" {
log.Debugf("found new container with id: %v", event.ActorID)
if err := h.clients[event.Host].ContainerStats(ctx, event.ActorID, stats); err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("error when streaming new container stats: %v", err)
}
containers, err := h.clients[event.Host].ListContainers()
if err != nil {
log.Errorf("error when listing containers: %v", err)
}
containers := h.stores[event.Host].List()
if err := sendContainersJSON(containers, w); err != nil {
log.Errorf("error encoding containers to stream: %v", err)
return
@@ -150,11 +117,9 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
return
}
f.Flush()
default:
log.Tracef("ignoring docker event: %v", event.Name)
// do nothing
}
case <-ctx.Done():
log.Debugf("context done, closing event stream")
return
}
}

View File

@@ -2,12 +2,14 @@ package web
import (
"context"
"time"
"net/http"
"net/http/httptest"
"testing"
"github.com/amir20/dozzle/internal/docker"
"github.com/amir20/dozzle/internal/utils"
"github.com/beme/abide"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -35,36 +37,26 @@ func Test_handler_streamEvents_happy(t *testing.T) {
ActorID: "1234",
Host: "localhost",
}
time.Sleep(100 * time.Millisecond)
cancel()
}()
})
mockedClient.On("FindContainer", "1234").Return(docker.Container{
ID: "1234",
Name: "test",
Image: "test",
Stats: utils.NewRingBuffer[docker.ContainerStat](300), // 300 seconds of stats
}, nil)
handler := createDefaultHandler(mockedClient)
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
abide.AssertHTTPResponse(t, t.Name(), rr.Result())
mockedClient.AssertExpectations(t)
}
func Test_handler_streamEvents_error_request(t *testing.T) {
req, err := http.NewRequest("GET", "/api/events/stream", nil)
require.NoError(t, err, "NewRequest should not return an error.")
mockedClient := new(MockedClient)
errChannel := make(chan error)
mockedClient.On("Events", mock.Anything, mock.Anything).Return(errChannel)
mockedClient.On("ListContainers").Return([]docker.Container{}, nil)
ctx, cancel := context.WithCancel(context.Background())
req = req.WithContext(ctx)
go func() {
cancel()
}()
handler := createDefaultHandler(mockedClient)
clients := map[string]docker.Client{
"localhost": mockedClient,
}
// This is needed so that the server is initialized for store
server := CreateServer(clients, nil, Config{Base: "/", Authorization: Authorization{Provider: NONE}})
handler := server.Handler
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
abide.AssertHTTPResponse(t, t.Name(), rr.Result())
mockedClient.AssertExpectations(t)

View File

@@ -4,12 +4,13 @@ import (
"fmt"
"net/http"
"github.com/amir20/dozzle/internal/docker"
log "github.com/sirupsen/logrus"
)
func (h *handler) healthcheck(w http.ResponseWriter, r *http.Request) {
log.Trace("Executing healthcheck request")
var client DockerClient
var client docker.Client
for _, v := range h.clients {
client = v
break

View File

@@ -1,17 +1,14 @@
package web
import (
"context"
"io"
"io/fs"
"time"
"net/http"
"strings"
"github.com/amir20/dozzle/internal/auth"
"github.com/amir20/dozzle/internal/docker"
"github.com/docker/docker/api/types"
"github.com/go-chi/chi/v5"
log "github.com/sirupsen/logrus"
)
@@ -47,30 +44,25 @@ type Authorizer interface {
}
type handler struct {
clients map[string]DockerClient
clients map[string]docker.Client
stores map[string]*docker.ContainerStore
content fs.FS
config *Config
}
// Client is a proxy around the docker client
type DockerClient interface {
ListContainers() ([]docker.Container, error)
FindContainer(string) (docker.Container, error)
ContainerLogs(context.Context, string, string, docker.StdType) (io.ReadCloser, error)
Events(context.Context, chan<- docker.ContainerEvent) <-chan error
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time, docker.StdType) (io.ReadCloser, error)
ContainerStats(context.Context, string, chan<- docker.ContainerStat) error
Ping(context.Context) (types.Ping, error)
Host() *docker.Host
ContainerActions(action string, containerID string) error
}
func CreateServer(clients map[string]docker.Client, content fs.FS, config Config) *http.Server {
stores := make(map[string]*docker.ContainerStore)
for host, client := range clients {
stores[host] = docker.NewContainerStore(client)
}
func CreateServer(clients map[string]DockerClient, content fs.FS, config Config) *http.Server {
handler := &handler{
clients: clients,
content: content,
config: &config,
stores: stores,
}
return &http.Server{Addr: config.Addr, Handler: createRouter(handler)}
}
@@ -135,7 +127,7 @@ func createRouter(h *handler) *chi.Mux {
return r
}
func (h *handler) clientFromRequest(r *http.Request) DockerClient {
func (h *handler) clientFromRequest(r *http.Request) docker.Client {
host := chi.URLParam(r, "host")
if host == "" {

View File

@@ -17,7 +17,7 @@ import (
type MockedClient struct {
mock.Mock
DockerClient
docker.Client
}
func (m *MockedClient) FindContainer(id string) (docker.Container, error) {
@@ -59,7 +59,7 @@ func (m *MockedClient) Host() *docker.Host {
return args.Get(0).(*docker.Host)
}
func createHandler(client DockerClient, content fs.FS, config Config) *chi.Mux {
func createHandler(client docker.Client, content fs.FS, config Config) *chi.Mux {
if client == nil {
client = new(MockedClient)
client.(*MockedClient).On("ListContainers").Return([]docker.Container{}, nil)
@@ -74,7 +74,7 @@ func createHandler(client DockerClient, content fs.FS, config Config) *chi.Mux {
content = afero.NewIOFS(fs)
}
clients := map[string]DockerClient{
clients := map[string]docker.Client{
"localhost": client,
}
return createRouter(&handler{
@@ -84,6 +84,6 @@ func createHandler(client DockerClient, content fs.FS, config Config) *chi.Mux {
})
}
func createDefaultHandler(client DockerClient) *chi.Mux {
func createDefaultHandler(client docker.Client) *chi.Mux {
return createHandler(client, nil, Config{Base: "/", Authorization: Authorization{Provider: NONE}})
}

19
main.go
View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"embed"
"errors"
"io/fs"
"net/http"
"os"
@@ -120,12 +121,12 @@ func doStartEvent(arg args) {
}
func createClients(args args,
localClientFactory func(map[string][]string) (*docker.Client, error),
remoteClientFactory func(map[string][]string, docker.Host) (*docker.Client, error),
hostname string) map[string]web.DockerClient {
clients := make(map[string]web.DockerClient)
localClientFactory func(map[string][]string) (docker.Client, error),
remoteClientFactory func(map[string][]string, docker.Host) (docker.Client, error),
hostname string) map[string]docker.Client {
clients := make(map[string]docker.Client)
if localClient := createLocalClient(args, localClientFactory); localClient != nil {
if localClient, err := createLocalClient(args, localClientFactory); err == nil {
if hostname != "" {
localClient.Host().Name = hostname
}
@@ -154,7 +155,7 @@ func createClients(args args,
return clients
}
func createServer(args args, clients map[string]web.DockerClient) *http.Server {
func createServer(args args, clients map[string]docker.Client) *http.Server {
_, dev := os.LookupEnv("DEV")
var provider web.AuthProvider = web.NONE
@@ -221,7 +222,7 @@ func createServer(args args, clients map[string]web.DockerClient) *http.Server {
return web.CreateServer(clients, assets, config)
}
func createLocalClient(args args, localClientFactory func(map[string][]string) (*docker.Client, error)) *docker.Client {
func createLocalClient(args args, localClientFactory func(map[string][]string) (docker.Client, error)) (docker.Client, error) {
for i := 1; ; i++ {
dockerClient, err := localClientFactory(args.Filter)
if err == nil {
@@ -230,7 +231,7 @@ func createLocalClient(args args, localClientFactory func(map[string][]string) (
log.Debugf("Could not connect to local Docker Engine: %s", err)
} else {
log.Debugf("Connected to local Docker Engine")
return dockerClient
return dockerClient, nil
}
}
if args.WaitForDockerSeconds > 0 {
@@ -242,7 +243,7 @@ func createLocalClient(args args, localClientFactory func(map[string][]string) (
break
}
}
return nil
return nil, errors.New("could not connect to local Docker Engine")
}
func parseArgs() args {

View File

@@ -26,7 +26,7 @@ func (f *fakeCLI) ContainerList(context.Context, container.ListOptions) ([]types
func Test_valid_localhost(t *testing.T) {
client := new(fakeCLI)
client.On("ContainerList").Return([]types.Container{}, nil)
fakeClientFactory := func(filter map[string][]string) (*docker.Client, error) {
fakeClientFactory := func(filter map[string][]string) (docker.Client, error) {
return docker.NewClient(client, filters.NewArgs(), &docker.Host{
ID: "localhost",
}), nil
@@ -34,7 +34,7 @@ func Test_valid_localhost(t *testing.T) {
args := args{}
actualClient := createLocalClient(args, fakeClientFactory)
actualClient, _ := createLocalClient(args, fakeClientFactory)
assert.NotNil(t, actualClient)
client.AssertExpectations(t)
@@ -43,7 +43,7 @@ func Test_valid_localhost(t *testing.T) {
func Test_invalid_localhost(t *testing.T) {
client := new(fakeCLI)
client.On("ContainerList").Return([]types.Container{}, errors.New("error"))
fakeClientFactory := func(filter map[string][]string) (*docker.Client, error) {
fakeClientFactory := func(filter map[string][]string) (docker.Client, error) {
return docker.NewClient(client, filters.NewArgs(), &docker.Host{
ID: "localhost",
}), nil
@@ -51,7 +51,7 @@ func Test_invalid_localhost(t *testing.T) {
args := args{}
actualClient := createLocalClient(args, fakeClientFactory)
actualClient, _ := createLocalClient(args, fakeClientFactory)
assert.Nil(t, actualClient)
client.AssertExpectations(t)
@@ -60,7 +60,7 @@ func Test_invalid_localhost(t *testing.T) {
func Test_valid_remote(t *testing.T) {
local := new(fakeCLI)
local.On("ContainerList").Return([]types.Container{}, errors.New("error"))
fakeLocalClientFactory := func(filter map[string][]string) (*docker.Client, error) {
fakeLocalClientFactory := func(filter map[string][]string) (docker.Client, error) {
return docker.NewClient(local, filters.NewArgs(), &docker.Host{
ID: "localhost",
}), nil
@@ -68,7 +68,7 @@ func Test_valid_remote(t *testing.T) {
remote := new(fakeCLI)
remote.On("ContainerList").Return([]types.Container{}, nil)
fakeRemoteClientFactory := func(filter map[string][]string, host docker.Host) (*docker.Client, error) {
fakeRemoteClientFactory := func(filter map[string][]string, host docker.Host) (docker.Client, error) {
return docker.NewClient(remote, filters.NewArgs(), &docker.Host{
ID: "test",
}), nil
@@ -90,7 +90,7 @@ func Test_valid_remote(t *testing.T) {
func Test_valid_remote_and_local(t *testing.T) {
local := new(fakeCLI)
local.On("ContainerList").Return([]types.Container{}, nil)
fakeLocalClientFactory := func(filter map[string][]string) (*docker.Client, error) {
fakeLocalClientFactory := func(filter map[string][]string) (docker.Client, error) {
return docker.NewClient(local, filters.NewArgs(), &docker.Host{
ID: "localhost",
}), nil
@@ -98,7 +98,7 @@ func Test_valid_remote_and_local(t *testing.T) {
remote := new(fakeCLI)
remote.On("ContainerList").Return([]types.Container{}, nil)
fakeRemoteClientFactory := func(filter map[string][]string, host docker.Host) (*docker.Client, error) {
fakeRemoteClientFactory := func(filter map[string][]string, host docker.Host) (docker.Client, error) {
return docker.NewClient(remote, filters.NewArgs(), &docker.Host{
ID: "test",
}), nil
@@ -119,13 +119,13 @@ func Test_valid_remote_and_local(t *testing.T) {
func Test_no_clients(t *testing.T) {
local := new(fakeCLI)
local.On("ContainerList").Return([]types.Container{}, errors.New("error"))
fakeLocalClientFactory := func(filter map[string][]string) (*docker.Client, error) {
fakeLocalClientFactory := func(filter map[string][]string) (docker.Client, error) {
return docker.NewClient(local, filters.NewArgs(), &docker.Host{
ID: "localhost",
}), nil
}
fakeRemoteClientFactory := func(filter map[string][]string, host docker.Host) (*docker.Client, error) {
fakeRemoteClientFactory := func(filter map[string][]string, host docker.Host) (docker.Client, error) {
client := new(fakeCLI)
return docker.NewClient(client, filters.NewArgs(), &docker.Host{
ID: "test",