1
0
mirror of https://github.com/amir20/dozzle.git synced 2025-12-24 06:28:42 +01:00

fix: fixes deadlocks in store (#3617)

This commit is contained in:
Amir Raminfar
2025-02-11 12:32:56 -08:00
committed by GitHub
parent 571de4084e
commit 8c9c9a83af
6 changed files with 109 additions and 83 deletions

View File

@@ -31,7 +31,8 @@
</ul>
<ul class="flex flex-row flex-wrap gap-x-2 text-sm md:gap-3">
<li class="flex items-center gap-1">
<octicon:container-24 class="inline-block" /> {{ $t("label.container", hostContainers[host.id]?.length) }}
<octicon:container-24 class="inline-block" />
{{ $t("label.container", hostContainers[host.id]?.length ?? 0) }}
</li>
<li class="flex items-center gap-1"><mdi:docker class="inline-block" /> {{ host.dockerVersion }}</li>
</ul>
@@ -107,7 +108,10 @@ useIntervalFn(
stat.totalCPU += container.stat.cpu;
stat.totalMem += container.stat.memoryUsage;
}
weightedStats[host].mostRecent = stat;
if (weightedStats[host]) {
// TODO fix this init
weightedStats[host].mostRecent = stat;
}
}
},
1000,

View File

