diff --git a/internal/docker/container_store.go b/internal/docker/container_store.go index 5005c0e4..b5935ce5 100644 --- a/internal/docker/container_store.go +++ b/internal/docker/container_store.go @@ -28,7 +28,6 @@ func NewContainerStore(ctx context.Context, client Client) *ContainerStore { s.wg.Add(1) go s.init(ctx) - go s.statsCollector.StartCollecting(ctx) return s } @@ -49,9 +48,23 @@ func (s *ContainerStore) Client() Client { } func (s *ContainerStore) Subscribe(ctx context.Context, events chan ContainerEvent) { + go func() { + if s.statsCollector.Start(context.Background()) { + log.Debug("clearing container stats as stats collector has been stopped") + s.containers.Range(func(_ string, c *Container) bool { + c.Stats.Clear() + return true + }) + } + }() s.subscribers.Store(ctx, events) } +func (s *ContainerStore) Unsubscribe(ctx context.Context) { + s.subscribers.Delete(ctx) + s.statsCollector.Stop() +} + func (s *ContainerStore) SubscribeStats(ctx context.Context, stats chan ContainerStat) { s.statsCollector.Subscribe(ctx, stats) } diff --git a/internal/docker/stats_collector.go b/internal/docker/stats_collector.go index 1ae0a004..f1cf7847 100644 --- a/internal/docker/stats_collector.go +++ b/internal/docker/stats_collector.go @@ -4,18 +4,27 @@ import ( "context" "errors" "io" + "sync" + "sync/atomic" + "time" "github.com/puzpuzpuz/xsync/v3" log "github.com/sirupsen/logrus" ) type StatsCollector struct { - stream chan ContainerStat - subscribers *xsync.MapOf[context.Context, chan ContainerStat] - client Client - cancelers *xsync.MapOf[string, context.CancelFunc] + stream chan ContainerStat + subscribers *xsync.MapOf[context.Context, chan ContainerStat] + client Client + cancelers *xsync.MapOf[string, context.CancelFunc] + stopper context.CancelFunc + timer *time.Timer + mu sync.Mutex + totalStarted atomic.Int32 } +var timeToStop = 6 * time.Hour + func NewStatsCollector(client Client) *StatsCollector { return &StatsCollector{ stream: make(chan ContainerStat), @@ -29,7 +38,46 @@ func (c *StatsCollector) Subscribe(ctx context.Context, stats chan ContainerStat c.subscribers.Store(ctx, stats) } -func (sc *StatsCollector) StartCollecting(ctx context.Context) { +func (c *StatsCollector) forceStop() { + c.mu.Lock() + defer c.mu.Unlock() + if c.stopper != nil { + c.stopper() + c.stopper = nil + log.Debug("stopping container stats collector due to inactivity") + } +} + +func (c *StatsCollector) Stop() { + c.mu.Lock() + defer c.mu.Unlock() + if c.totalStarted.Add(-1) == 0 { + log.Debug("scheduled to stop container stats collector") + c.timer = time.AfterFunc(timeToStop, func() { + c.forceStop() + }) + } +} + +func (c *StatsCollector) reset() { + c.mu.Lock() + defer c.mu.Unlock() + if c.timer != nil { + c.timer.Stop() + } + c.timer = nil +} + +// Start starts the stats collector and blocks until it's stopped. It returns true if the collector was stopped, false if it was already running +func (sc *StatsCollector) Start(ctx context.Context) bool { + sc.reset() + if sc.totalStarted.Add(1) > 1 { + return false + } + sc.mu.Lock() + ctx, sc.stopper = context.WithCancel(ctx) + sc.mu.Unlock() + if containers, err := sc.client.ListContainers(); err == nil { for _, c := range containers { if c.State == "running" { @@ -75,7 +123,8 @@ func (sc *StatsCollector) StartCollecting(ctx context.Context) { for { select { case <-ctx.Done(): - return + log.Info("stopped collecting container stats") + return true case stat := <-sc.stream: sc.subscribers.Range(func(c context.Context, stats chan ContainerStat) bool { select { diff --git a/internal/utils/ring_buffer.go b/internal/utils/ring_buffer.go index d860fa17..fe1981f4 100644 --- a/internal/utils/ring_buffer.go +++ b/internal/utils/ring_buffer.go @@ -31,6 +31,13 @@ func (r *RingBuffer[T]) Push(data T) { } } +func (r *RingBuffer[T]) Clear() { + r.mutex.Lock() + defer r.mutex.Unlock() + r.data = r.data[:0] + r.start = 0 +} + func (r *RingBuffer[T]) Data() []T { r.mutex.RLock() defer r.mutex.RUnlock() diff --git a/internal/utils/ring_buffer_test.go b/internal/utils/ring_buffer_test.go index 6141892c..cc10b691 100644 --- a/internal/utils/ring_buffer_test.go +++ b/internal/utils/ring_buffer_test.go @@ -25,3 +25,36 @@ func TestRingBuffer(t *testing.T) { t.Errorf("Expected data to be %v, got %v", expectedData, data) } } + +func TestRingBuffer_MarshalJSON(t *testing.T) { + rb := NewRingBuffer[int](3) + + rb.Push(1) + rb.Push(2) + rb.Push(3) + + data, err := rb.MarshalJSON() + if err != nil { + t.Errorf("Expected error to be nil, got %v", err) + } + + expectedData := []byte("[1,2,3]") + if !reflect.DeepEqual(data, expectedData) { + t.Errorf("Expected data to be %v, got %v", expectedData, data) + } +} + +func TestRingBuffer_Clear(t *testing.T) { + rb := NewRingBuffer[int](3) + + rb.Push(1) + rb.Push(2) + rb.Push(3) + + rb.Clear() + data := rb.Data() + expectedData := []int{} + if !reflect.DeepEqual(data, expectedData) { + t.Errorf("Expected data to be %v, got %v", expectedData, data) + } +} diff --git a/internal/web/events.go b/internal/web/events.go index 73be2a08..efa8e222 100644 --- a/internal/web/events.go +++ b/internal/web/events.go @@ -49,6 +49,12 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { store.Subscribe(ctx, events) } + defer func() { + for _, store := range h.stores { + store.Unsubscribe(ctx) + } + }() + if err := sendContainersJSON(allContainers, w); err != nil { log.Errorf("error writing containers to event stream: %v", err) }