Mirror of https://github.com/amir20/dozzle.git (synced 2026-01-04 12:05:07 +01:00)
feat: updates agents to be more resilient by reconnecting; also fixes big performance issues in swarm mode, with small updates to the UI. (#3145)
@@ -3,13 +3,12 @@ package cli
import (
    "embed"

    "github.com/amir20/dozzle/internal/agent"
    "github.com/amir20/dozzle/internal/docker"
    docker_support "github.com/amir20/dozzle/internal/support/docker"
    log "github.com/sirupsen/logrus"
)

func CreateMultiHostService(embededCerts embed.FS, args Args) *docker_support.MultiHostService {
func CreateMultiHostService(embeddedCerts embed.FS, args Args) *docker_support.MultiHostService {
    var clients []docker_support.ClientService
    if len(args.RemoteHost) > 0 {
        log.Warnf(`Remote host flag is deprecated and will be removed in future versions. Agents will replace remote hosts as a safer and performant option. See https://github.com/amir20/dozzle/issues/3066 for discussion.`)
@@ -33,18 +32,6 @@ func CreateMultiHostService(embededCerts embed.FS, args Args) *docker_support.Mu
            log.Warnf("Could not create client for %s: %s", host.ID, err)
        }
    }
    certs, err := ReadCertificates(embededCerts)
    if err != nil {
        log.Fatalf("Could not read certificates: %v", err)
    }
    for _, remoteAgent := range args.RemoteAgent {
        client, err := agent.NewClient(remoteAgent, certs)
        if err != nil {
            log.Warnf("Could not connect to remote agent %s: %s", remoteAgent, err)
            continue
        }
        clients = append(clients, docker_support.NewAgentService(client))
    }

    localClient, err := docker.NewLocalClient(args.Filter, args.Hostname)
    if err == nil {
@@ -59,5 +46,11 @@ func CreateMultiHostService(embededCerts embed.FS, args Args) *docker_support.Mu
        }
    }

    return docker_support.NewMultiHostService(clients)
    certs, err := ReadCertificates(embeddedCerts)
    if err != nil {
        log.Fatalf("Could not read certificates: %v", err)
    }

    clientManager := docker_support.NewRetriableClientManager(args.RemoteAgent, certs, clients...)
    return docker_support.NewMultiHostService(clientManager)
}
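
Taken together, the cli.go change means agents that are unreachable at startup are no longer dropped: their endpoints travel into the manager, which retries them on every listing. Below is a minimal, self-contained sketch of that collect-failures-and-retry pattern, with generic names (manager, retryAndList) that are not Dozzle's actual API:

package main

import (
    "fmt"
    "net"
    "time"
)

// manager keeps working connections plus the endpoints that failed,
// so callers can retry them on the next listing instead of losing them.
type manager struct {
    conns  map[string]net.Conn
    failed []string
}

func newManager(endpoints []string) *manager {
    m := &manager{conns: map[string]net.Conn{}}
    for _, ep := range endpoints {
        if c, err := net.DialTimeout("tcp", ep, time.Second); err == nil {
            m.conns[ep] = c
        } else {
            m.failed = append(m.failed, ep) // remember the endpoint, don't drop it
        }
    }
    return m
}

// retryAndList re-dials previously failed endpoints before listing connections.
func (m *manager) retryAndList() ([]net.Conn, []error) {
    var errs []error
    still := m.failed[:0] // filter in place: keep only endpoints that fail again
    for _, ep := range m.failed {
        if c, err := net.DialTimeout("tcp", ep, time.Second); err == nil {
            m.conns[ep] = c
        } else {
            errs = append(errs, err)
            still = append(still, ep)
        }
    }
    m.failed = still

    conns := make([]net.Conn, 0, len(m.conns))
    for _, c := range m.conns {
        conns = append(conns, c)
    }
    return conns, errs
}

func main() {
    m := newManager([]string{"127.0.0.1:7007"})
    conns, errs := m.retryAndList()
    fmt.Println(len(conns), errs)
}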

@@ -2,15 +2,10 @@ package docker_support

import (
    "context"
    "crypto/tls"
    "fmt"
    "net"

    "github.com/amir20/dozzle/internal/agent"
    "github.com/amir20/dozzle/internal/docker"
    log "github.com/sirupsen/logrus"

    "github.com/cenkalti/backoff/v4"
)

type ContainerFilter = func(*docker.Container) bool
@@ -24,106 +19,31 @@ func (h *HostUnavailableError) Error() string {
    return fmt.Sprintf("host %s unavailable: %v", h.Host.ID, h.Err)
}

type ClientManager interface {
    Find(id string) (ClientService, bool)
    List() []ClientService
    RetryAndList() ([]ClientService, []error)
    Subscribe(ctx context.Context, channel chan<- docker.Host)
    Hosts() []docker.Host
}

type MultiHostService struct {
    clients map[string]ClientService
    manager ClientManager
    SwarmMode bool
}

func NewMultiHostService(clients []ClientService) *MultiHostService {
func NewMultiHostService(manager ClientManager) *MultiHostService {
    m := &MultiHostService{
        clients: make(map[string]ClientService),
        manager: manager,
    }

    for _, client := range clients {
        if _, ok := m.clients[client.Host().ID]; ok {
            log.Warnf("duplicate host %s found, skipping", client.Host())
            continue
        } else {
            log.Debugf("found a new host %s", client.Host())
        }
        m.clients[client.Host().ID] = client
    }
    log.Debugf("created multi host service manager %s", manager)

    return m
}

func NewSwarmService(localClient docker.Client, certificates tls.Certificate) *MultiHostService {
    m := &MultiHostService{
        clients: make(map[string]ClientService),
        SwarmMode: true,
    }

    localService := NewDockerClientService(localClient)
    m.clients[localClient.Host().ID] = localService

    discover := func() {
        ips, err := net.LookupIP("tasks.dozzle")
        if err != nil {
            log.Fatalf("error looking up swarm services: %v", err)
        }

        found := 0
        replaced := 0
        for _, ip := range ips {
            clientAgent, err := agent.NewClient(ip.String()+":7007", certificates)
            if err != nil {
                log.Warnf("error creating client for %s: %v", ip, err)
                continue
            }

            if clientAgent.Host().ID == localClient.Host().ID {
                closeAgent(clientAgent)
                continue
            }

            service := NewAgentService(clientAgent)
            if existing, ok := m.clients[service.Host().ID]; !ok {
                log.Debugf("adding swarm service %s", service.Host().ID)
                m.clients[service.Host().ID] = service
                found++
            } else if existing.Host().Endpoint != service.Host().Endpoint {
                log.Debugf("swarm service %s already exists with different endpoint %s and old value %s", service.Host().ID, service.Host().Endpoint, existing.Host().Endpoint)
                delete(m.clients, existing.Host().ID)
                m.clients[service.Host().ID] = service
                replaced++
                if existingAgent, ok := existing.(*agentService); ok {
                    closeAgent(existingAgent.client)
                }
            } else {
                closeAgent(clientAgent)
            }
        }

        if found > 0 {
            log.Infof("found %d new dozzle replicas", found)
        }
        if replaced > 0 {
            log.Infof("replaced %d dozzle replicas", replaced)
        }
    }

    go func() {
        ticker := backoff.NewTicker(backoff.NewExponentialBackOff(
            backoff.WithMaxElapsedTime(0)),
        )
        for range ticker.C {
            log.Tracef("discovering swarm services")
            discover()
        }
    }()

    return m
}

func closeAgent(agent *agent.Client) {
    log.Tracef("closing agent %s", agent.Host())
    if err := agent.Close(); err != nil {
        log.Warnf("error closing agent: %v", err)
    }
}

func (m *MultiHostService) FindContainer(host string, id string) (*containerService, error) {
    client, ok := m.clients[host]
    client, ok := m.manager.Find(host)
    if !ok {
        return nil, fmt.Errorf("host %s not found", host)
    }
@@ -140,7 +60,7 @@ func (m *MultiHostService) FindContainer(host string, id string) (*containerServ
}

func (m *MultiHostService) ListContainersForHost(host string) ([]docker.Container, error) {
    client, ok := m.clients[host]
    client, ok := m.manager.Find(host)
    if !ok {
        return nil, fmt.Errorf("host %s not found", host)
    }
@@ -150,13 +70,15 @@ func (m *MultiHostService) ListContainersForHost(host string) ([]docker.Containe

func (m *MultiHostService) ListAllContainers() ([]docker.Container, []error) {
    containers := make([]docker.Container, 0)
    var errors []error
    clients, errors := m.manager.RetryAndList()

    for _, client := range m.clients {
    for _, client := range clients {
        list, err := client.ListContainers()
        if err != nil {
            log.Debugf("error listing containers for host %s: %v", client.Host().ID, err)
            errors = append(errors, &HostUnavailableError{Host: client.Host(), Err: err})
            host := client.Host()
            host.Available = false
            errors = append(errors, &HostUnavailableError{Host: host, Err: err})
            continue
        }

@@ -178,7 +100,7 @@ func (m *MultiHostService) ListAllContainersFiltered(filter ContainerFilter) ([]
}

func (m *MultiHostService) SubscribeEventsAndStats(ctx context.Context, events chan<- docker.ContainerEvent, stats chan<- docker.ContainerStat) {
    for _, client := range m.clients {
    for _, client := range m.manager.List() {
        client.SubscribeEvents(ctx, events)
        client.SubscribeStats(ctx, stats)
    }
@@ -186,7 +108,7 @@ func (m *MultiHostService) SubscribeEventsAndStats(ctx context.Context, events c

func (m *MultiHostService) SubscribeContainersStarted(ctx context.Context, containers chan<- docker.Container, filter ContainerFilter) {
    newContainers := make(chan docker.Container)
    for _, client := range m.clients {
    for _, client := range m.manager.List() {
        client.SubscribeContainersStarted(ctx, newContainers)
    }
    go func() {
@@ -208,27 +130,22 @@ func (m *MultiHostService) SubscribeContainersStarted(ctx context.Context, conta
}

func (m *MultiHostService) TotalClients() int {
    return len(m.clients)
    return len(m.manager.List())
}

func (m *MultiHostService) Hosts() []docker.Host {
    hosts := make([]docker.Host, 0, len(m.clients))
    for _, client := range m.clients {
        hosts = append(hosts, client.Host())
    }

    return hosts
    return m.manager.Hosts()
}

func (m *MultiHostService) LocalHost() (docker.Host, error) {
    host := docker.Host{}

    for _, host := range m.Hosts() {
        if host.Endpoint == "local" {

        if host.Type == "local" {
            return host, nil
        }
    }

    return host, fmt.Errorf("local host not found")
    return docker.Host{}, fmt.Errorf("local host not found")
}

func (m *MultiHostService) SubscribeAvailableHosts(ctx context.Context, hosts chan<- docker.Host) {
    m.manager.Subscribe(ctx, hosts)
}

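MultiHostService now programs purely against the ClientManager interface above, so static agents and swarm discovery become interchangeable implementations. The following is an illustrative stub under the same interface shape; Host, ClientService, and staticManager are trimmed stand-ins for this sketch, not the real dozzle types:

package main

import (
    "context"
    "fmt"
)

// Trimmed-down stand-ins for the types in the diff (illustrative only).
type Host struct {
    ID        string
    Available bool
}

type ClientService interface {
    Host() Host
}

// ClientManager mirrors the interface introduced in the diff: lookup,
// listing with retry, host enumeration, and availability subscriptions.
type ClientManager interface {
    Find(id string) (ClientService, bool)
    List() []ClientService
    RetryAndList() ([]ClientService, []error)
    Subscribe(ctx context.Context, channel chan<- Host)
    Hosts() []Host
}

// staticManager is the simplest possible implementation: a fixed set of
// clients, nothing to retry, and no availability changes to publish.
type staticManager struct{ clients map[string]ClientService }

func (s *staticManager) Find(id string) (ClientService, bool) {
    c, ok := s.clients[id]
    return c, ok
}

func (s *staticManager) List() []ClientService {
    out := make([]ClientService, 0, len(s.clients))
    for _, c := range s.clients {
        out = append(out, c)
    }
    return out
}

func (s *staticManager) RetryAndList() ([]ClientService, []error) { return s.List(), nil }

func (s *staticManager) Subscribe(context.Context, chan<- Host) {}

func (s *staticManager) Hosts() []Host {
    hosts := make([]Host, 0, len(s.clients))
    for _, c := range s.clients {
        h := c.Host()
        h.Available = true
        hosts = append(hosts, h)
    }
    return hosts
}

func main() {
    var m ClientManager = &staticManager{clients: map[string]ClientService{}}
    fmt.Println(len(m.List()))
}
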
internal/support/docker/retriable_client_manager.go (new file, 149 lines)
@@ -0,0 +1,149 @@
package docker_support

import (
    "context"
    "crypto/tls"
    "fmt"
    "sync"

    "github.com/amir20/dozzle/internal/agent"
    "github.com/amir20/dozzle/internal/docker"
    "github.com/puzpuzpuz/xsync/v3"

    log "github.com/sirupsen/logrus"
)

type RetriableClientManager struct {
    clients      map[string]ClientService
    failedAgents []string
    certs        tls.Certificate
    mu           sync.RWMutex
    subscribers  *xsync.MapOf[context.Context, chan<- docker.Host]
}

func NewRetriableClientManager(agents []string, certs tls.Certificate, clients ...ClientService) *RetriableClientManager {
    log.Debugf("creating retriable client manager with %d clients and %d agents", len(clients), len(agents))

    clientMap := make(map[string]ClientService)
    for _, client := range clients {
        if _, ok := clientMap[client.Host().ID]; ok {
            log.Warnf("duplicate client found for host %s", client.Host().ID)
        } else {
            clientMap[client.Host().ID] = client
        }
    }

    failed := make([]string, 0)
    for _, endpoint := range agents {
        if agent, err := agent.NewClient(endpoint, certs); err == nil {
            if _, ok := clientMap[agent.Host().ID]; ok {
                log.Warnf("duplicate client found for host %s", agent.Host().ID)
            } else {
                clientMap[agent.Host().ID] = NewAgentService(agent)
            }
        } else {
            log.Warnf("error creating agent client for %s: %v", endpoint, err)
            failed = append(failed, endpoint)
        }
    }

    return &RetriableClientManager{
        clients:      clientMap,
        failedAgents: failed,
        certs:        certs,
        subscribers:  xsync.NewMapOf[context.Context, chan<- docker.Host](),
    }
}

func (m *RetriableClientManager) Subscribe(ctx context.Context, channel chan<- docker.Host) {
    m.subscribers.Store(ctx, channel)

    go func() {
        <-ctx.Done()
        m.subscribers.Delete(ctx)
    }()
}

func (m *RetriableClientManager) RetryAndList() ([]ClientService, []error) {
    m.mu.Lock()
    errors := make([]error, 0)
    if len(m.failedAgents) > 0 {
        newFailed := make([]string, 0)
        for _, endpoint := range m.failedAgents {
            if agent, err := agent.NewClient(endpoint, m.certs); err == nil {
                m.clients[agent.Host().ID] = NewAgentService(agent)

                m.subscribers.Range(func(ctx context.Context, channel chan<- docker.Host) bool {
                    host := agent.Host()
                    host.Available = true

                    // We don't want to block the subscribers in event.go
                    go func() {
                        select {
                        case channel <- host:
                        case <-ctx.Done():
                        }
                    }()

                    return true
                })
            } else {
                log.Warnf("error creating agent client for %s: %v", endpoint, err)
                errors = append(errors, err)
                newFailed = append(newFailed, endpoint)
            }
        }
        m.failedAgents = newFailed
    }

    m.mu.Unlock()

    return m.List(), errors
}

func (m *RetriableClientManager) List() []ClientService {
    m.mu.RLock()
    defer m.mu.RUnlock()

    clients := make([]ClientService, 0, len(m.clients))
    for _, client := range m.clients {
        clients = append(clients, client)
    }
    return clients
}

func (m *RetriableClientManager) Find(id string) (ClientService, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()

    client, ok := m.clients[id]
    return client, ok
}

func (m *RetriableClientManager) String() string {
    return fmt.Sprintf("RetriableClientManager{clients: %d, failedAgents: %d}", len(m.clients), len(m.failedAgents))
}

func (m *RetriableClientManager) Hosts() []docker.Host {
    clients := m.List()

    hosts := make([]docker.Host, 0, len(clients))
    for _, client := range clients {
        host := client.Host()
        host.Available = true
        hosts = append(hosts, host)
    }

    for _, endpoint := range m.failedAgents {
        hosts = append(hosts, docker.Host{
            ID:        endpoint,
            Name:      endpoint,
            Endpoint:  endpoint,
            Available: false,
            Type:      "agent",
        })
    }

    return hosts
}

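Both managers share one subscription idiom: the subscriber's context is the map key, a goroutine removes the entry when that context ends, and each notification is sent from its own goroutine behind a select so a slow subscriber can never block RetryAndList. Below is a standard-library sketch of the idiom (the diff itself keys an xsync.MapOf the same way); the notifier type and names are illustrative:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Host struct{ ID string }

type notifier struct {
    mu   sync.Mutex
    subs map[context.Context]chan<- Host
}

func newNotifier() *notifier {
    return &notifier{subs: map[context.Context]chan<- Host{}}
}

// Subscribe registers the channel under the caller's context and
// removes it automatically once the context is done.
func (n *notifier) Subscribe(ctx context.Context, ch chan<- Host) {
    n.mu.Lock()
    n.subs[ctx] = ch
    n.mu.Unlock()

    go func() {
        <-ctx.Done()
        n.mu.Lock()
        delete(n.subs, ctx)
        n.mu.Unlock()
    }()
}

// publish fans out without blocking the caller: each send happens in its
// own goroutine and gives up when the subscriber's context ends.
func (n *notifier) publish(h Host) {
    n.mu.Lock()
    defer n.mu.Unlock()
    for ctx, ch := range n.subs {
        go func(ctx context.Context, ch chan<- Host) {
            select {
            case ch <- h:
            case <-ctx.Done():
            }
        }(ctx, ch)
    }
}

func main() {
    n := newNotifier()
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    hosts := make(chan Host)
    n.Subscribe(ctx, hosts)
    n.publish(Host{ID: "agent-1"})
    fmt.Println(<-hosts)
}
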
internal/support/docker/swarm_client_manager.go (new file, 170 lines)
@@ -0,0 +1,170 @@
package docker_support

import (
    "context"
    "crypto/tls"
    "fmt"
    "net"
    "sync"

    "github.com/amir20/dozzle/internal/agent"
    "github.com/amir20/dozzle/internal/docker"
    "github.com/puzpuzpuz/xsync/v3"
    "github.com/samber/lo"

    log "github.com/sirupsen/logrus"
)

type SwarmClientManager struct {
    clients     map[string]ClientService
    certs       tls.Certificate
    mu          sync.RWMutex
    subscribers *xsync.MapOf[context.Context, chan<- docker.Host]
    localClient docker.Client
    localIPs    []string
}

func localIPs() []string {
    addrs, err := net.InterfaceAddrs()
    if err != nil {
        return []string{}
    }

    ips := make([]string, 0)
    for _, address := range addrs {
        if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
            if ipnet.IP.To4() != nil {
                ips = append(ips, ipnet.IP.String())
            }
        }
    }
    return ips
}

func NewSwarmClientManager(localClient docker.Client, certs tls.Certificate) *SwarmClientManager {
    clientMap := make(map[string]ClientService)
    localService := NewDockerClientService(localClient)
    clientMap[localClient.Host().ID] = localService

    return &SwarmClientManager{
        localClient: localClient,
        clients:     clientMap,
        certs:       certs,
        subscribers: xsync.NewMapOf[context.Context, chan<- docker.Host](),
        localIPs:    localIPs(),
    }
}

func (m *SwarmClientManager) Subscribe(ctx context.Context, channel chan<- docker.Host) {
    m.subscribers.Store(ctx, channel)

    go func() {
        <-ctx.Done()
        m.subscribers.Delete(ctx)
    }()
}

func (m *SwarmClientManager) RetryAndList() ([]ClientService, []error) {
    m.mu.Lock()
    errors := make([]error, 0)

    ips, err := net.LookupIP("tasks.dozzle")
    if err != nil {
        log.Warnf("error looking up swarm services: %v", err)
        errors = append(errors, err)
        m.mu.Unlock()
        return m.List(), errors
    }

    clients := lo.Values(m.clients)
    endpoints := lo.KeyBy(clients, func(client ClientService) string {
        return client.Host().Endpoint
    })

    log.Debugf("tasks.dozzle = %v, localIP = %v, clients.endpoints = %v", ips, m.localIPs, lo.Keys(endpoints))

    for _, ip := range ips {
        if lo.Contains(m.localIPs, ip.String()) {
            log.Debugf("skipping local ip %s", ip.String())
            continue
        }

        if _, ok := endpoints[ip.String()+":7007"]; ok {
            log.Debugf("skipping existing client for %s", ip.String())
            continue
        }

        agent, err := agent.NewClient(ip.String()+":7007", m.certs)
        if err != nil {
            log.Warnf("error creating client for %s: %v", ip, err)
            errors = append(errors, err)
            continue
        }

        if agent.Host().ID == m.localClient.Host().ID {
            log.Debugf("skipping local client with ID %s", agent.Host().ID)
            if err := agent.Close(); err != nil {
                log.Warnf("error closing local client: %v", err)
            }
            continue
        }

        client := NewAgentService(agent)
        m.clients[agent.Host().ID] = client
        log.Infof("added client for %s", agent.Host().ID)

        m.subscribers.Range(func(ctx context.Context, channel chan<- docker.Host) bool {
            host := agent.Host()
            host.Available = true
            host.Type = "swarm"

            // We don't want to block the subscribers in event.go
            go func() {
                select {
                case channel <- host:
                case <-ctx.Done():
                }
            }()

            return true
        })
    }

    m.mu.Unlock()

    return m.List(), errors
}

func (m *SwarmClientManager) List() []ClientService {
    m.mu.RLock()
    defer m.mu.RUnlock()

    return lo.Values(m.clients)
}

func (m *SwarmClientManager) Find(id string) (ClientService, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()

    client, ok := m.clients[id]
    return client, ok
}

func (m *SwarmClientManager) Hosts() []docker.Host {
    clients := m.List()

    hosts := make([]docker.Host, 0, len(clients))

    for _, client := range clients {
        host := client.Host()
        host.Available = true
        host.Type = "swarm"
        hosts = append(hosts, host)
    }

    return hosts
}

func (m *SwarmClientManager) String() string {
    return fmt.Sprintf("SwarmClientManager{clients: %d}", len(m.clients))
}

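The swarm manager's discovery leans on Docker's built-in DNS: inside a swarm, tasks.<service> resolves to one address per running task, so every replica can enumerate its peers with a plain lookup. Here is a minimal sketch, assuming the diff's service name tasks.dozzle and agent port 7007:

package main

import (
    "fmt"
    "net"
)

func main() {
    // Inside a swarm service, tasks.<service-name> resolves to one
    // A record per running task (replica) of that service.
    ips, err := net.LookupIP("tasks.dozzle")
    if err != nil {
        fmt.Println("lookup failed (not running in swarm?):", err)
        return
    }

    for _, ip := range ips {
        if ip.To4() == nil {
            continue // the diff only dials IPv4 agent endpoints
        }
        // Each peer agent is expected to listen on port 7007.
        fmt.Println("would dial agent at", net.JoinHostPort(ip.String(), "7007"))
    }
}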