mirror of
https://github.com/amir20/dozzle.git
synced 2026-01-04 20:14:59 +01:00
fix: refreshes versions for agent properly after restart (#3160)
This commit is contained in:
@@ -23,9 +23,9 @@ import (
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
client pb.AgentServiceClient
|
||||
host docker.Host
|
||||
conn *grpc.ClientConn
|
||||
client pb.AgentServiceClient
|
||||
conn *grpc.ClientConn
|
||||
endpoint string
|
||||
}
|
||||
|
||||
func NewClient(endpoint string, certificates tls.Certificate, opts ...grpc.DialOption) (*Client, error) {
|
||||
@@ -51,26 +51,11 @@ func NewClient(endpoint string, certificates tls.Certificate, opts ...grpc.DialO
|
||||
}
|
||||
|
||||
client := pb.NewAgentServiceClient(conn)
|
||||
info, err := client.HostInfo(context.Background(), &pb.HostInfoRequest{})
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("failed to get host info: %w", err)
|
||||
}
|
||||
|
||||
return &Client{
|
||||
client: client,
|
||||
conn: conn,
|
||||
|
||||
host: docker.Host{
|
||||
ID: info.Host.Id,
|
||||
Name: info.Host.Name,
|
||||
NCPU: int(info.Host.CpuCores),
|
||||
MemTotal: int64(info.Host.Memory),
|
||||
Endpoint: endpoint,
|
||||
Type: "agent",
|
||||
DockerVersion: info.Host.DockerVersion,
|
||||
AgentVersion: info.Host.AgentVersion,
|
||||
},
|
||||
client: client,
|
||||
conn: conn,
|
||||
endpoint: endpoint,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -361,8 +346,26 @@ func (c *Client) ListContainers() ([]docker.Container, error) {
|
||||
return containers, nil
|
||||
}
|
||||
|
||||
func (c *Client) Host() docker.Host {
|
||||
return c.host
|
||||
func (c *Client) Host() (docker.Host, error) {
|
||||
info, err := c.client.HostInfo(context.Background(), &pb.HostInfoRequest{})
|
||||
if err != nil {
|
||||
return docker.Host{
|
||||
Endpoint: c.endpoint,
|
||||
Type: "agent",
|
||||
Available: false,
|
||||
}, err
|
||||
}
|
||||
|
||||
return docker.Host{
|
||||
ID: info.Host.Id,
|
||||
Name: info.Host.Name,
|
||||
NCPU: int(info.Host.CpuCores),
|
||||
MemTotal: int64(info.Host.Memory),
|
||||
Endpoint: c.endpoint,
|
||||
Type: "agent",
|
||||
DockerVersion: info.Host.DockerVersion,
|
||||
AgentVersion: info.Host.AgentVersion,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Client) Close() error {
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
type agentService struct {
|
||||
client *agent.Client
|
||||
host docker.Host
|
||||
}
|
||||
|
||||
func NewAgentService(client *agent.Client) ClientService {
|
||||
@@ -39,8 +40,16 @@ func (a *agentService) ListContainers() ([]docker.Container, error) {
|
||||
return a.client.ListContainers()
|
||||
}
|
||||
|
||||
func (a *agentService) Host() docker.Host {
|
||||
return a.client.Host()
|
||||
func (a *agentService) Host() (docker.Host, error) {
|
||||
host, err := a.client.Host()
|
||||
if err != nil {
|
||||
host := a.host
|
||||
host.Available = false
|
||||
return host, err
|
||||
}
|
||||
|
||||
a.host = host
|
||||
return a.host, err
|
||||
}
|
||||
|
||||
func (a *agentService) SubscribeStats(ctx context.Context, stats chan<- docker.ContainerStat) {
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
type ClientService interface {
|
||||
FindContainer(id string) (docker.Container, error)
|
||||
ListContainers() ([]docker.Container, error)
|
||||
Host() docker.Host
|
||||
Host() (docker.Host, error)
|
||||
ContainerAction(container docker.Container, action docker.ContainerAction) error
|
||||
LogsBetweenDates(ctx context.Context, container docker.Container, from time.Time, to time.Time, stdTypes docker.StdType) (<-chan *docker.LogEvent, error)
|
||||
RawLogs(ctx context.Context, container docker.Container, from time.Time, to time.Time, stdTypes docker.StdType) (io.ReadCloser, error)
|
||||
@@ -82,8 +82,8 @@ func (d *dockerClientService) ListContainers() ([]docker.Container, error) {
|
||||
return d.store.ListContainers()
|
||||
}
|
||||
|
||||
func (d *dockerClientService) Host() docker.Host {
|
||||
return d.client.Host()
|
||||
func (d *dockerClientService) Host() (docker.Host, error) {
|
||||
return d.client.Host(), nil
|
||||
}
|
||||
|
||||
func (d *dockerClientService) SubscribeStats(ctx context.Context, stats chan<- docker.ContainerStat) {
|
||||
|
||||
@@ -75,8 +75,8 @@ func (m *MultiHostService) ListAllContainers() ([]docker.Container, []error) {
|
||||
for _, client := range clients {
|
||||
list, err := client.ListContainers()
|
||||
if err != nil {
|
||||
log.Debugf("error listing containers for host %s: %v", client.Host().ID, err)
|
||||
host := client.Host()
|
||||
host, _ := client.Host()
|
||||
log.Debugf("error listing containers for host %s: %v", host.ID, err)
|
||||
host.Available = false
|
||||
errors = append(errors, &HostUnavailableError{Host: host, Err: err})
|
||||
continue
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/amir20/dozzle/internal/agent"
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
lop "github.com/samber/lo/parallel"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -26,24 +27,39 @@ func NewRetriableClientManager(agents []string, certs tls.Certificate, clients .
|
||||
|
||||
clientMap := make(map[string]ClientService)
|
||||
for _, client := range clients {
|
||||
if _, ok := clientMap[client.Host().ID]; ok {
|
||||
log.Warnf("duplicate client found for host %s", client.Host().ID)
|
||||
host, err := client.Host()
|
||||
if err != nil {
|
||||
log.Warnf("error fetching host info for client %s: %v", host.ID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := clientMap[host.ID]; ok {
|
||||
log.Warnf("duplicate client found for host %s", host.ID)
|
||||
} else {
|
||||
clientMap[client.Host().ID] = client
|
||||
clientMap[host.ID] = client
|
||||
}
|
||||
}
|
||||
|
||||
failed := make([]string, 0)
|
||||
for _, endpoint := range agents {
|
||||
if agent, err := agent.NewClient(endpoint, certs); err == nil {
|
||||
if _, ok := clientMap[agent.Host().ID]; ok {
|
||||
log.Warnf("duplicate client found for host %s", agent.Host().ID)
|
||||
} else {
|
||||
clientMap[agent.Host().ID] = NewAgentService(agent)
|
||||
}
|
||||
} else {
|
||||
agent, err := agent.NewClient(endpoint, certs)
|
||||
if err != nil {
|
||||
log.Warnf("error creating agent client for %s: %v", endpoint, err)
|
||||
failed = append(failed, endpoint)
|
||||
continue
|
||||
}
|
||||
|
||||
host, err := agent.Host()
|
||||
if err != nil {
|
||||
log.Warnf("error fetching host info for agent %s: %v", endpoint, err)
|
||||
failed = append(failed, endpoint)
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := clientMap[host.ID]; ok {
|
||||
log.Warnf("duplicate client found for host %s", host.ID)
|
||||
} else {
|
||||
clientMap[host.ID] = NewAgentService(agent)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,29 +86,36 @@ func (m *RetriableClientManager) RetryAndList() ([]ClientService, []error) {
|
||||
if len(m.failedAgents) > 0 {
|
||||
newFailed := make([]string, 0)
|
||||
for _, endpoint := range m.failedAgents {
|
||||
if agent, err := agent.NewClient(endpoint, m.certs); err == nil {
|
||||
m.clients[agent.Host().ID] = NewAgentService(agent)
|
||||
|
||||
m.subscribers.Range(func(ctx context.Context, channel chan<- docker.Host) bool {
|
||||
host := agent.Host()
|
||||
host.Available = true
|
||||
|
||||
// We don't want to block the subscribers in event.go
|
||||
go func() {
|
||||
select {
|
||||
case channel <- host:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
} else {
|
||||
agent, err := agent.NewClient(endpoint, m.certs)
|
||||
if err != nil {
|
||||
log.Warnf("error creating agent client for %s: %v", endpoint, err)
|
||||
errors = append(errors, err)
|
||||
newFailed = append(newFailed, endpoint)
|
||||
continue
|
||||
}
|
||||
|
||||
host, err := agent.Host()
|
||||
if err != nil {
|
||||
log.Warnf("error fetching host info for agent %s: %v", endpoint, err)
|
||||
errors = append(errors, err)
|
||||
newFailed = append(newFailed, endpoint)
|
||||
continue
|
||||
}
|
||||
|
||||
m.clients[host.ID] = NewAgentService(agent)
|
||||
m.subscribers.Range(func(ctx context.Context, channel chan<- docker.Host) bool {
|
||||
host.Available = true
|
||||
|
||||
// We don't want to block the subscribers in event.go
|
||||
go func() {
|
||||
select {
|
||||
case channel <- host:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
m.failedAgents = newFailed
|
||||
}
|
||||
@@ -128,12 +151,17 @@ func (m *RetriableClientManager) String() string {
|
||||
func (m *RetriableClientManager) Hosts() []docker.Host {
|
||||
clients := m.List()
|
||||
|
||||
hosts := make([]docker.Host, 0, len(clients))
|
||||
for _, client := range clients {
|
||||
host := client.Host()
|
||||
host.Available = true
|
||||
hosts = append(hosts, host)
|
||||
}
|
||||
hosts := lop.Map(clients, func(client ClientService, _ int) docker.Host {
|
||||
host, err := client.Host()
|
||||
log.Debugf("host: %v, err: %v", host, err)
|
||||
if err != nil {
|
||||
host.Available = false
|
||||
} else {
|
||||
host.Available = true
|
||||
}
|
||||
|
||||
return host
|
||||
})
|
||||
|
||||
for _, endpoint := range m.failedAgents {
|
||||
hosts = append(hosts, docker.Host{
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"github.com/samber/lo"
|
||||
lop "github.com/samber/lo/parallel"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -78,7 +79,8 @@ func (m *SwarmClientManager) RetryAndList() ([]ClientService, []error) {
|
||||
|
||||
clients := lo.Values(m.clients)
|
||||
endpoints := lo.KeyBy(clients, func(client ClientService) string {
|
||||
return client.Host().Endpoint
|
||||
host, _ := client.Host()
|
||||
return host.Endpoint
|
||||
})
|
||||
|
||||
log.Debugf("tasks.dozzle = %v, localIP = %v, clients.endpoints = %v", ips, m.localIPs, lo.Keys(endpoints))
|
||||
@@ -101,8 +103,18 @@ func (m *SwarmClientManager) RetryAndList() ([]ClientService, []error) {
|
||||
continue
|
||||
}
|
||||
|
||||
if agent.Host().ID == m.localClient.Host().ID {
|
||||
log.Debugf("skipping local client with ID %s", agent.Host().ID)
|
||||
host, err := agent.Host()
|
||||
if err != nil {
|
||||
log.Warnf("error getting host data for agent %s: %v", ip, err)
|
||||
errors = append(errors, err)
|
||||
if err := agent.Close(); err != nil {
|
||||
log.Warnf("error closing local client: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if host.ID == m.localClient.Host().ID {
|
||||
log.Debugf("skipping local client with ID %s", host.ID)
|
||||
if err := agent.Close(); err != nil {
|
||||
log.Warnf("error closing local client: %v", err)
|
||||
}
|
||||
@@ -110,11 +122,10 @@ func (m *SwarmClientManager) RetryAndList() ([]ClientService, []error) {
|
||||
}
|
||||
|
||||
client := NewAgentService(agent)
|
||||
m.clients[agent.Host().ID] = client
|
||||
log.Infof("added client for %s", agent.Host().ID)
|
||||
m.clients[host.ID] = client
|
||||
log.Infof("added client for %s", host.ID)
|
||||
|
||||
m.subscribers.Range(func(ctx context.Context, channel chan<- docker.Host) bool {
|
||||
host := agent.Host()
|
||||
host.Available = true
|
||||
host.Type = "swarm"
|
||||
|
||||
@@ -153,16 +164,18 @@ func (m *SwarmClientManager) Find(id string) (ClientService, bool) {
|
||||
func (m *SwarmClientManager) Hosts() []docker.Host {
|
||||
clients := m.List()
|
||||
|
||||
hosts := make([]docker.Host, 0, len(clients))
|
||||
|
||||
for _, client := range clients {
|
||||
host := client.Host()
|
||||
host.Available = true
|
||||
return lop.Map(clients, func(client ClientService, _ int) docker.Host {
|
||||
host, err := client.Host()
|
||||
if err != nil {
|
||||
host.Available = false
|
||||
} else {
|
||||
host.Available = true
|
||||
}
|
||||
host.Type = "swarm"
|
||||
hosts = append(hosts, host)
|
||||
}
|
||||
|
||||
return hosts
|
||||
return host
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (m *SwarmClientManager) String() string {
|
||||
|
||||
Reference in New Issue
Block a user