diff --git a/assets/components/HostList.vue b/assets/components/HostList.vue index a5f3f93a..e7737897 100644 --- a/assets/components/HostList.vue +++ b/assets/components/HostList.vue @@ -31,7 +31,8 @@ @@ -107,7 +108,10 @@ useIntervalFn( stat.totalCPU += container.stat.cpu; stat.totalMem += container.stat.memoryUsage; } - weightedStats[host].mostRecent = stat; + if (weightedStats[host]) { + // TODO fix this init + weightedStats[host].mostRecent = stat; + } } }, 1000, diff --git a/internal/container/container_store.go b/internal/container/container_store.go index 260e90f1..ac0a4e4b 100644 --- a/internal/container/container_store.go +++ b/internal/container/container_store.go @@ -176,23 +176,30 @@ func (s *ContainerStore) FindContainer(id string, labels ContainerLabels) (Conta defer cancel() if newContainer, err := s.client.FindContainer(ctx, id); err == nil { return &newContainer, false + } else { + log.Error().Err(err).Msg("failed to fetch container") + return c, false } } return c, false }); ok { - event := ContainerEvent{ - Name: "update", - Host: s.client.Host().ID, - ActorID: id, - } - s.subscribers.Range(func(c context.Context, events chan<- ContainerEvent) bool { - select { - case events <- event: - case <-c.Done(): - s.subscribers.Delete(c) + go func() { + event := ContainerEvent{ + Name: "update", + Host: newContainer.Host, + ActorID: id, + Container: newContainer, } - return true - }) + + s.subscribers.Range(func(c context.Context, events chan<- ContainerEvent) bool { + select { + case events <- event: + case <-c.Done(): + s.subscribers.Delete(c) + } + return true + }) + }() return *newContainer, nil } } @@ -248,7 +255,7 @@ func (s *ContainerStore) init() { for { select { case event := <-s.events: - log.Trace().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event") + log.Debug().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event") switch event.Name { case "create": ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -262,7 +269,6 @@ func (s *ContainerStore) init() { }) if valid { - log.Debug().Str("id", container.ID).Msg("container started") s.containers.Store(container.ID, &container) s.newContainerSubscribers.Range(func(c context.Context, containers chan<- Container) bool { select { @@ -287,7 +293,6 @@ func (s *ContainerStore) init() { }) if valid { - log.Debug().Str("id", container.ID).Msg("container started") s.containers.Store(container.ID, &container) s.newContainerSubscribers.Range(func(c context.Context, containers chan<- Container) bool { select { @@ -304,42 +309,41 @@ func (s *ContainerStore) init() { s.containers.Delete(event.ActorID) case "update": - s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) { + started := false + updatedContainer, _ := s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) { if loaded { - log.Debug().Str("id", c.ID).Msg("container updated") - started := false - if newContainer, err := s.client.FindContainer(context.Background(), c.ID); err == nil { - if newContainer.State == "running" && c.State != "running" { - started = true - } - c.Name = newContainer.Name - c.State = newContainer.State - c.Labels = newContainer.Labels - c.StartedAt = newContainer.StartedAt - c.FinishedAt = newContainer.FinishedAt - c.Created = newContainer.Created - } else { - log.Error().Err(err).Str("id", c.ID).Msg("failed to update container") - } - if started { - s.subscribers.Range(func(ctx context.Context, events chan<- ContainerEvent) bool { - select { - case events <- ContainerEvent{ - Name: "start", - ActorID: c.ID, - }: - case <-ctx.Done(): - s.subscribers.Delete(ctx) - } - return true - }) + newContainer := event.Container + if newContainer.State == "running" && c.State != "running" { + started = true } + c.Name = newContainer.Name + c.State = newContainer.State + c.Labels = newContainer.Labels + c.StartedAt = newContainer.StartedAt + c.FinishedAt = newContainer.FinishedAt + c.Created = newContainer.Created + c.Host = newContainer.Host return c, false } else { return c, true } }) + if started { + s.subscribers.Range(func(ctx context.Context, events chan<- ContainerEvent) bool { + select { + case events <- ContainerEvent{ + Name: "start", + ActorID: updatedContainer.ID, + Host: updatedContainer.Host, + }: + case <-ctx.Done(): + s.subscribers.Delete(ctx) + } + return true + }) + } + case "die": s.containers.Compute(event.ActorID, func(c *Container, loaded bool) (*Container, bool) { if loaded { diff --git a/internal/container/types.go b/internal/container/types.go index 3a599450..f03fe390 100644 --- a/internal/container/types.go +++ b/internal/container/types.go @@ -42,6 +42,7 @@ type ContainerEvent struct { ActorID string `json:"actorId"` ActorAttributes map[string]string `json:"actorAttributes,omitempty"` Time time.Time `json:"time"` + Container *Container `json:"-"` } type ContainerLabels map[string][]string diff --git a/internal/k8s/client.go b/internal/k8s/client.go index 18cf8350..c47e5ba5 100644 --- a/internal/k8s/client.go +++ b/internal/k8s/client.go @@ -25,6 +25,7 @@ type K8sClient struct { Clientset *kubernetes.Clientset namespace string config *rest.Config + host container.Host } func NewK8sClient(namespace string) (*K8sClient, error) { @@ -56,12 +57,49 @@ func NewK8sClient(namespace string) (*K8sClient, error) { return nil, err } + nodes, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + if len(nodes.Items) == 0 { + return nil, fmt.Errorf("nodes not found") + } + node := nodes.Items[0] + return &K8sClient{ Clientset: clientset, namespace: namespace, config: config, + host: container.Host{ + ID: node.Status.NodeInfo.MachineID, + Name: node.Name, + }, }, nil } + +func podToContainers(pod *corev1.Pod) []container.Container { + started := time.Time{} + if pod.Status.StartTime != nil { + started = pod.Status.StartTime.Time + } + var containers []container.Container + for _, c := range pod.Spec.Containers { + containers = append(containers, container.Container{ + ID: pod.Name + ":" + c.Name, + Name: pod.Name + "/" + c.Name, + Image: c.Image, + Created: pod.CreationTimestamp.Time, + State: phaseToState(pod.Status.Phase), + StartedAt: started, + Command: strings.Join(c.Command, " "), + Host: pod.Spec.NodeName, + Tty: c.TTY, + Stats: utils.NewRingBuffer[container.ContainerStat](300), + }) + } + return containers +} + func (k *K8sClient) ListContainers(ctx context.Context, labels container.ContainerLabels) ([]container.Container, error) { pods, err := k.Clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{}) if err != nil { @@ -77,7 +115,6 @@ func (k *K8sClient) ListContainers(ctx context.Context, labels container.Contain Image: c.Image, Created: pod.CreationTimestamp.Time, State: phaseToState(pod.Status.Phase), - Tty: c.TTY, Host: pod.Spec.NodeName, }) } @@ -111,24 +148,9 @@ func (k *K8sClient) FindContainer(ctx context.Context, id string) (container.Con return container.Container{}, err } - for _, c := range pod.Spec.Containers { - if c.Name == containerName { - started := time.Time{} - if pod.Status.StartTime != nil { - started = pod.Status.StartTime.Time - } - return container.Container{ - ID: pod.Name + ":" + c.Name, - Name: pod.Name + "/" + c.Name, - Image: c.Image, - Created: pod.CreationTimestamp.Time, - State: phaseToState(pod.Status.Phase), - StartedAt: started, - Command: strings.Join(c.Command, " "), - Host: pod.Spec.NodeName, - Tty: c.TTY, - Stats: utils.NewRingBuffer[container.ContainerStat](300), - }, nil + for _, c := range podToContainers(pod) { + if c.ID == id { + return c, nil } } @@ -183,14 +205,13 @@ func (k *K8sClient) ContainerEvents(ctx context.Context, ch chan<- container.Con name = "update" } - log.Debug().Interface("event.Type", event.Type).Str("name", name).Interface("StartTime", pod.Status.StartTime).Msg("Sending container event") - - for _, c := range pod.Spec.Containers { + for _, c := range podToContainers(pod) { ch <- container.ContainerEvent{ - Name: name, - ActorID: pod.Name + ":" + c.Name, - Host: pod.Spec.NodeName, - Time: time.Now(), + Name: name, + ActorID: c.ID, + Host: pod.Spec.NodeName, + Time: time.Now(), + Container: &c, } } } @@ -208,7 +229,7 @@ func (k *K8sClient) Ping(ctx context.Context) error { } func (k *K8sClient) Host() container.Host { - return container.Host{} + return k.host } func (k *K8sClient) ContainerActions(ctx context.Context, action container.ContainerAction, containerID string) error { diff --git a/internal/support/k8s/k8s_cluster_service.go b/internal/support/k8s/k8s_cluster_service.go index a2c33671..ee5aa2f1 100644 --- a/internal/support/k8s/k8s_cluster_service.go +++ b/internal/support/k8s/k8s_cluster_service.go @@ -122,7 +122,7 @@ func (m *K8sClusterService) Hosts() []container.Host { } func (m *K8sClusterService) LocalHost() (container.Host, error) { - return container.Host{}, nil + return m.client.client.Host(), nil } func (m *K8sClusterService) SubscribeAvailableHosts(ctx context.Context, hosts chan<- container.Host) { diff --git a/internal/web/events.go b/internal/web/events.go index a6ed1979..6f0754a6 100644 --- a/internal/web/events.go +++ b/internal/web/events.go @@ -68,12 +68,12 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { if !ok { return } + log.Debug().Str("event", event.Name).Str("id", event.ActorID).Msg("container event from store") switch event.Name { case "start", "die", "destroy", "rename": if event.Name == "start" || event.Name == "rename" { - log.Debug().Str("action", event.Name).Str("id", event.ActorID).Msg("container event") - if containers, err := h.hostService.ListContainersForHost(event.Host, userLabels); err == nil { + log.Debug().Str("host", event.Host).Int("count", len(containers)).Msg("updating containers for host") if err := sseWriter.Event("containers-changed", containers); err != nil { log.Error().Err(err).Msg("error writing containers to event stream") return @@ -87,15 +87,11 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) { } case "update": - log.Debug().Str("id", event.ActorID).Msg("container updated") - if containerService, err := h.hostService.FindContainer(event.Host, event.ActorID, userLabels); err == nil { - if err := sseWriter.Event("container-updated", containerService.Container); err != nil { - log.Error().Err(err).Msg("error writing event to event stream") - return - } + if err := sseWriter.Event("container-updated", event.Container); err != nil { + log.Error().Err(err).Msg("error writing event to event stream") + return } case "health_status: healthy", "health_status: unhealthy": - log.Debug().Str("container", event.ActorID).Str("health", event.Name).Msg("container health status") healthy := "unhealthy" if event.Name == "health_status: healthy" { healthy = "healthy" @@ -135,7 +131,7 @@ func sendBeaconEvent(h *handler, r *http.Request, runningContainers int) { local, err := h.hostService.LocalHost() if err == nil { - b.ServerID = local.ID // TODO : fix this for k8s + b.ServerID = local.ID } if err := analytics.SendBeacon(b); err != nil {