mirror of
https://github.com/amir20/dozzle.git
synced 2025-12-21 13:23:07 +01:00
perf: introduces a timeout when fetching host info (#3276)
This commit is contained in:
@@ -6,10 +6,11 @@ import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"github.com/goccy/go-json"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/goccy/go-json"
|
||||
|
||||
"github.com/amir20/dozzle/internal/agent/pb"
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
"github.com/amir20/dozzle/internal/utils"
|
||||
@@ -260,8 +261,8 @@ func (c *Client) StreamNewContainers(ctx context.Context, containers chan<- dock
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) FindContainer(containerID string) (docker.Container, error) {
|
||||
response, err := c.client.FindContainer(context.Background(), &pb.FindContainerRequest{ContainerId: containerID})
|
||||
func (c *Client) FindContainer(ctx context.Context, containerID string) (docker.Container, error) {
|
||||
response, err := c.client.FindContainer(ctx, &pb.FindContainerRequest{ContainerId: containerID})
|
||||
if err != nil {
|
||||
return docker.Container{}, err
|
||||
}
|
||||
@@ -294,8 +295,8 @@ func (c *Client) FindContainer(containerID string) (docker.Container, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Client) ListContainers() ([]docker.Container, error) {
|
||||
response, err := c.client.ListContainers(context.Background(), &pb.ListContainersRequest{})
|
||||
func (c *Client) ListContainers(ctx context.Context) ([]docker.Container, error) {
|
||||
response, err := c.client.ListContainers(ctx, &pb.ListContainersRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -332,8 +333,8 @@ func (c *Client) ListContainers() ([]docker.Container, error) {
|
||||
return containers, nil
|
||||
}
|
||||
|
||||
func (c *Client) Host() (docker.Host, error) {
|
||||
info, err := c.client.HostInfo(context.Background(), &pb.HostInfoRequest{})
|
||||
func (c *Client) Host(ctx context.Context) (docker.Host, error) {
|
||||
info, err := c.client.HostInfo(ctx, &pb.HostInfoRequest{})
|
||||
if err != nil {
|
||||
return docker.Host{
|
||||
Endpoint: c.endpoint,
|
||||
@@ -354,7 +355,7 @@ func (c *Client) Host() (docker.Host, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Client) ContainerAction(containerId string, action docker.ContainerAction) error {
|
||||
func (c *Client) ContainerAction(ctx context.Context, containerId string, action docker.ContainerAction) error {
|
||||
var containerAction pb.ContainerAction
|
||||
switch action {
|
||||
case docker.Start:
|
||||
@@ -368,7 +369,7 @@ func (c *Client) ContainerAction(containerId string, action docker.ContainerActi
|
||||
|
||||
}
|
||||
|
||||
_, err := c.client.ContainerAction(context.Background(), &pb.ContainerActionRequest{ContainerId: containerId, Action: containerAction})
|
||||
_, err := c.client.ContainerAction(ctx, &pb.ContainerActionRequest{ContainerId: containerId, Action: containerAction})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -31,13 +31,13 @@ type MockedClient struct {
|
||||
docker.Client
|
||||
}
|
||||
|
||||
func (m *MockedClient) FindContainer(id string) (docker.Container, error) {
|
||||
args := m.Called(id)
|
||||
func (m *MockedClient) FindContainer(ctx context.Context, id string) (docker.Container, error) {
|
||||
args := m.Called(ctx, id)
|
||||
return args.Get(0).(docker.Container), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockedClient) ContainerActions(action docker.ContainerAction, containerID string) error {
|
||||
args := m.Called(action, containerID)
|
||||
func (m *MockedClient) ContainerActions(ctx context.Context, action docker.ContainerAction, containerID string) error {
|
||||
args := m.Called(ctx, action, containerID)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
@@ -46,8 +46,8 @@ func (m *MockedClient) ContainerEvents(ctx context.Context, events chan<- docker
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockedClient) ListContainers() ([]docker.Container, error) {
|
||||
args := m.Called()
|
||||
func (m *MockedClient) ListContainers(ctx context.Context) ([]docker.Container, error) {
|
||||
args := m.Called(ctx)
|
||||
return args.Get(0).([]docker.Container), args.Error(1)
|
||||
}
|
||||
|
||||
@@ -92,7 +92,7 @@ func init() {
|
||||
}
|
||||
|
||||
client = &MockedClient{}
|
||||
client.On("ListContainers").Return([]docker.Container{
|
||||
client.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{
|
||||
ID: "123456",
|
||||
Name: "test",
|
||||
@@ -111,7 +111,7 @@ func init() {
|
||||
time.Sleep(5 * time.Second)
|
||||
})
|
||||
|
||||
client.On("FindContainer", "123456").Return(docker.Container{
|
||||
client.On("FindContainer", mock.Anything, "123456").Return(docker.Container{
|
||||
ID: "123456",
|
||||
Name: "test",
|
||||
Host: "localhost",
|
||||
@@ -142,7 +142,7 @@ func TestFindContainer(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
container, _ := rpc.FindContainer("123456")
|
||||
container, _ := rpc.FindContainer(context.Background(), "123456")
|
||||
|
||||
assert.Equal(t, container, docker.Container{
|
||||
ID: "123456",
|
||||
@@ -167,7 +167,7 @@ func TestListContainers(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
containers, _ := rpc.ListContainers()
|
||||
containers, _ := rpc.ListContainers(context.Background())
|
||||
|
||||
assert.Equal(t, containers, []docker.Container{
|
||||
{
|
||||
|
||||
@@ -79,7 +79,7 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe
|
||||
return err
|
||||
}
|
||||
|
||||
container, err := s.client.FindContainer(in.ContainerId)
|
||||
container, err := s.client.FindContainer(out.Context(), in.ContainerId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -295,7 +295,7 @@ func (s *server) ContainerAction(ctx context.Context, in *pb.ContainerActionRequ
|
||||
return nil, status.Error(codes.InvalidArgument, "invalid action")
|
||||
}
|
||||
|
||||
err := s.client.ContainerActions(action, in.ContainerId)
|
||||
err := s.client.ContainerActions(ctx, action, in.ContainerId)
|
||||
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
|
||||
@@ -59,15 +59,15 @@ type DockerCLI interface {
|
||||
}
|
||||
|
||||
type Client interface {
|
||||
ListContainers() ([]Container, error)
|
||||
FindContainer(string) (Container, error)
|
||||
ListContainers(context.Context) ([]Container, error)
|
||||
FindContainer(context.Context, string) (Container, error)
|
||||
ContainerLogs(context.Context, string, time.Time, StdType) (io.ReadCloser, error)
|
||||
ContainerEvents(context.Context, chan<- ContainerEvent) error
|
||||
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time, StdType) (io.ReadCloser, error)
|
||||
ContainerStats(context.Context, string, chan<- ContainerStat) error
|
||||
Ping(context.Context) (types.Ping, error)
|
||||
Host() Host
|
||||
ContainerActions(action ContainerAction, containerID string) error
|
||||
ContainerActions(ctx context.Context, action ContainerAction, containerID string) error
|
||||
IsSwarmMode() bool
|
||||
SystemInfo() system.Info
|
||||
}
|
||||
@@ -179,9 +179,9 @@ func NewRemoteClient(f map[string][]string, host Host) (Client, error) {
|
||||
}
|
||||
|
||||
// Finds a container by id, skipping the filters
|
||||
func (d *httpClient) FindContainer(id string) (Container, error) {
|
||||
func (d *httpClient) FindContainer(ctx context.Context, id string) (Container, error) {
|
||||
log.Debug().Str("id", id).Msg("Finding container")
|
||||
if json, err := d.cli.ContainerInspect(context.Background(), id); err == nil {
|
||||
if json, err := d.cli.ContainerInspect(ctx, id); err == nil {
|
||||
return newContainerFromJSON(json, d.host.ID), nil
|
||||
} else {
|
||||
return Container{}, err
|
||||
@@ -189,26 +189,26 @@ func (d *httpClient) FindContainer(id string) (Container, error) {
|
||||
|
||||
}
|
||||
|
||||
func (d *httpClient) ContainerActions(action ContainerAction, containerID string) error {
|
||||
func (d *httpClient) ContainerActions(ctx context.Context, action ContainerAction, containerID string) error {
|
||||
switch action {
|
||||
case Start:
|
||||
return d.cli.ContainerStart(context.Background(), containerID, container.StartOptions{})
|
||||
return d.cli.ContainerStart(ctx, containerID, container.StartOptions{})
|
||||
case Stop:
|
||||
return d.cli.ContainerStop(context.Background(), containerID, container.StopOptions{})
|
||||
return d.cli.ContainerStop(ctx, containerID, container.StopOptions{})
|
||||
case Restart:
|
||||
return d.cli.ContainerRestart(context.Background(), containerID, container.StopOptions{})
|
||||
return d.cli.ContainerRestart(ctx, containerID, container.StopOptions{})
|
||||
default:
|
||||
return fmt.Errorf("unknown action: %s", action)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *httpClient) ListContainers() ([]Container, error) {
|
||||
func (d *httpClient) ListContainers(ctx context.Context) ([]Container, error) {
|
||||
log.Debug().Interface("filter", d.filters).Str("host", d.host.Name).Msg("Listing containers")
|
||||
containerListOptions := container.ListOptions{
|
||||
Filters: d.filters,
|
||||
All: true,
|
||||
}
|
||||
list, err := d.cli.ContainerList(context.Background(), containerListOptions)
|
||||
list, err := d.cli.ContainerList(ctx, containerListOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -92,7 +92,7 @@ func Test_dockerClient_ListContainers_null(t *testing.T) {
|
||||
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(nil, nil)
|
||||
client := &httpClient{proxy, filters.NewArgs(), Host{ID: "localhost"}, system.Info{}}
|
||||
|
||||
list, err := client.ListContainers()
|
||||
list, err := client.ListContainers(context.Background())
|
||||
assert.Empty(t, list, "list should be empty")
|
||||
require.NoError(t, err, "error should not return an error.")
|
||||
|
||||
@@ -104,7 +104,7 @@ func Test_dockerClient_ListContainers_error(t *testing.T) {
|
||||
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(nil, errors.New("test"))
|
||||
client := &httpClient{proxy, filters.NewArgs(), Host{ID: "localhost"}, system.Info{}}
|
||||
|
||||
list, err := client.ListContainers()
|
||||
list, err := client.ListContainers(context.Background())
|
||||
assert.Nil(t, list, "list should be nil")
|
||||
require.Error(t, err, "test.")
|
||||
|
||||
@@ -127,7 +127,7 @@ func Test_dockerClient_ListContainers_happy(t *testing.T) {
|
||||
proxy.On("ContainerList", mock.Anything, mock.Anything).Return(containers, nil)
|
||||
client := &httpClient{proxy, filters.NewArgs(), Host{ID: "localhost"}, system.Info{}}
|
||||
|
||||
list, err := client.ListContainers()
|
||||
list, err := client.ListContainers(context.Background())
|
||||
require.NoError(t, err, "error should not return an error.")
|
||||
|
||||
Ids := []string{"1234567890_a", "abcdefghijkl"}
|
||||
@@ -191,7 +191,7 @@ func Test_dockerClient_FindContainer_happy(t *testing.T) {
|
||||
|
||||
client := &httpClient{proxy, filters.NewArgs(), Host{ID: "localhost"}, system.Info{}}
|
||||
|
||||
container, err := client.FindContainer("abcdefghijkl")
|
||||
container, err := client.FindContainer(context.Background(), "abcdefghijkl")
|
||||
require.NoError(t, err, "error should not be thrown")
|
||||
|
||||
assert.Equal(t, container.ID, "abcdefghijkl")
|
||||
@@ -204,7 +204,7 @@ func Test_dockerClient_FindContainer_error(t *testing.T) {
|
||||
proxy.On("ContainerInspect", mock.Anything, "not_valid").Return(types.ContainerJSON{}, errors.New("not found"))
|
||||
client := &httpClient{proxy, filters.NewArgs(), Host{ID: "localhost"}, system.Info{}}
|
||||
|
||||
_, err := client.FindContainer("not_valid")
|
||||
_, err := client.FindContainer(context.Background(), "not_valid")
|
||||
require.Error(t, err, "error should be thrown")
|
||||
|
||||
proxy.AssertExpectations(t)
|
||||
@@ -222,14 +222,14 @@ func Test_dockerClient_ContainerActions_happy(t *testing.T) {
|
||||
proxy.On("ContainerStop", mock.Anything, "abcdefghijkl", mock.Anything).Return(nil)
|
||||
proxy.On("ContainerRestart", mock.Anything, "abcdefghijkl", mock.Anything).Return(nil)
|
||||
|
||||
container, err := client.FindContainer("abcdefghijkl")
|
||||
container, err := client.FindContainer(context.Background(), "abcdefghijkl")
|
||||
require.NoError(t, err, "error should not be thrown")
|
||||
|
||||
assert.Equal(t, container.ID, "abcdefghijkl")
|
||||
|
||||
actions := []string{"start", "stop", "restart"}
|
||||
for _, action := range actions {
|
||||
err := client.ContainerActions(ContainerAction(action), container.ID)
|
||||
err := client.ContainerActions(context.Background(), ContainerAction(action), container.ID)
|
||||
require.NoError(t, err, "error should not be thrown")
|
||||
assert.Equal(t, err, nil)
|
||||
}
|
||||
@@ -246,12 +246,12 @@ func Test_dockerClient_ContainerActions_error(t *testing.T) {
|
||||
proxy.On("ContainerStop", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test"))
|
||||
proxy.On("ContainerRestart", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test"))
|
||||
|
||||
container, err := client.FindContainer("random-id")
|
||||
container, err := client.FindContainer(context.Background(), "random-id")
|
||||
require.Error(t, err, "error should be thrown")
|
||||
|
||||
actions := []string{"start", "stop", "restart"}
|
||||
for _, action := range actions {
|
||||
err := client.ContainerActions(ContainerAction(action), container.ID)
|
||||
err := client.ContainerActions(context.Background(), ContainerAction(action), container.ID)
|
||||
require.Error(t, err, "error should be thrown")
|
||||
assert.Error(t, err, "error should have been returned")
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"github.com/rs/zerolog/log"
|
||||
@@ -59,7 +60,9 @@ func (s *ContainerStore) checkConnectivity() error {
|
||||
s.connected.Store(false)
|
||||
}()
|
||||
|
||||
if containers, err := s.client.ListContainers(); err != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) // 3s is enough to fetch all containers
|
||||
defer cancel()
|
||||
if containers, err := s.client.ListContainers(ctx); err != nil {
|
||||
return err
|
||||
} else {
|
||||
s.containers.Clear()
|
||||
@@ -81,7 +84,9 @@ func (s *ContainerStore) checkConnectivity() error {
|
||||
}
|
||||
go func(c Container, i int) {
|
||||
defer sem.Release(1)
|
||||
if container, err := s.client.FindContainer(c.ID); err == nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) // 2s is hardcoded timeout for fetching container
|
||||
defer cancel()
|
||||
if container, err := s.client.FindContainer(ctx, c.ID); err == nil {
|
||||
s.containers.Store(c.ID, &container)
|
||||
}
|
||||
}(c, i)
|
||||
@@ -173,8 +178,10 @@ func (s *ContainerStore) init() {
|
||||
log.Trace().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event")
|
||||
switch event.Name {
|
||||
case "start":
|
||||
if container, err := s.client.FindContainer(event.ActorID); err == nil {
|
||||
list, _ := s.client.ListContainers()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
|
||||
if container, err := s.client.FindContainer(ctx, event.ActorID); err == nil {
|
||||
list, _ := s.client.ListContainers(ctx)
|
||||
|
||||
// make sure the container is in the list of containers when using filter
|
||||
valid := lo.ContainsBy(list, func(item Container) bool {
|
||||
@@ -193,6 +200,7 @@ func (s *ContainerStore) init() {
|
||||
})
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
case "destroy":
|
||||
log.Debug().Str("id", event.ActorID).Msg("container destroyed")
|
||||
s.containers.Delete(event.ActorID)
|
||||
|
||||
@@ -14,13 +14,13 @@ type mockedClient struct {
|
||||
Client
|
||||
}
|
||||
|
||||
func (m *mockedClient) ListContainers() ([]Container, error) {
|
||||
args := m.Called()
|
||||
func (m *mockedClient) ListContainers(ctx context.Context) ([]Container, error) {
|
||||
args := m.Called(ctx)
|
||||
return args.Get(0).([]Container), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *mockedClient) FindContainer(id string) (Container, error) {
|
||||
args := m.Called(id)
|
||||
func (m *mockedClient) FindContainer(ctx context.Context, id string) (Container, error) {
|
||||
args := m.Called(ctx, id)
|
||||
return args.Get(0).(Container), args.Error(1)
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ func (m *mockedClient) Host() Host {
|
||||
func TestContainerStore_List(t *testing.T) {
|
||||
|
||||
client := new(mockedClient)
|
||||
client.On("ListContainers").Return([]Container{
|
||||
client.On("ListContainers", mock.Anything).Return([]Container{
|
||||
{
|
||||
ID: "1234",
|
||||
Name: "test",
|
||||
@@ -56,7 +56,7 @@ func TestContainerStore_List(t *testing.T) {
|
||||
ID: "localhost",
|
||||
})
|
||||
|
||||
client.On("FindContainer", "1234").Return(Container{
|
||||
client.On("FindContainer", mock.Anything, "1234").Return(Container{
|
||||
ID: "1234",
|
||||
Name: "test",
|
||||
Image: "test",
|
||||
@@ -74,7 +74,7 @@ func TestContainerStore_List(t *testing.T) {
|
||||
|
||||
func TestContainerStore_die(t *testing.T) {
|
||||
client := new(mockedClient)
|
||||
client.On("ListContainers").Return([]Container{
|
||||
client.On("ListContainers", mock.Anything).Return([]Container{
|
||||
{
|
||||
ID: "1234",
|
||||
Name: "test",
|
||||
@@ -100,7 +100,7 @@ func TestContainerStore_die(t *testing.T) {
|
||||
|
||||
client.On("ContainerStats", mock.Anything, "1234", mock.AnythingOfType("chan<- docker.ContainerStat")).Return(nil)
|
||||
|
||||
client.On("FindContainer", "1234").Return(Container{
|
||||
client.On("FindContainer", mock.Anything, "1234").Return(Container{
|
||||
ID: "1234",
|
||||
Name: "test",
|
||||
Image: "test",
|
||||
|
||||
@@ -88,16 +88,17 @@ func (sc *StatsCollector) Start(parentCtx context.Context) bool {
|
||||
sc.reset()
|
||||
sc.totalStarted.Add(1)
|
||||
|
||||
var ctx context.Context
|
||||
sc.mu.Lock()
|
||||
if sc.stopper != nil {
|
||||
sc.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
var ctx context.Context
|
||||
ctx, sc.stopper = context.WithCancel(parentCtx)
|
||||
sc.mu.Unlock()
|
||||
|
||||
if containers, err := sc.client.ListContainers(); err == nil {
|
||||
timeoutCtx, cancel := context.WithTimeout(parentCtx, 3*time.Second) // 3 seconds to list containers is hard limit
|
||||
if containers, err := sc.client.ListContainers(timeoutCtx); err == nil {
|
||||
for _, c := range containers {
|
||||
if c.State == "running" {
|
||||
go streamStats(ctx, sc, c.ID)
|
||||
@@ -106,6 +107,7 @@ func (sc *StatsCollector) Start(parentCtx context.Context) bool {
|
||||
} else {
|
||||
log.Error().Str("host", sc.client.Host().Name).Err(err).Msg("failed to list containers")
|
||||
}
|
||||
cancel()
|
||||
|
||||
events := make(chan ContainerEvent)
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
func startedCollector(ctx context.Context) *StatsCollector {
|
||||
client := new(mockedClient)
|
||||
client.On("ListContainers").Return([]Container{
|
||||
client.On("ListContainers", mock.Anything).Return([]Container{
|
||||
{
|
||||
ID: "1234",
|
||||
Name: "test",
|
||||
@@ -46,6 +46,7 @@ func startedCollector(ctx context.Context) *StatsCollector {
|
||||
|
||||
return collector
|
||||
}
|
||||
|
||||
func TestCancelers(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
@@ -1,18 +1,19 @@
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
|
||||
"github.com/amir20/dozzle/internal/agent"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func RPCRequest(addr string, certs tls.Certificate) error {
|
||||
func RPCRequest(ctx context.Context, addr string, certs tls.Certificate) error {
|
||||
client, err := agent.NewClient(addr, certs)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to create agent client")
|
||||
}
|
||||
containers, err := client.ListContainers()
|
||||
containers, err := client.ListContainers(ctx)
|
||||
log.Trace().Int("containers", len(containers)).Msg("Healtcheck RPC request completed")
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package cli
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/alexflint/go-arg"
|
||||
)
|
||||
@@ -24,6 +25,8 @@ type Args struct {
|
||||
RemoteAgent []string `arg:"env:DOZZLE_REMOTE_AGENT,--remote-agent,separate" help:"list of agents to connect remotely"`
|
||||
NoAnalytics bool `arg:"--no-analytics,env:DOZZLE_NO_ANALYTICS" help:"disables anonymous analytics"`
|
||||
Mode string `arg:"env:DOZZLE_MODE" default:"server" help:"sets the mode to run in (server, swarm)"`
|
||||
TimeoutString string `arg:"env:DOZZLE_TIMEOUT" default:"3s" help:"sets the timeout for docker client"`
|
||||
Timeout time.Duration `arg:"-"`
|
||||
Healthcheck *HealthcheckCmd `arg:"subcommand:healthcheck" help:"checks if the server is running"`
|
||||
Generate *GenerateCmd `arg:"subcommand:generate" help:"generates a configuration file for simple auth"`
|
||||
Agent *AgentCmd `arg:"subcommand:agent" help:"starts the agent"`
|
||||
@@ -65,5 +68,13 @@ func ParseArgs() (Args, interface{}) {
|
||||
args.Filter[key] = append(args.Filter[key], val)
|
||||
}
|
||||
|
||||
if args.TimeoutString != "" {
|
||||
timeout, err := time.ParseDuration(args.TimeoutString)
|
||||
if err != nil {
|
||||
parser.Fail("timeout should be a valid duration")
|
||||
}
|
||||
args.Timeout = timeout
|
||||
}
|
||||
|
||||
return args, parser.Subcommand()
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
@@ -22,7 +23,9 @@ func CreateMultiHostService(embeddedCerts embed.FS, args Args) (docker.Client, *
|
||||
|
||||
log.Info().Interface("host", host).Msg("Adding remote host")
|
||||
if client, err := docker.NewRemoteClient(args.Filter, host); err == nil {
|
||||
if _, err := client.ListContainers(); err == nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), args.Timeout)
|
||||
defer cancel()
|
||||
if _, err := client.ListContainers(ctx); err == nil {
|
||||
clients = append(clients, docker_support.NewDockerClientService(client))
|
||||
} else {
|
||||
log.Warn().Err(err).Interface("host", host).Msg("Could not connect to remote host")
|
||||
@@ -34,7 +37,9 @@ func CreateMultiHostService(embeddedCerts embed.FS, args Args) (docker.Client, *
|
||||
|
||||
localClient, err := docker.NewLocalClient(args.Filter, args.Hostname)
|
||||
if err == nil {
|
||||
_, err := localClient.ListContainers()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), args.Timeout)
|
||||
defer cancel()
|
||||
_, err := localClient.ListContainers(ctx)
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Msg("Could not connect to local Docker Engine")
|
||||
} else {
|
||||
@@ -48,6 +53,6 @@ func CreateMultiHostService(embeddedCerts embed.FS, args Args) (docker.Client, *
|
||||
log.Fatal().Err(err).Msg("Could not read certificates")
|
||||
}
|
||||
|
||||
clientManager := docker_support.NewRetriableClientManager(args.RemoteAgent, certs, clients...)
|
||||
return localClient, docker_support.NewMultiHostService(clientManager)
|
||||
clientManager := docker_support.NewRetriableClientManager(args.RemoteAgent, args.Timeout, certs, clients...)
|
||||
return localClient, docker_support.NewMultiHostService(clientManager, args.Timeout)
|
||||
}
|
||||
|
||||
@@ -20,8 +20,8 @@ func NewAgentService(client *agent.Client) ClientService {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *agentService) FindContainer(id string) (docker.Container, error) {
|
||||
return a.client.FindContainer(id)
|
||||
func (a *agentService) FindContainer(ctx context.Context, id string) (docker.Container, error) {
|
||||
return a.client.FindContainer(ctx, id)
|
||||
}
|
||||
|
||||
func (a *agentService) RawLogs(ctx context.Context, container docker.Container, from time.Time, to time.Time, stdTypes docker.StdType) (io.ReadCloser, error) {
|
||||
@@ -36,12 +36,12 @@ func (a *agentService) StreamLogs(ctx context.Context, container docker.Containe
|
||||
return a.client.StreamContainerLogs(ctx, container.ID, from, stdTypes, events)
|
||||
}
|
||||
|
||||
func (a *agentService) ListContainers() ([]docker.Container, error) {
|
||||
return a.client.ListContainers()
|
||||
func (a *agentService) ListContainers(ctx context.Context) ([]docker.Container, error) {
|
||||
return a.client.ListContainers(ctx)
|
||||
}
|
||||
|
||||
func (a *agentService) Host() (docker.Host, error) {
|
||||
host, err := a.client.Host()
|
||||
func (a *agentService) Host(ctx context.Context) (docker.Host, error) {
|
||||
host, err := a.client.Host(ctx)
|
||||
if err != nil {
|
||||
host := a.host
|
||||
host.Available = false
|
||||
@@ -64,6 +64,6 @@ func (d *agentService) SubscribeContainersStarted(ctx context.Context, container
|
||||
go d.client.StreamNewContainers(ctx, containers)
|
||||
}
|
||||
|
||||
func (a *agentService) ContainerAction(container docker.Container, action docker.ContainerAction) error {
|
||||
return a.client.ContainerAction(container.ID, action)
|
||||
func (a *agentService) ContainerAction(ctx context.Context, container docker.Container, action docker.ContainerAction) error {
|
||||
return a.client.ContainerAction(ctx, container.ID, action)
|
||||
}
|
||||
|
||||
@@ -9,10 +9,10 @@ import (
|
||||
)
|
||||
|
||||
type ClientService interface {
|
||||
FindContainer(id string) (docker.Container, error)
|
||||
ListContainers() ([]docker.Container, error)
|
||||
Host() (docker.Host, error)
|
||||
ContainerAction(container docker.Container, action docker.ContainerAction) error
|
||||
FindContainer(ctx context.Context, id string) (docker.Container, error)
|
||||
ListContainers(ctx context.Context) ([]docker.Container, error)
|
||||
Host(ctx context.Context) (docker.Host, error)
|
||||
ContainerAction(ctx context.Context, container docker.Container, action docker.ContainerAction) error
|
||||
LogsBetweenDates(ctx context.Context, container docker.Container, from time.Time, to time.Time, stdTypes docker.StdType) (<-chan *docker.LogEvent, error)
|
||||
RawLogs(ctx context.Context, container docker.Container, from time.Time, to time.Time, stdTypes docker.StdType) (io.ReadCloser, error)
|
||||
|
||||
@@ -70,19 +70,19 @@ func (d *dockerClientService) StreamLogs(ctx context.Context, container docker.C
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dockerClientService) FindContainer(id string) (docker.Container, error) {
|
||||
func (d *dockerClientService) FindContainer(ctx context.Context, id string) (docker.Container, error) {
|
||||
return d.store.FindContainer(id)
|
||||
}
|
||||
|
||||
func (d *dockerClientService) ContainerAction(container docker.Container, action docker.ContainerAction) error {
|
||||
return d.client.ContainerActions(action, container.ID)
|
||||
func (d *dockerClientService) ContainerAction(ctx context.Context, container docker.Container, action docker.ContainerAction) error {
|
||||
return d.client.ContainerActions(ctx, action, container.ID)
|
||||
}
|
||||
|
||||
func (d *dockerClientService) ListContainers() ([]docker.Container, error) {
|
||||
func (d *dockerClientService) ListContainers(ctx context.Context) ([]docker.Container, error) {
|
||||
return d.store.ListContainers()
|
||||
}
|
||||
|
||||
func (d *dockerClientService) Host() (docker.Host, error) {
|
||||
func (d *dockerClientService) Host(ctx context.Context) (docker.Host, error) {
|
||||
return d.client.Host(), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,6 @@ func (c *containerService) StreamLogs(ctx context.Context, from time.Time, stdTy
|
||||
return c.clientService.StreamLogs(ctx, c.Container, from, stdTypes, events)
|
||||
}
|
||||
|
||||
func (c *containerService) Action(action docker.ContainerAction) error {
|
||||
return c.clientService.ContainerAction(c.Container, action)
|
||||
func (c *containerService) Action(ctx context.Context, action docker.ContainerAction) error {
|
||||
return c.clientService.ContainerAction(ctx, c.Container, action)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package docker_support
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
"github.com/rs/zerolog/log"
|
||||
@@ -24,16 +25,18 @@ type ClientManager interface {
|
||||
List() []ClientService
|
||||
RetryAndList() ([]ClientService, []error)
|
||||
Subscribe(ctx context.Context, channel chan<- docker.Host)
|
||||
Hosts() []docker.Host
|
||||
Hosts(ctx context.Context) []docker.Host
|
||||
}
|
||||
|
||||
type MultiHostService struct {
|
||||
manager ClientManager
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func NewMultiHostService(manager ClientManager) *MultiHostService {
|
||||
func NewMultiHostService(manager ClientManager, timeout time.Duration) *MultiHostService {
|
||||
m := &MultiHostService{
|
||||
manager: manager,
|
||||
timeout: timeout,
|
||||
}
|
||||
|
||||
return m
|
||||
@@ -44,8 +47,9 @@ func (m *MultiHostService) FindContainer(host string, id string) (*containerServ
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("host %s not found", host)
|
||||
}
|
||||
|
||||
container, err := client.FindContainer(id)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
|
||||
defer cancel()
|
||||
container, err := client.FindContainer(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -61,8 +65,10 @@ func (m *MultiHostService) ListContainersForHost(host string) ([]docker.Containe
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("host %s not found", host)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
|
||||
defer cancel()
|
||||
|
||||
return client.ListContainers()
|
||||
return client.ListContainers(ctx)
|
||||
}
|
||||
|
||||
func (m *MultiHostService) ListAllContainers() ([]docker.Container, []error) {
|
||||
@@ -70,9 +76,11 @@ func (m *MultiHostService) ListAllContainers() ([]docker.Container, []error) {
|
||||
clients, errors := m.manager.RetryAndList()
|
||||
|
||||
for _, client := range clients {
|
||||
list, err := client.ListContainers()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
|
||||
defer cancel()
|
||||
list, err := client.ListContainers(ctx)
|
||||
if err != nil {
|
||||
host, _ := client.Host()
|
||||
host, _ := client.Host(ctx)
|
||||
log.Debug().Err(err).Str("host", host.Name).Msg("error listing containers")
|
||||
host.Available = false
|
||||
errors = append(errors, &HostUnavailableError{Host: host, Err: err})
|
||||
@@ -131,7 +139,9 @@ func (m *MultiHostService) TotalClients() int {
|
||||
}
|
||||
|
||||
func (m *MultiHostService) Hosts() []docker.Host {
|
||||
return m.manager.Hosts()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
|
||||
defer cancel()
|
||||
return m.manager.Hosts(ctx)
|
||||
}
|
||||
|
||||
func (m *MultiHostService) LocalHost() (docker.Host, error) {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/amir20/dozzle/internal/agent"
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
@@ -20,12 +21,15 @@ type RetriableClientManager struct {
|
||||
certs tls.Certificate
|
||||
mu sync.RWMutex
|
||||
subscribers *xsync.MapOf[context.Context, chan<- docker.Host]
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func NewRetriableClientManager(agents []string, certs tls.Certificate, clients ...ClientService) *RetriableClientManager {
|
||||
func NewRetriableClientManager(agents []string, timeout time.Duration, certs tls.Certificate, clients ...ClientService) *RetriableClientManager {
|
||||
clientMap := make(map[string]ClientService)
|
||||
for _, client := range clients {
|
||||
host, err := client.Host()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
host, err := client.Host(ctx)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("host", host.Name).Msg("error fetching host info for client")
|
||||
continue
|
||||
@@ -47,7 +51,9 @@ func NewRetriableClientManager(agents []string, certs tls.Certificate, clients .
|
||||
continue
|
||||
}
|
||||
|
||||
host, err := agent.Host()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
host, err := agent.Host(ctx)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("endpoint", endpoint).Msg("error fetching host info for agent")
|
||||
failed = append(failed, endpoint)
|
||||
@@ -66,6 +72,7 @@ func NewRetriableClientManager(agents []string, certs tls.Certificate, clients .
|
||||
failedAgents: failed,
|
||||
certs: certs,
|
||||
subscribers: xsync.NewMapOf[context.Context, chan<- docker.Host](),
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,7 +99,9 @@ func (m *RetriableClientManager) RetryAndList() ([]ClientService, []error) {
|
||||
continue
|
||||
}
|
||||
|
||||
host, err := agent.Host()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
|
||||
defer cancel()
|
||||
host, err := agent.Host(ctx)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("endpoint", endpoint).Msg("error fetching host info for agent")
|
||||
errors = append(errors, err)
|
||||
@@ -146,12 +155,13 @@ func (m *RetriableClientManager) String() string {
|
||||
return fmt.Sprintf("RetriableClientManager{clients: %d, failedAgents: %d}", len(m.clients), len(m.failedAgents))
|
||||
}
|
||||
|
||||
func (m *RetriableClientManager) Hosts() []docker.Host {
|
||||
func (m *RetriableClientManager) Hosts(ctx context.Context) []docker.Host {
|
||||
clients := m.List()
|
||||
|
||||
hosts := lop.Map(clients, func(client ClientService, _ int) docker.Host {
|
||||
host, err := client.Host()
|
||||
host, err := client.Host(ctx)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("host", host.Name).Msg("error fetching host info for client")
|
||||
host.Available = false
|
||||
} else {
|
||||
host.Available = true
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/amir20/dozzle/internal/agent"
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
@@ -25,6 +26,7 @@ type SwarmClientManager struct {
|
||||
localClient docker.Client
|
||||
localIPs []string
|
||||
name string
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func localIPs() []string {
|
||||
@@ -44,7 +46,7 @@ func localIPs() []string {
|
||||
return ips
|
||||
}
|
||||
|
||||
func NewSwarmClientManager(localClient docker.Client, certs tls.Certificate) *SwarmClientManager {
|
||||
func NewSwarmClientManager(localClient docker.Client, certs tls.Certificate, timeout time.Duration) *SwarmClientManager {
|
||||
clientMap := make(map[string]ClientService)
|
||||
localService := NewDockerClientService(localClient)
|
||||
clientMap[localClient.Host().ID] = localService
|
||||
@@ -54,7 +56,9 @@ func NewSwarmClientManager(localClient docker.Client, certs tls.Certificate) *Sw
|
||||
log.Fatal().Msg("HOSTNAME environment variable not set when looking for swarm service name")
|
||||
}
|
||||
|
||||
container, err := localClient.FindContainer(id)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
container, err := localClient.FindContainer(ctx, id)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("error finding own container when looking for swarm service name")
|
||||
}
|
||||
@@ -70,6 +74,7 @@ func NewSwarmClientManager(localClient docker.Client, certs tls.Certificate) *Sw
|
||||
subscribers: xsync.NewMapOf[context.Context, chan<- docker.Host](),
|
||||
localIPs: localIPs(),
|
||||
name: serviceName,
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,7 +102,9 @@ func (m *SwarmClientManager) RetryAndList() ([]ClientService, []error) {
|
||||
|
||||
clients := lo.Values(m.clients)
|
||||
endpoints := lo.KeyBy(clients, func(client ClientService) string {
|
||||
host, _ := client.Host()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
|
||||
defer cancel()
|
||||
host, _ := client.Host(ctx)
|
||||
return host.Endpoint
|
||||
})
|
||||
|
||||
@@ -125,7 +132,9 @@ func (m *SwarmClientManager) RetryAndList() ([]ClientService, []error) {
|
||||
continue
|
||||
}
|
||||
|
||||
host, err := agent.Host()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
|
||||
defer cancel()
|
||||
host, err := agent.Host(ctx)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Stringer("ip", ip).Msg("error getting host from agent client")
|
||||
errors = append(errors, err)
|
||||
@@ -183,12 +192,13 @@ func (m *SwarmClientManager) Find(id string) (ClientService, bool) {
|
||||
return client, ok
|
||||
}
|
||||
|
||||
func (m *SwarmClientManager) Hosts() []docker.Host {
|
||||
func (m *SwarmClientManager) Hosts(ctx context.Context) []docker.Host {
|
||||
clients := m.List()
|
||||
|
||||
return lop.Map(clients, func(client ClientService, _ int) docker.Host {
|
||||
host, err := client.Host()
|
||||
host, err := client.Host(ctx)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("id", host.ID).Msg("error getting host from client")
|
||||
host.Available = false
|
||||
} else {
|
||||
host.Available = true
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package web
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -26,7 +28,9 @@ func (h *handler) containerActions(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := containerService.Action(parsedAction); err != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
if err := containerService.Action(ctx, parsedAction); err != nil {
|
||||
log.Error().Err(err).Msg("error while trying to perform container action")
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
|
||||
@@ -17,15 +17,15 @@ func mockedClient() *MockedClient {
|
||||
mockedClient := new(MockedClient)
|
||||
container := docker.Container{ID: "123"}
|
||||
|
||||
mockedClient.On("FindContainer", "123").Return(container, nil)
|
||||
mockedClient.On("FindContainer", "456").Return(docker.Container{}, errors.New("container not found"))
|
||||
mockedClient.On("ContainerActions", docker.Start, container.ID).Return(nil)
|
||||
mockedClient.On("ContainerActions", docker.Stop, container.ID).Return(nil)
|
||||
mockedClient.On("ContainerActions", docker.Restart, container.ID).Return(nil)
|
||||
mockedClient.On("ContainerActions", docker.Start, mock.Anything).Return(errors.New("container not found"))
|
||||
mockedClient.On("ContainerActions", docker.ContainerAction("something-else"), container.ID).Return(errors.New("unknown action"))
|
||||
mockedClient.On("FindContainer", mock.Anything, "123").Return(container, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, "456").Return(docker.Container{}, errors.New("container not found"))
|
||||
mockedClient.On("ContainerActions", mock.Anything, docker.Start, container.ID).Return(nil)
|
||||
mockedClient.On("ContainerActions", mock.Anything, docker.Stop, container.ID).Return(nil)
|
||||
mockedClient.On("ContainerActions", mock.Anything, docker.Restart, container.ID).Return(nil)
|
||||
mockedClient.On("ContainerActions", mock.Anything, docker.Start, mock.Anything).Return(errors.New("container not found"))
|
||||
mockedClient.On("ContainerActions", mock.Anything, docker.ContainerAction("something-else"), container.ID).Return(errors.New("unknown action"))
|
||||
mockedClient.On("Host").Return(docker.Host{ID: "localhost"})
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{container}, nil)
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{container}, nil)
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
return mockedClient
|
||||
|
||||
@@ -25,7 +25,7 @@ func Test_handler_download_logs(t *testing.T) {
|
||||
|
||||
data := makeMessage("INFO Testing logs...", docker.STDOUT)
|
||||
|
||||
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Tty: false}, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, id).Return(docker.Container{ID: id, Tty: false}, nil)
|
||||
mockedClient.On("ContainerLogsBetweenDates", mock.Anything, id, mock.Anything, mock.Anything, docker.STDOUT).Return(io.NopCloser(bytes.NewReader(data)), nil)
|
||||
mockedClient.On("Host").Return(docker.Host{
|
||||
ID: "localhost",
|
||||
@@ -33,7 +33,7 @@ func Test_handler_download_logs(t *testing.T) {
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil).Run(func(args mock.Arguments) {
|
||||
time.Sleep(1 * time.Second)
|
||||
})
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{ID: id, Name: "test", State: "running"},
|
||||
}, nil)
|
||||
|
||||
|
||||
@@ -18,13 +18,12 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
events := make(chan docker.ContainerEvent)
|
||||
stats := make(chan docker.ContainerStat)
|
||||
availableHosts := make(chan docker.Host)
|
||||
|
||||
h.multiHostService.SubscribeEventsAndStats(ctx, events, stats)
|
||||
h.multiHostService.SubscribeAvailableHosts(ctx, availableHosts)
|
||||
h.multiHostService.SubscribeEventsAndStats(r.Context(), events, stats)
|
||||
h.multiHostService.SubscribeAvailableHosts(r.Context(), availableHosts)
|
||||
|
||||
allContainers, errors := h.multiHostService.ListAllContainers()
|
||||
|
||||
@@ -63,6 +62,7 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
case "start", "die", "destroy":
|
||||
if event.Name == "start" {
|
||||
log.Debug().Str("container", event.ActorID).Msg("container started")
|
||||
|
||||
if containers, err := h.multiHostService.ListContainersForHost(event.Host); err == nil {
|
||||
if err := sseWriter.Event("containers-changed", containers); err != nil {
|
||||
log.Error().Err(err).Msg("error writing containers to event stream")
|
||||
@@ -92,7 +92,7 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ func Test_handler_streamEvents_happy(t *testing.T) {
|
||||
|
||||
mockedClient := new(MockedClient)
|
||||
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{}, nil)
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{}, nil)
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil).Run(func(args mock.Arguments) {
|
||||
messages := args.Get(1).(chan<- docker.ContainerEvent)
|
||||
|
||||
@@ -42,7 +42,7 @@ func Test_handler_streamEvents_happy(t *testing.T) {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
cancel()
|
||||
})
|
||||
mockedClient.On("FindContainer", "1234").Return(docker.Container{
|
||||
mockedClient.On("FindContainer", mock.Anything, "1234").Return(docker.Container{
|
||||
ID: "1234",
|
||||
Name: "test",
|
||||
Image: "test",
|
||||
@@ -54,8 +54,8 @@ func Test_handler_streamEvents_happy(t *testing.T) {
|
||||
})
|
||||
|
||||
// This is needed so that the server is initialized for store
|
||||
manager := docker_support.NewRetriableClientManager(nil, tls.Certificate{}, docker_support.NewDockerClientService(mockedClient))
|
||||
multiHostService := docker_support.NewMultiHostService(manager)
|
||||
manager := docker_support.NewRetriableClientManager(nil, 3*time.Second, tls.Certificate{}, docker_support.NewDockerClientService(mockedClient))
|
||||
multiHostService := docker_support.NewMultiHostService(manager, 3*time.Second)
|
||||
|
||||
server := CreateServer(multiHostService, nil, Config{Base: "/", Authorization: Authorization{Provider: NONE}})
|
||||
|
||||
|
||||
@@ -290,7 +290,9 @@ func streamLogsForContainers(w http.ResponseWriter, r *http.Request, multiHostCl
|
||||
events := make([]*docker.LogEvent, 0)
|
||||
stillRunning := false
|
||||
for _, container := range existingContainers {
|
||||
|
||||
containerService, err := multiHostClient.FindContainer(container.Host, container.ID)
|
||||
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error while finding container")
|
||||
return
|
||||
|
||||
@@ -38,7 +38,7 @@ func Test_handler_streamLogs_happy(t *testing.T) {
|
||||
|
||||
now := time.Now()
|
||||
|
||||
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Tty: false, Host: "localhost", StartedAt: now}, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, id).Return(docker.Container{ID: id, Tty: false, Host: "localhost", StartedAt: now}, nil)
|
||||
mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, now, docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil).
|
||||
Run(func(args mock.Arguments) {
|
||||
go func() {
|
||||
@@ -49,7 +49,7 @@ func Test_handler_streamLogs_happy(t *testing.T) {
|
||||
mockedClient.On("Host").Return(docker.Host{
|
||||
ID: "localhost",
|
||||
})
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{ID: id, Name: "test", Host: "localhost", State: "running"},
|
||||
}, nil)
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil).Run(func(args mock.Arguments) {
|
||||
@@ -80,7 +80,7 @@ func Test_handler_streamLogs_happy_with_id(t *testing.T) {
|
||||
|
||||
started := time.Date(2020, time.May, 13, 18, 55, 37, 772853839, time.UTC)
|
||||
|
||||
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Host: "localhost", StartedAt: started}, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, id).Return(docker.Container{ID: id, Host: "localhost", StartedAt: started}, nil)
|
||||
mockedClient.On("ContainerLogs", mock.Anything, mock.Anything, started, docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil).
|
||||
Run(func(args mock.Arguments) {
|
||||
go func() {
|
||||
@@ -92,7 +92,7 @@ func Test_handler_streamLogs_happy_with_id(t *testing.T) {
|
||||
ID: "localhost",
|
||||
})
|
||||
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{ID: id, Name: "test", Host: "localhost", State: "running"},
|
||||
}, nil)
|
||||
|
||||
@@ -120,7 +120,7 @@ func Test_handler_streamLogs_happy_container_stopped(t *testing.T) {
|
||||
|
||||
started := time.Date(2020, time.May, 13, 18, 55, 37, 772853839, time.UTC)
|
||||
mockedClient := new(MockedClient)
|
||||
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Host: "localhost", StartedAt: started}, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, id).Return(docker.Container{ID: id, Host: "localhost", StartedAt: started}, nil)
|
||||
mockedClient.On("ContainerLogs", mock.Anything, id, started, docker.STDALL).Return(io.NopCloser(strings.NewReader("")), io.EOF).
|
||||
Run(func(args mock.Arguments) {
|
||||
go func() {
|
||||
@@ -131,7 +131,7 @@ func Test_handler_streamLogs_happy_container_stopped(t *testing.T) {
|
||||
mockedClient.On("Host").Return(docker.Host{
|
||||
ID: "localhost",
|
||||
})
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{ID: id, Name: "test", Host: "localhost", State: "running"},
|
||||
}, nil)
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil)
|
||||
@@ -143,38 +143,6 @@ func Test_handler_streamLogs_happy_container_stopped(t *testing.T) {
|
||||
mockedClient.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// func Test_handler_streamLogs_error_finding_container(t *testing.T) {
|
||||
// id := "123456"
|
||||
// ctx, cancel := context.WithCancel(context.Background())
|
||||
// req, err := http.NewRequestWithContext(ctx, "GET", "/api/hosts/localhost/containers/"+id+"/logs/stream", nil)
|
||||
// q := req.URL.Query()
|
||||
// q.Add("stdout", "true")
|
||||
// q.Add("stderr", "true")
|
||||
|
||||
// req.URL.RawQuery = q.Encode()
|
||||
// require.NoError(t, err, "NewRequest should not return an error.")
|
||||
|
||||
// mockedClient := new(MockedClient)
|
||||
// mockedClient.On("FindContainer", id).Return(docker.Container{}, errors.New("error finding container")).
|
||||
// Run(func(args mock.Arguments) {
|
||||
// go func() {
|
||||
// time.Sleep(50 * time.Millisecond)
|
||||
// cancel()
|
||||
// }()
|
||||
// })
|
||||
// mockedClient.On("Host").Return(docker.Host{
|
||||
// ID: "localhost",
|
||||
// })
|
||||
// mockedClient.On("ListContainers").Return([]docker.Container{}, nil)
|
||||
// mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil)
|
||||
|
||||
// handler := createDefaultHandler(mockedClient)
|
||||
// rr := httptest.NewRecorder()
|
||||
// handler.ServeHTTP(rr, req)
|
||||
// abide.AssertHTTPResponse(t, t.Name(), rr.Result())
|
||||
// mockedClient.AssertExpectations(t)
|
||||
// }
|
||||
|
||||
func Test_handler_streamLogs_error_reading(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
@@ -189,7 +157,7 @@ func Test_handler_streamLogs_error_reading(t *testing.T) {
|
||||
|
||||
started := time.Date(2020, time.May, 13, 18, 55, 37, 772853839, time.UTC)
|
||||
mockedClient := new(MockedClient)
|
||||
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Host: "localhost", StartedAt: started}, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, id).Return(docker.Container{ID: id, Host: "localhost", StartedAt: started}, nil)
|
||||
mockedClient.On("ContainerLogs", mock.Anything, id, started, docker.STDALL).Return(io.NopCloser(strings.NewReader("")), errors.New("test error")).
|
||||
Run(func(args mock.Arguments) {
|
||||
go func() {
|
||||
@@ -200,7 +168,7 @@ func Test_handler_streamLogs_error_reading(t *testing.T) {
|
||||
mockedClient.On("Host").Return(docker.Host{
|
||||
ID: "localhost",
|
||||
})
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{ID: id, Name: "test", Host: "localhost", State: "running"},
|
||||
}, nil)
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil)
|
||||
@@ -219,11 +187,11 @@ func Test_handler_streamLogs_error_std(t *testing.T) {
|
||||
require.NoError(t, err, "NewRequest should not return an error.")
|
||||
|
||||
mockedClient := new(MockedClient)
|
||||
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id, Host: "localhost"}, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, id).Return(docker.Container{ID: id, Host: "localhost"}, nil)
|
||||
mockedClient.On("Host").Return(docker.Host{
|
||||
ID: "localhost",
|
||||
})
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{ID: id, Name: "test", Host: "localhost", State: "running"},
|
||||
}, nil)
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil).
|
||||
@@ -260,11 +228,11 @@ func Test_handler_between_dates(t *testing.T) {
|
||||
data := append(first, second...)
|
||||
|
||||
mockedClient.On("ContainerLogsBetweenDates", mock.Anything, id, from, to, docker.STDALL).Return(io.NopCloser(bytes.NewReader(data)), nil)
|
||||
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, id).Return(docker.Container{ID: id}, nil)
|
||||
mockedClient.On("Host").Return(docker.Host{
|
||||
ID: "localhost",
|
||||
})
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{ID: id, Name: "test", Host: "localhost", State: "running"},
|
||||
}, nil)
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil)
|
||||
@@ -305,11 +273,11 @@ func Test_handler_between_dates_with_fill(t *testing.T) {
|
||||
mockedClient.On("ContainerLogsBetweenDates", mock.Anything, id, time.Date(2017, time.December, 31, 14, 0, 0, 0, time.UTC), to, docker.STDALL).
|
||||
Return(io.NopCloser(bytes.NewReader(data)), nil).
|
||||
Once()
|
||||
mockedClient.On("FindContainer", id).Return(docker.Container{ID: id}, nil)
|
||||
mockedClient.On("FindContainer", mock.Anything, id).Return(docker.Container{ID: id}, nil)
|
||||
mockedClient.On("Host").Return(docker.Host{
|
||||
ID: "localhost",
|
||||
})
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{
|
||||
mockedClient.On("ListContainers", mock.Anything).Return([]docker.Container{
|
||||
{ID: id, Name: "test", Host: "localhost", State: "running"},
|
||||
}, nil)
|
||||
mockedClient.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- docker.ContainerEvent")).Return(nil)
|
||||
|
||||
@@ -23,13 +23,13 @@ type MockedClient struct {
|
||||
docker.Client
|
||||
}
|
||||
|
||||
func (m *MockedClient) FindContainer(id string) (docker.Container, error) {
|
||||
args := m.Called(id)
|
||||
func (m *MockedClient) FindContainer(ctx context.Context, id string) (docker.Container, error) {
|
||||
args := m.Called(ctx, id)
|
||||
return args.Get(0).(docker.Container), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockedClient) ContainerActions(action docker.ContainerAction, containerID string) error {
|
||||
args := m.Called(action, containerID)
|
||||
func (m *MockedClient) ContainerActions(ctx context.Context, action docker.ContainerAction, containerID string) error {
|
||||
args := m.Called(ctx, action, containerID)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
@@ -38,8 +38,8 @@ func (m *MockedClient) ContainerEvents(ctx context.Context, events chan<- docker
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *MockedClient) ListContainers() ([]docker.Container, error) {
|
||||
args := m.Called()
|
||||
func (m *MockedClient) ListContainers(ctx context.Context) ([]docker.Container, error) {
|
||||
args := m.Called(ctx)
|
||||
return args.Get(0).([]docker.Container), args.Error(1)
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ func (m *MockedClient) SystemInfo() system.Info {
|
||||
func createHandler(client docker.Client, content fs.FS, config Config) *chi.Mux {
|
||||
if client == nil {
|
||||
client = new(MockedClient)
|
||||
client.(*MockedClient).On("ListContainers").Return([]docker.Container{}, nil)
|
||||
client.(*MockedClient).On("ListContainers", mock.Anything).Return([]docker.Container{}, nil)
|
||||
client.(*MockedClient).On("Host").Return(docker.Host{
|
||||
ID: "localhost",
|
||||
})
|
||||
@@ -86,8 +86,8 @@ func createHandler(client docker.Client, content fs.FS, config Config) *chi.Mux
|
||||
content = afero.NewIOFS(fs)
|
||||
}
|
||||
|
||||
manager := docker_support.NewRetriableClientManager(nil, tls.Certificate{}, docker_support.NewDockerClientService(client))
|
||||
multiHostService := docker_support.NewMultiHostService(manager)
|
||||
manager := docker_support.NewRetriableClientManager(nil, 3*time.Second, tls.Certificate{}, docker_support.NewDockerClientService(client))
|
||||
multiHostService := docker_support.NewMultiHostService(manager, 3*time.Second)
|
||||
return createRouter(&handler{
|
||||
multiHostService: multiHostService,
|
||||
content: content,
|
||||
|
||||
8
main.go
8
main.go
@@ -103,7 +103,9 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Could not read certificates")
|
||||
}
|
||||
if err := healthcheck.RPCRequest(agentAddress, certs); err != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), args.Timeout)
|
||||
defer cancel()
|
||||
if err := healthcheck.RPCRequest(ctx, agentAddress, certs); err != nil {
|
||||
log.Fatal().Err(err).Msg("Failed to make request")
|
||||
}
|
||||
}
|
||||
@@ -155,8 +157,8 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Could not read certificates")
|
||||
}
|
||||
manager := docker_support.NewSwarmClientManager(localClient, certs)
|
||||
multiHostService = docker_support.NewMultiHostService(manager)
|
||||
manager := docker_support.NewSwarmClientManager(localClient, certs, args.Timeout)
|
||||
multiHostService = docker_support.NewMultiHostService(manager, args.Timeout)
|
||||
log.Info().Msg("Starting in swarm mode")
|
||||
listener, err := net.Listen("tcp", ":7007")
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user