1
0
mirror of https://github.com/amir20/dozzle.git synced 2025-12-21 21:33:18 +01:00
Files
dozzle/internal/k8s/client.go
2025-08-15 15:29:25 +00:00

390 lines
9.7 KiB
Go

package k8s
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"
"os"
"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/utils"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
lop "github.com/samber/lo/parallel"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
)
// K8sClient is a container client backed by the Kubernetes API. Containers
// are addressed by a composite ID of the form "namespace:pod:container"
// (see parsePodContainerID).
type K8sClient struct {
	Clientset *kubernetes.Clientset // typed API client used for all requests
	namespace []string              // namespaces to operate on; holds metav1.NamespaceAll when unscoped
	config    *rest.Config          // kept so attach/exec can build SPDY executors
	host      container.Host        // identity of the first cluster node, reported via Host()
}
// NewK8sClient builds a Kubernetes-backed client.
//
// It auto-detects the environment: when KUBERNETES_SERVICE_HOST is set it
// uses the in-cluster service-account config; otherwise it falls back to a
// kubeconfig file ($KUBECONFIG, or $HOME/.kube/config). An empty namespace
// list means "all namespaces".
//
// The first node returned by the API provides the host identity reported by
// Host(); an error is returned when the cluster reports no nodes. All errors
// are wrapped with the step that produced them.
func NewK8sClient(namespace []string) (*K8sClient, error) {
	var config *rest.Config
	var err error

	if len(namespace) == 0 {
		namespace = []string{metav1.NamespaceAll}
	}

	// KUBERNETES_SERVICE_HOST is injected into every pod, so its presence
	// means we are running inside a cluster.
	if os.Getenv("KUBERNETES_SERVICE_HOST") != "" {
		config, err = rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("loading in-cluster config: %w", err)
		}
		log.Info().Msg("Running in-cluster mode")
	} else {
		kubeconfig := os.Getenv("KUBECONFIG")
		if kubeconfig == "" {
			kubeconfig = os.Getenv("HOME") + "/.kube/config"
		}
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, fmt.Errorf("loading kubeconfig %q: %w", kubeconfig, err)
		}
		log.Info().Msgf("Running in local mode with kubeconfig: %s", kubeconfig)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("creating clientset: %w", err)
	}

	nodes, err := clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("listing nodes: %w", err)
	}
	if len(nodes.Items) == 0 {
		return nil, fmt.Errorf("nodes not found")
	}

	// Use the first node as the host identity for this client.
	node := nodes.Items[0]
	return &K8sClient{
		Clientset: clientset,
		namespace: namespace,
		config:    config,
		host: container.Host{
			ID:   node.Status.NodeInfo.MachineID,
			Name: node.Name,
		},
	}, nil
}
// podToContainers flattens a pod into one container.Container per entry in
// the pod spec. IDs take the composite "namespace:pod:container" form, and
// every result carries a fresh 300-slot stats ring buffer.
func podToContainers(pod *corev1.Pod) []container.Container {
	// StartTime is nil until the pod has actually been started.
	var startedAt time.Time
	if t := pod.Status.StartTime; t != nil {
		startedAt = t.Time
	}

	var result []container.Container
	for _, spec := range pod.Spec.Containers {
		result = append(result, container.Container{
			ID:      pod.Namespace + ":" + pod.Name + ":" + spec.Name,
			Name:    pod.Name + "/" + spec.Name,
			Image:   spec.Image,
			Created: pod.CreationTimestamp.Time,
			// The pod-level phase stands in for per-container state;
			// individual container statuses are not inspected here.
			State:       phaseToState(pod.Status.Phase),
			StartedAt:   startedAt,
			Command:     strings.Join(spec.Command, " "),
			Host:        pod.Spec.NodeName,
			Tty:         spec.TTY,
			Stats:       utils.NewRingBuffer[container.ContainerStat](300),
			FullyLoaded: true,
		})
	}
	return result
}
// ListContainers lists containers across all configured namespaces in
// parallel, optionally filtered by a Kubernetes label selector built from
// labels ("key=value" terms joined by commas).
//
// Per-namespace failures are logged and skipped; an error is returned only
// when every namespace fails, in which case the last error is surfaced.
func (k *K8sClient) ListContainers(ctx context.Context, labels container.ContainerLabels) ([]container.Container, error) {
	selector := ""
	if labels.Exists() {
		// Collect "key=value" terms and join them once, instead of the
		// hand-rolled comma bookkeeping inside the loop.
		var terms []string
		for key, values := range labels {
			for _, value := range values {
				terms = append(terms, fmt.Sprintf("%s=%s", key, value))
			}
		}
		selector = strings.Join(terms, ",")
		log.Debug().Str("selector", selector).Msg("Listing containers with labels")
	}

	// Query every namespace in parallel, pairing each result with its error.
	containerList := lop.Map(k.namespace, func(namespace string, index int) lo.Tuple2[[]container.Container, error] {
		pods, err := k.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
		if err != nil {
			return lo.T2[[]container.Container, error](nil, fmt.Errorf("failed to list pods in namespace %s: %w", namespace, err))
		}
		var containers []container.Container
		for _, pod := range pods.Items {
			containers = append(containers, podToContainers(&pod)...)
		}
		return lo.T2[[]container.Container, error](containers, nil)
	})

	// Merge results, tolerating partial failure as long as one namespace worked.
	var containers []container.Container
	var lastError error
	success := false
	for _, t2 := range containerList {
		items, err := t2.Unpack()
		if err != nil {
			log.Error().Err(err).Msg("failed to fetch containers")
			lastError = err
			continue
		}
		success = true
		containers = append(containers, items...)
	}
	if !success {
		return nil, lastError
	}
	return containers, nil
}
// phaseToState maps a Kubernetes pod phase onto the Docker-style state
// strings used by the rest of the codebase.
func phaseToState(phase corev1.PodPhase) string {
	switch phase {
	case corev1.PodPending:
		return "created"
	case corev1.PodRunning:
		return "running"
	case corev1.PodSucceeded, corev1.PodFailed:
		// Both terminal phases map to the same Docker-style state.
		return "exited"
	default:
		// Covers corev1.PodUnknown and any future phases.
		return "unknown"
	}
}
// FindContainer resolves a composite "namespace:pod:container" ID to a
// container.Container by fetching the pod and matching the requested entry.
func (k *K8sClient) FindContainer(ctx context.Context, id string) (container.Container, error) {
	log.Debug().Str("id", id).Msg("Finding container")
	namespace, podName, containerName := parsePodContainerID(id)

	pod, err := k.Clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return container.Container{}, err
	}

	// The composite ID of each flattened container is compared against the
	// requested one; podToContainers builds IDs in the same format.
	for _, candidate := range podToContainers(pod) {
		if candidate.ID == id {
			return candidate, nil
		}
	}
	return container.Container{}, fmt.Errorf("container %s not found in pod %s", containerName, podName)
}
// ContainerLogs opens a following log stream for the container identified by
// id, tailing at most the last 500 lines since the given time. Timestamps
// are always requested. stdType is accepted for interface compatibility;
// the Kubernetes log API does not split stdout from stderr here.
func (k *K8sClient) ContainerLogs(ctx context.Context, id string, since time.Time, stdType container.StdType) (io.ReadCloser, error) {
	namespace, podName, containerName := parsePodContainerID(id)

	tail := int64(500)
	sinceTime := metav1.Time{Time: since}
	opts := &corev1.PodLogOptions{
		Container:  containerName,
		Follow:     true,
		Previous:   false,
		Timestamps: true,
		SinceTime:  &sinceTime,
		TailLines:  &tail,
	}
	return k.Clientset.CoreV1().Pods(namespace).GetLogs(podName, opts).Stream(ctx)
}
// ContainerLogsBetweenDates returns a non-following log stream for the given
// container starting at start, with timestamps included.
//
// NOTE(review): the end parameter is not forwarded to the API —
// corev1.PodLogOptions has no upper time bound — so the stream runs to the
// current end of the log; presumably the caller truncates at end using the
// embedded timestamps. Confirm upstream. stdType is likewise unused here.
func (k *K8sClient) ContainerLogsBetweenDates(ctx context.Context, id string, start time.Time, end time.Time, stdType container.StdType) (io.ReadCloser, error) {
	namespace, podName, containerName := parsePodContainerID(id)
	opts := &corev1.PodLogOptions{
		Container:  containerName,
		Follow:     false,
		Timestamps: true,
		SinceTime:  &metav1.Time{Time: start},
	}
	return k.Clientset.CoreV1().Pods(namespace).GetLogs(podName, opts).Stream(ctx)
}
// ContainerEvents watches pods in every configured namespace and translates
// Kubernetes watch events into container lifecycle events on ch. It blocks
// until all watch result channels are closed (e.g. on context cancellation).
//
// Namespaces whose watch cannot be established are logged and skipped; an
// error is returned only when no watch succeeded at all. Previously failed
// watches were kept as nil entries in the slice and dereferenced via
// ResultChan(), which panicked — and the "no namespaces" check could never
// fire because lo.Map preserves slice length.
func (k *K8sClient) ContainerEvents(ctx context.Context, ch chan<- container.ContainerEvent) error {
	var watchers []watch.Interface
	for _, namespace := range k.namespace {
		watcher, err := k.Clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{})
		if err != nil {
			log.Error().Err(err).Msg("Failed to watch pods")
			continue
		}
		watchers = append(watchers, watcher)
	}
	if len(watchers) == 0 {
		return errors.New("no namespaces to watch")
	}

	wg := sync.WaitGroup{}
	for _, watcher := range watchers {
		wg.Go(func() {
			for event := range watcher.ResultChan() {
				log.Debug().Interface("event.type", event.Type).Msg("Received kubernetes event")
				pod, ok := event.Object.(*corev1.Pod)
				if !ok {
					continue
				}
				// Map watch event types to the container event vocabulary;
				// other types (e.g. BOOKMARK) fall through with an empty name.
				name := ""
				switch event.Type {
				case "ADDED":
					name = "create"
				case "DELETED":
					name = "destroy"
				case "MODIFIED":
					name = "update"
				}
				// Fan the pod out into one event per container in its spec.
				for _, c := range podToContainers(pod) {
					ch <- container.ContainerEvent{
						Name:      name,
						ActorID:   c.ID,
						Host:      pod.Spec.NodeName,
						Time:      time.Now(),
						Container: &c,
					}
				}
			}
		})
	}
	wg.Wait()
	return nil
}
// ContainerStats is not supported by the Kubernetes backend; calling it
// panics. Stats collection would need the metrics API, which is not wired
// up here.
func (k *K8sClient) ContainerStats(ctx context.Context, id string, stats chan<- container.ContainerStat) error {
	panic("not implemented")
}
// Ping verifies API connectivity with a minimal (Limit: 1) pod list against
// the first configured namespace. The namespace slice is guaranteed
// non-empty by NewK8sClient. The previous hard-coded "default" namespace
// failed under RBAC rules that only grant access to the configured
// namespaces.
func (k *K8sClient) Ping(ctx context.Context) error {
	_, err := k.Clientset.CoreV1().Pods(k.namespace[0]).List(ctx, metav1.ListOptions{Limit: 1})
	return err
}
// Host returns the host identity captured at construction time (the first
// node reported by the cluster in NewK8sClient).
func (k *K8sClient) Host() container.Host {
	return k.host
}
// ContainerActions (start/stop/restart-style operations) is not supported by
// the Kubernetes backend; calling it panics.
func (k *K8sClient) ContainerActions(ctx context.Context, action container.ContainerAction, containerID string) error {
	panic("not implemented")
}
// ContainerAttach attaches to the running process of the given container via
// the pod "attach" subresource over SPDY, returning a writer for stdin and a
// reader for the TTY output (stderr is merged into stdout under TTY).
//
// The stream runs in a background goroutine; when it terminates the stdout
// pipe is now closed so readers observe EOF instead of blocking forever
// (previously the pipe was left open after the stream exited). The log
// message also said "Executing" — a copy-paste from ContainerExec — and now
// reflects the attach operation.
func (k *K8sClient) ContainerAttach(ctx context.Context, id string) (io.WriteCloser, io.Reader, error) {
	namespace, podName, containerName := parsePodContainerID(id)
	log.Debug().Str("container", containerName).Str("pod", podName).Msg("Attaching to container in pod")

	req := k.Clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("attach")
	option := &corev1.PodAttachOptions{
		Container: containerName,
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
		TTY:       true,
	}
	req.VersionedParams(
		option,
		scheme.ParameterCodec,
	)

	exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
	if err != nil {
		return nil, nil, err
	}

	stdinReader, stdinWriter := io.Pipe()
	stdoutReader, stdoutWriter := io.Pipe()
	go func() {
		// Close the output pipe on exit so consumers get EOF.
		defer stdoutWriter.Close()
		err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{
			Stdin:  stdinReader,
			Stdout: stdoutWriter,
			Tty:    true,
		})
		if err != nil {
			log.Error().Err(err).Msg("Error streaming command")
		}
	}()
	return stdinWriter, stdoutReader, nil
}
// ContainerExec runs cmd inside the given container via the pod "exec"
// subresource over SPDY, returning a writer for stdin and a reader for the
// TTY output (stderr is merged into stdout under TTY).
//
// The stream runs in a background goroutine; when it terminates the stdout
// pipe is now closed so readers observe EOF instead of blocking forever
// (previously the pipe was left open after the stream exited).
func (k *K8sClient) ContainerExec(ctx context.Context, id string, cmd []string) (io.WriteCloser, io.Reader, error) {
	namespace, podName, containerName := parsePodContainerID(id)
	log.Debug().Str("container", containerName).Str("pod", podName).Msg("Executing command in pod")

	req := k.Clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec")
	option := &corev1.PodExecOptions{
		Command:   cmd,
		Container: containerName,
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
		TTY:       true,
	}
	req.VersionedParams(
		option,
		scheme.ParameterCodec,
	)

	exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
	if err != nil {
		return nil, nil, err
	}

	stdinReader, stdinWriter := io.Pipe()
	stdoutReader, stdoutWriter := io.Pipe()
	go func() {
		// Close the output pipe on exit so consumers get EOF.
		defer stdoutWriter.Close()
		err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{
			Stdin:  stdinReader,
			Stdout: stdoutWriter,
			Tty:    true,
		})
		if err != nil {
			log.Error().Err(err).Msg("Error streaming command")
		}
	}()
	return stdinWriter, stdoutReader, nil
}
// parsePodContainerID splits a composite container ID of the form
// "namespace:pod:container" into its three parts. Malformed IDs with fewer
// than two colons yield empty strings for the missing parts instead of the
// previous index-out-of-range panic; extra parts beyond the third are
// ignored, matching the original behavior.
func parsePodContainerID(id string) (string, string, string) {
	parts := strings.Split(id, ":")
	for len(parts) < 3 {
		parts = append(parts, "")
	}
	return parts[0], parts[1], parts[2]
}