@@ -176,23 +176,30 @@ func (s *ContainerStore) FindContainer(id string, labels ContainerLabels) (Conta
defer cancel()
if newContainer, err := s.client.FindContainer(ctx, id); err == nil {
return &newContainer, false
} else {
log.Error().Err(err).Msg("failed to fetch container")
return c, false
}
}
return c, false
}); ok {
event := ContainerEvent{
Name: "update",
Host: s.client.Host().ID,
ActorID: id,
}
s.subscribers.Range(func(c context.Context, events chan<- ContainerEvent) bool {
select {
case events <- event:
case <-c.Done():
s.subscribers.Delete(c)
go func() {
event := ContainerEvent{
Name: "update",
Host: newContainer.Host,
ActorID: id,
Container: newContainer,
}
return true
})
s.subscribers.Range(func(c context.Context, events chan<- ContainerEvent) bool {
select {
case events <- event:
case <-c.Done():
s.subscribers.Delete(c)
}
return true
})
}()
return *newContainer, nil
}
}
@@ -248,7 +255,7 @@ func (s *ContainerStore) init() {
for {
select {
case event := <-s.events:
log.Trace().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event")
log.Debug().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event")
switch event.Name {
case "create":
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -262,7 +269,6 @@ func (s *ContainerStore) init() {
})
if valid {
log.Debug().Str("id", container.ID).Msg("container started")
s.containers.Store(container.ID, &container)
s.newContainerSubscribers.Range(func(c context.Context, containers chan<- Container) bool {
select {
@@ -287,7 +293,6 @@ func (s *ContainerStore) init() {
})
if valid {
log.Debug().Str("id", container.ID).Msg("container started")
s.containers.Store(container.ID, &container)
s.newContainerSubscribers.Range(func(c context.Context, containers chan<- Container) bool {
select {
@@ -304,42 +309,41 @@ func (s *ContainerStore) init() {
s.containers.Delete(event.ActorID)
case "update":
s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) {
started := false
updatedContainer, _ := s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) {
if loaded {
log.Debug().Str("id", c.ID).Msg("container updated")
started := false
if newContainer, err := s.client.FindContainer(context.Background(), c.ID); err == nil {
if newContainer.State == "running" && c.State != "running" {
started = true
}
c.Name = newContainer.Name
c.State = newContainer.State
c.Labels = newContainer.Labels
c.StartedAt = newContainer.StartedAt
c.FinishedAt = newContainer.FinishedAt
c.Created = newContainer.Created
} else {
log.Error().Err(err).Str("id", c.ID).Msg("failed to update container")
}
if started {
s.subscribers.Range(func(ctx context.Context, events chan<- ContainerEvent) bool {
select {
case events <- ContainerEvent{
Name: "start",
ActorID: c.ID,
}:
case <-ctx.Done():
s.subscribers.Delete(ctx)
}
return true
})
newContainer := event.Container
if newContainer.State == "running" && c.State != "running" {
started = true
}
c.Name = newContainer.Name
c.State = newContainer.State
c.Labels = newContainer.Labels
c.StartedAt = newContainer.StartedAt
c.FinishedAt = newContainer.FinishedAt
c.Created = newContainer.Created
c.Host = newContainer.Host
return c, false
} else {
return c, true
}
})
if started {
s.subscribers.Range(func(ctx context.Context, events chan<- ContainerEvent) bool {
select {
case events <- ContainerEvent{
Name: "start",
ActorID: updatedContainer.ID,
Host: updatedContainer.Host,
}:
case <-ctx.Done():
s.subscribers.Delete(ctx)
}
return true
})
}
case "die":
s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) {
if loaded {

View File

@@ -42,6 +42,7 @@ type ContainerEvent struct {
ActorID string `json:"actorId"`
ActorAttributes map[string]string `json:"actorAttributes,omitempty"`
Time time.Time `json:"time"`
Container *Container `json:"-"`
}
type ContainerLabels map[string][]string

View File

@@ -25,6 +25,7 @@ type K8sClient struct {
Clientset *kubernetes.Clientset
namespace string
config *rest.Config
host container.Host
}
func NewK8sClient(namespace string) (*K8sClient, error) {
@@ -56,12 +57,49 @@ func NewK8sClient(namespace string) (*K8sClient, error) {
return nil, err
}
nodes, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
return nil, err
}
if len(nodes.Items) == 0 {
return nil, fmt.Errorf("nodes not found")
}
node := nodes.Items[0]
return &K8sClient{
Clientset: clientset,
namespace: namespace,
config: config,
host: container.Host{
ID: node.Status.NodeInfo.MachineID,
Name: node.Name,
},
}, nil
}
func podToContainers(pod *corev1.Pod) []container.Container {
started := time.Time{}
if pod.Status.StartTime != nil {
started = pod.Status.StartTime.Time
}
var containers []container.Container
for _, c := range pod.Spec.Containers {
containers = append(containers, container.Container{
ID: pod.Name + ":" + c.Name,
Name: pod.Name + "/" + c.Name,
Image: c.Image,
Created: pod.CreationTimestamp.Time,
State: phaseToState(pod.Status.Phase),
StartedAt: started,
Command: strings.Join(c.Command, " "),
Host: pod.Spec.NodeName,
Tty: c.TTY,
Stats: utils.NewRingBuffer[container.ContainerStat](300),
})
}
return containers
}
func (k *K8sClient) ListContainers(ctx context.Context, labels container.ContainerLabels) ([]container.Container, error) {
pods, err := k.Clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{})
if err != nil {
@@ -77,7 +115,6 @@ func (k *K8sClient) ListContainers(ctx context.Context, labels container.Contain
Image: c.Image,
Created: pod.CreationTimestamp.Time,
State: phaseToState(pod.Status.Phase),
Tty: c.TTY,
Host: pod.Spec.NodeName,
})
}
@@ -111,24 +148,9 @@ func (k *K8sClient) FindContainer(ctx context.Context, id string) (container.Con
return container.Container{}, err
}
for _, c := range pod.Spec.Containers {
if c.Name == containerName {
started := time.Time{}
if pod.Status.StartTime != nil {
started = pod.Status.StartTime.Time
}
return container.Container{
ID: pod.Name + ":" + c.Name,
Name: pod.Name + "/" + c.Name,
Image: c.Image,
Created: pod.CreationTimestamp.Time,
State: phaseToState(pod.Status.Phase),
StartedAt: started,
Command: strings.Join(c.Command, " "),
Host: pod.Spec.NodeName,
Tty: c.TTY,
Stats: utils.NewRingBuffer[container.ContainerStat](300),
}, nil
for _, c := range podToContainers(pod) {
if c.ID == id {
return c, nil
}
}
@@ -183,14 +205,13 @@ func (k *K8sClient) ContainerEvents(ctx context.Context, ch chan<- container.Con
name = "update"
}
log.Debug().Interface("event.Type", event.Type).Str("name", name).Interface("StartTime", pod.Status.StartTime).Msg("Sending container event")
for _, c := range pod.Spec.Containers {
for _, c := range podToContainers(pod) {
ch <- container.ContainerEvent{
Name: name,
ActorID: pod.Name + ":" + c.Name,
Host: pod.Spec.NodeName,
Time: time.Now(),
Name: name,
ActorID: c.ID,
Host: pod.Spec.NodeName,
Time: time.Now(),
Container: &c,
}
}
}
@@ -208,7 +229,7 @@ func (k *K8sClient) Ping(ctx context.Context) error {
}
func (k *K8sClient) Host() container.Host {
return container.Host{}
return k.host
}
func (k *K8sClient) ContainerActions(ctx context.Context, action container.ContainerAction, containerID string) error {

View File

@@ -122,7 +122,7 @@ func (m *K8sClusterService) Hosts() []container.Host {
}
func (m *K8sClusterService) LocalHost() (container.Host, error) {
return container.Host{}, nil
return m.client.client.Host(), nil
}
func (m *K8sClusterService) SubscribeAvailableHosts(ctx context.Context, hosts chan<- container.Host) {

View File

@@ -68,12 +68,12 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
if !ok {
return
}
log.Debug().Str("event", event.Name).Str("id", event.ActorID).Msg("container event from store")
switch event.Name {
case "start", "die", "destroy", "rename":
if event.Name == "start" || event.Name == "rename" {
log.Debug().Str("action", event.Name).Str("id", event.ActorID).Msg("container event")
if containers, err := h.hostService.ListContainersForHost(event.Host, userLabels); err == nil {
log.Debug().Str("host", event.Host).Int("count", len(containers)).Msg("updating containers for host")
if err := sseWriter.Event("containers-changed", containers); err != nil {
log.Error().Err(err).Msg("error writing containers to event stream")
return
@@ -87,15 +87,11 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
}
case "update":
log.Debug().Str("id", event.ActorID).Msg("container updated")
if containerService, err := h.hostService.FindContainer(event.Host, event.ActorID, userLabels); err == nil {
if err := sseWriter.Event("container-updated", containerService.Container); err != nil {
log.Error().Err(err).Msg("error writing event to event stream")
return
}
if err := sseWriter.Event("container-updated", event.Container); err != nil {
log.Error().Err(err).Msg("error writing event to event stream")
return
}
case "health_status: healthy", "health_status: unhealthy":
log.Debug().Str("container", event.ActorID).Str("health", event.Name).Msg("container health status")
healthy := "unhealthy"
if event.Name == "health_status: healthy" {
healthy = "healthy"
@@ -135,7 +131,7 @@ func sendBeaconEvent(h *handler, r *http.Request, runningContainers int) {
local, err := h.hostService.LocalHost()
if err == nil {
b.ServerID = local.ID // TODO : fix this for k8s
b.ServerID = local.ID
}
if err := analytics.SendBeacon(b); err != nil {