diff --git a/docs/guide/k8s.md b/docs/guide/k8s.md index 22f8a99f..e85353cd 100644 --- a/docs/guide/k8s.md +++ b/docs/guide/k8s.md @@ -139,6 +139,9 @@ spec: value: "default" ``` +> [!NOTE] +> Dozzle supports multiple namespaces, you can set the `DOZZLE_NAMESPACES` environment variable to a comma separated list of namespaces. When multiple namespaces are specified, Dozzle will monitor each namespace separately and combine the results. + ### Labels and Filters `DOZZLE_FILTER` behave similarlty to Docker filters. You can limit the scope of Dozzle using the `DOZZLE_FILTER` environment variable. For example, to scope only to `env=prod`: diff --git a/internal/k8s/client.go b/internal/k8s/client.go index 50cbee79..2b0b4d5d 100644 --- a/internal/k8s/client.go +++ b/internal/k8s/client.go @@ -2,9 +2,11 @@ package k8s import ( "context" + "errors" "fmt" "io" "strings" + "sync" "time" "os" @@ -13,9 +15,13 @@ import ( "github.com/amir20/dozzle/internal/utils" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" "github.com/rs/zerolog/log" + "github.com/samber/lo" + lop "github.com/samber/lo/parallel" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -25,15 +31,19 @@ import ( type K8sClient struct { Clientset *kubernetes.Clientset - namespace string + namespace []string config *rest.Config host container.Host } -func NewK8sClient(namespace string) (*K8sClient, error) { +func NewK8sClient(namespace []string) (*K8sClient, error) { var config *rest.Config var err error + if len(namespace) == 0 { + namespace = []string{metav1.NamespaceAll} + } + // Check if we're running in cluster if os.Getenv("KUBERNETES_SERVICE_HOST") != "" { config, err = rest.InClusterConfig() @@ -116,15 +126,36 @@ func (k *K8sClient) ListContainers(ctx context.Context, labels container.Contain } log.Debug().Str("selector", selector).Msg("Listing containers with labels") } - pods, err := k.Clientset.CoreV1().Pods(k.namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) - if err != nil { - return nil, err - } + containerList := lop.Map(k.namespace, func(namespace string, index int) lo.Tuple2[[]container.Container, error] { + pods, err := k.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) + if err != nil { + return lo.T2[[]container.Container, error](nil, fmt.Errorf("failed to list pods in namespace %s: %w", namespace, err)) + } + var containers []container.Container + for _, pod := range pods.Items { + containers = append(containers, podToContainers(&pod)...) + } + return lo.T2[[]container.Container, error](containers, nil) + }) var containers []container.Container - for _, pod := range pods.Items { - containers = append(containers, podToContainers(&pod)...) + var lastError error + success := false + for _, t2 := range containerList { + items, err := t2.Unpack() + if err != nil { + log.Error().Err(err).Msg("failed to fetch containers") + lastError = err + continue + } + success = true + containers = append(containers, items...) } + + if !success { + return nil, lastError + } + return containers, nil } @@ -193,39 +224,57 @@ func (k *K8sClient) ContainerLogsBetweenDates(ctx context.Context, id string, st } func (k *K8sClient) ContainerEvents(ctx context.Context, ch chan<- container.ContainerEvent) error { - watch, err := k.Clientset.CoreV1().Pods(k.namespace).Watch(ctx, metav1.ListOptions{}) - if err != nil { - return err + watchers := lo.Map(k.namespace, func(namespace string, index int) watch.Interface { + watcher, err := k.Clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{}) + if err != nil { + log.Error().Err(err).Msg("Failed to watch pods") + return nil + } + return watcher + }) + + if len(watchers) == 0 { + return errors.New("no namespaces to watch") } - for event := range watch.ResultChan() { - log.Debug().Interface("event.type", event.Type).Msg("Received kubernetes event") - pod, ok := event.Object.(*corev1.Pod) - if !ok { - continue - } + wg := sync.WaitGroup{} + wg.Add(len(watchers)) - name := "" - switch event.Type { - case "ADDED": - name = "create" - case "DELETED": - name = "destroy" - case "MODIFIED": - name = "update" - } + for _, watcher := range watchers { + go func(w watch.Interface) { + defer wg.Done() + for event := range w.ResultChan() { + log.Debug().Interface("event.type", event.Type).Msg("Received kubernetes event") + pod, ok := event.Object.(*corev1.Pod) + if !ok { + continue + } - for _, c := range podToContainers(pod) { - ch <- container.ContainerEvent{ - Name: name, - ActorID: c.ID, - Host: pod.Spec.NodeName, - Time: time.Now(), - Container: &c, + name := "" + switch event.Type { + case "ADDED": + name = "create" + case "DELETED": + name = "destroy" + case "MODIFIED": + name = "update" + } + + for _, c := range podToContainers(pod) { + ch <- container.ContainerEvent{ + Name: name, + ActorID: c.ID, + Host: pod.Spec.NodeName, + Time: time.Now(), + Container: &c, + } + } } - } + }(watcher) } + wg.Wait() + return nil } diff --git a/internal/k8s/stats_collector.go b/internal/k8s/stats_collector.go index 902d07e6..43c4fc45 100644 --- a/internal/k8s/stats_collector.go +++ b/internal/k8s/stats_collector.go @@ -7,6 +7,8 @@ import ( "time" "github.com/amir20/dozzle/internal/container" + lop "github.com/samber/lo/parallel" + "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -95,28 +97,30 @@ func (sc *K8sStatsCollector) Start(parentCtx context.Context) bool { for { select { case <-ticker.C: - metricList, err := sc.metrics.MetricsV1beta1().PodMetricses(sc.client.namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - panic(err.Error()) - } - for _, pod := range metricList.Items { - for _, c := range pod.Containers { - stat := container.ContainerStat{ - ID: pod.Namespace + ":" + pod.Name + ":" + c.Name, - CPUPercent: float64(c.Usage.Cpu().MilliValue()) / 1000 * 100, - MemoryUsage: c.Usage.Memory().AsApproximateFloat64(), - } - log.Trace().Interface("stat", stat).Msg("k8s stats") - sc.subscribers.Range(func(c context.Context, stats chan<- container.ContainerStat) bool { - select { - case stats <- stat: - case <-c.Done(): - sc.subscribers.Delete(c) - } - return true - }) + lop.ForEach(sc.client.namespace, func(item string, index int) { + metricList, err := sc.metrics.MetricsV1beta1().PodMetricses(item).List(ctx, metav1.ListOptions{}) + if err != nil { + log.Panic().Err(err).Msg("failed to get pod metrics") } - } + for _, pod := range metricList.Items { + for _, c := range pod.Containers { + stat := container.ContainerStat{ + ID: pod.Namespace + ":" + pod.Name + ":" + c.Name, + CPUPercent: float64(c.Usage.Cpu().MilliValue()) / 1000 * 100, + MemoryUsage: c.Usage.Memory().AsApproximateFloat64(), + } + log.Trace().Interface("stat", stat).Msg("k8s stats") + sc.subscribers.Range(func(c context.Context, stats chan<- container.ContainerStat) bool { + select { + case stats <- stat: + case <-c.Done(): + sc.subscribers.Delete(c) + } + return true + }) + } + } + }) case <-ctx.Done(): return true } diff --git a/internal/support/cli/args.go b/internal/support/cli/args.go index d1bc5510..be01aad6 100644 --- a/internal/support/cli/args.go +++ b/internal/support/cli/args.go @@ -31,7 +31,7 @@ type Args struct { Mode string `arg:"env:DOZZLE_MODE" default:"server" help:"sets the mode to run in (server, swarm)"` TimeoutString string `arg:"--timeout,env:DOZZLE_TIMEOUT" default:"10s" help:"sets the timeout for docker client"` Timeout time.Duration `arg:"-"` - Namespace string `arg:"env:DOZZLE_NAMESPACE" default:"" help:"sets the namespace to use in k8s"` + Namespace []string `arg:"env:DOZZLE_NAMESPACE" help:"sets the namespace to use in k8s"` Healthcheck *HealthcheckCmd `arg:"subcommand:healthcheck" help:"checks if the server is running"` Generate *GenerateCmd `arg:"subcommand:generate" help:"generates a configuration file for simple auth"` Agent *AgentCmd `arg:"subcommand:agent" help:"starts the agent"` @@ -72,6 +72,10 @@ func ParseArgs() (Args, interface{}) { args.RemoteHost[i] = strings.TrimSpace(value) } + for i, value := range args.Namespace { + args.Namespace[i] = strings.TrimSpace(value) + } + if args.TimeoutString != "" { timeout, err := time.ParseDuration(args.TimeoutString) if err != nil {