From 21fe5b84f29f9f67b9261a2717a1bd5b9105dbba Mon Sep 17 00:00:00 2001 From: Amir Raminfar Date: Sat, 15 Feb 2025 09:35:32 -0800 Subject: [PATCH] feat: supports namespaces properly in k8s mode (#3631) --- internal/container/container_store.go | 6 ++-- internal/container/types.go | 29 ++++++++-------- internal/docker/client.go | 21 +++++------ internal/k8s/client.go | 50 +++++++++++---------------- internal/k8s/stats_collector.go | 2 +- internal/support/cli/args.go | 1 + main.go | 2 +- 7 files changed, 53 insertions(+), 58 deletions(-) diff --git a/internal/container/container_store.go b/internal/container/container_store.go index ac0a4e4b..13895ff0 100644 --- a/internal/container/container_store.go +++ b/internal/container/container_store.go @@ -84,7 +84,7 @@ func (s *ContainerStore) checkConnectivity() error { } running := lo.Filter(containers, func(item Container, index int) bool { - return item.State != "exited" + return item.State != "exited" && !item.FullyLoaded }) sem := semaphore.NewWeighted(maxFetchParallelism) @@ -168,8 +168,8 @@ func (s *ContainerStore) FindContainer(id string, labels ContainerLabels) (Conta } if container, ok := s.containers.Load(id); ok { - if container.StartedAt.IsZero() { - log.Debug().Str("id", id).Msg("container doesn't have detailed information, fetching it") + if !container.FullyLoaded { + log.Debug().Str("id", id).Msg("container is not fully loaded, fetching it") if newContainer, ok := s.containers.Compute(id, func(c *Container, loaded bool) (*Container, bool) { if loaded { ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) diff --git a/internal/container/types.go b/internal/container/types.go index f03fe390..8f94e31a 100644 --- a/internal/container/types.go +++ b/internal/container/types.go @@ -11,20 +11,21 @@ import ( // Container represents an internal representation of docker containers type Container struct { - ID string `json:"id"` - Name string `json:"name"` - Image string `json:"image"` - Command string `json:"command"` - Created time.Time `json:"created"` - StartedAt time.Time `json:"startedAt"` - FinishedAt time.Time `json:"finishedAt"` - State string `json:"state"` - Health string `json:"health,omitempty"` - Host string `json:"host,omitempty"` - Tty bool `json:"-"` - Labels map[string]string `json:"labels,omitempty"` - Stats *utils.RingBuffer[ContainerStat] `json:"stats,omitempty"` - Group string `json:"group,omitempty"` + ID string `json:"id"` + Name string `json:"name"` + Image string `json:"image"` + Command string `json:"command"` + Created time.Time `json:"created"` + StartedAt time.Time `json:"startedAt"` + FinishedAt time.Time `json:"finishedAt"` + State string `json:"state"` + Health string `json:"health,omitempty"` + Host string `json:"host,omitempty"` + Tty bool `json:"-"` + Labels map[string]string `json:"labels,omitempty"` + Stats *utils.RingBuffer[ContainerStat] `json:"stats,omitempty"` + Group string `json:"group,omitempty"` + FullyLoaded bool `json:"-,omitempty"` } // ContainerStat represent stats instant for a container diff --git a/internal/docker/client.go b/internal/docker/client.go index 0bbb5714..8c346695 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -337,16 +337,17 @@ func newContainerFromJSON(c types.ContainerJSON, host string) container.Containe } container := container.Container{ - ID: c.ID[:12], - Name: name, - Image: c.Config.Image, - Command: strings.Join(c.Config.Entrypoint, " ") + " " + strings.Join(c.Config.Cmd, " "), - State: c.State.Status, - Host: host, - Labels: c.Config.Labels, - Stats: utils.NewRingBuffer[container.ContainerStat](300), // 300 seconds of stats - Group: group, - Tty: c.Config.Tty, + ID: c.ID[:12], + Name: name, + Image: c.Config.Image, + Command: strings.Join(c.Config.Entrypoint, " ") + " " + strings.Join(c.Config.Cmd, " "), + State: c.State.Status, + Host: host, + Labels: c.Config.Labels, + Stats: utils.NewRingBuffer[container.ContainerStat](300), // 300 seconds of stats + Group: group, + Tty: c.Config.Tty, + FullyLoaded: true, } if createdAt, err := time.Parse(time.RFC3339Nano, c.Created); err == nil { diff --git a/internal/k8s/client.go b/internal/k8s/client.go index c47e5ba5..e37fa51b 100644 --- a/internal/k8s/client.go +++ b/internal/k8s/client.go @@ -85,16 +85,17 @@ func podToContainers(pod *corev1.Pod) []container.Container { var containers []container.Container for _, c := range pod.Spec.Containers { containers = append(containers, container.Container{ - ID: pod.Name + ":" + c.Name, - Name: pod.Name + "/" + c.Name, - Image: c.Image, - Created: pod.CreationTimestamp.Time, - State: phaseToState(pod.Status.Phase), - StartedAt: started, - Command: strings.Join(c.Command, " "), - Host: pod.Spec.NodeName, - Tty: c.TTY, - Stats: utils.NewRingBuffer[container.ContainerStat](300), + ID: pod.Namespace + ":" + pod.Name + ":" + c.Name, + Name: pod.Name + "/" + c.Name, + Image: c.Image, + Created: pod.CreationTimestamp.Time, + State: phaseToState(pod.Status.Phase), + StartedAt: started, + Command: strings.Join(c.Command, " "), + Host: pod.Spec.NodeName, + Tty: c.TTY, + Stats: utils.NewRingBuffer[container.ContainerStat](300), + FullyLoaded: true, }) } return containers @@ -108,16 +109,7 @@ func (k *K8sClient) ListContainers(ctx context.Context, labels container.Contain var containers []container.Container for _, pod := range pods.Items { - for _, c := range pod.Spec.Containers { - containers = append(containers, container.Container{ - ID: pod.Name + ":" + c.Name, - Name: pod.Name + "/" + c.Name, - Image: c.Image, - Created: pod.CreationTimestamp.Time, - State: phaseToState(pod.Status.Phase), - Host: pod.Spec.NodeName, - }) - } + containers = append(containers, podToContainers(&pod)...) } return containers, nil } @@ -141,9 +133,9 @@ func phaseToState(phase corev1.PodPhase) string { func (k *K8sClient) FindContainer(ctx context.Context, id string) (container.Container, error) { log.Debug().Str("id", id).Msg("Finding container") - podName, containerName := parsePodContainerID(id) + namespace, podName, containerName := parsePodContainerID(id) - pod, err := k.Clientset.CoreV1().Pods(k.namespace).Get(ctx, podName, metav1.GetOptions{}) + pod, err := k.Clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) if err != nil { return container.Container{}, err } @@ -158,7 +150,7 @@ func (k *K8sClient) FindContainer(ctx context.Context, id string) (container.Con } func (k *K8sClient) ContainerLogs(ctx context.Context, id string, since time.Time, stdType container.StdType) (io.ReadCloser, error) { - podName, containerName := parsePodContainerID(id) + namespace, podName, containerName := parsePodContainerID(id) opts := &corev1.PodLogOptions{ Container: containerName, Follow: true, @@ -167,11 +159,11 @@ func (k *K8sClient) ContainerLogs(ctx context.Context, id string, since time.Tim SinceTime: &metav1.Time{Time: since}, } - return k.Clientset.CoreV1().Pods(k.namespace).GetLogs(podName, opts).Stream(ctx) + return k.Clientset.CoreV1().Pods(namespace).GetLogs(podName, opts).Stream(ctx) } func (k *K8sClient) ContainerLogsBetweenDates(ctx context.Context, id string, start time.Time, end time.Time, stdType container.StdType) (io.ReadCloser, error) { - podName, containerName := parsePodContainerID(id) + namespace, podName, containerName := parsePodContainerID(id) opts := &corev1.PodLogOptions{ Container: containerName, Follow: false, @@ -179,7 +171,7 @@ func (k *K8sClient) ContainerLogsBetweenDates(ctx context.Context, id string, st SinceTime: &metav1.Time{Time: start}, } - return k.Clientset.CoreV1().Pods(k.namespace).GetLogs(podName, opts).Stream(ctx) + return k.Clientset.CoreV1().Pods(namespace).GetLogs(podName, opts).Stream(ctx) } func (k *K8sClient) ContainerEvents(ctx context.Context, ch chan<- container.ContainerEvent) error { @@ -224,7 +216,7 @@ func (k *K8sClient) ContainerStats(ctx context.Context, id string, stats chan<- } func (k *K8sClient) Ping(ctx context.Context) error { - _, err := k.Clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{Limit: 1}) + _, err := k.Clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{Limit: 1}) return err } @@ -238,7 +230,7 @@ func (k *K8sClient) ContainerActions(ctx context.Context, action container.Conta } // Helper function to parse pod and container names from container ID -func parsePodContainerID(id string) (string, string) { +func parsePodContainerID(id string) (string, string, string) { parts := strings.Split(id, ":") - return parts[0], parts[1] + return parts[0], parts[1], parts[2] } diff --git a/internal/k8s/stats_collector.go b/internal/k8s/stats_collector.go index 2e9147f7..d19bab45 100644 --- a/internal/k8s/stats_collector.go +++ b/internal/k8s/stats_collector.go @@ -102,7 +102,7 @@ func (sc *K8sStatsCollector) Start(parentCtx context.Context) bool { for _, pod := range metricList.Items { for _, c := range pod.Containers { stat := container.ContainerStat{ - ID: pod.Name + ":" + c.Name, + ID: pod.Namespace + ":" + pod.Name + ":" + c.Name, CPUPercent: float64(c.Usage.Cpu().MilliValue()) / 1000 * 100, MemoryUsage: c.Usage.Memory().AsApproximateFloat64(), } diff --git a/internal/support/cli/args.go b/internal/support/cli/args.go index 1cfecb58..5b793a36 100644 --- a/internal/support/cli/args.go +++ b/internal/support/cli/args.go @@ -29,6 +29,7 @@ type Args struct { Mode string `arg:"env:DOZZLE_MODE" default:"server" help:"sets the mode to run in (server, swarm)"` TimeoutString string `arg:"--timeout,env:DOZZLE_TIMEOUT" default:"10s" help:"sets the timeout for docker client"` Timeout time.Duration `arg:"-"` + Namespace string `arg:"env:DOZZLE_NAMESPACE" default:"" help:"sets the namespace to use in k8s"` Healthcheck *HealthcheckCmd `arg:"subcommand:healthcheck" help:"checks if the server is running"` Generate *GenerateCmd `arg:"subcommand:generate" help:"generates a configuration file for simple auth"` Agent *AgentCmd `arg:"subcommand:agent" help:"starts the agent"` diff --git a/main.go b/main.go index d6d0c033..c05e64b0 100644 --- a/main.go +++ b/main.go @@ -203,7 +203,7 @@ func main() { } }() } else if args.Mode == "k8s" { - localClient, err := k8s.NewK8sClient("default") + localClient, err := k8s.NewK8sClient(args.Namespace) if err != nil { log.Fatal().Err(err).Msg("Could not create k8s client") }