From 1ca1934b1c57f5b45b269d6045dd1dcbe2d608c2 Mon Sep 17 00:00:00 2001 From: Alexis Couvreur Date: Fri, 4 Nov 2022 16:04:44 +0000 Subject: [PATCH] feat(docker): listens for container stopped event When a container is shutdown manually while it's registered as `ready` in Sablier it will remove it from the store. Meaning externally events are now handled for docker. --- app/providers/docker_classic.go | 34 +++++++++++- app/providers/docker_classic_test.go | 80 ++++++++++++++++++++------- app/providers/docker_swarm.go | 3 + app/providers/kubernetes.go | 3 + app/providers/mocks/client_mock.go | 56 +++++++++++++++---- app/providers/provider.go | 3 + app/sessions/mocks/provider_mock.go | 68 +++++++++++++++++++++++ app/sessions/sessions_manager.go | 25 +++++++-- app/sessions/sessions_manager_test.go | 39 +++++++++++++ 9 files changed, 277 insertions(+), 34 deletions(-) create mode 100644 app/sessions/mocks/provider_mock.go diff --git a/app/providers/docker_classic.go b/app/providers/docker_classic.go index ef708be..5d3ff37 100644 --- a/app/providers/docker_classic.go +++ b/app/providers/docker_classic.go @@ -2,16 +2,19 @@ package providers import ( "context" + "errors" "fmt" + "io" "github.com/acouvreur/sablier/app/instance" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" log "github.com/sirupsen/logrus" ) type DockerClassicProvider struct { - Client client.ContainerAPIClient + Client client.APIClient desiredReplicas int } @@ -103,3 +106,32 @@ func (provider *DockerClassicProvider) GetState(name string) (instance.State, er return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.State.Status), provider.desiredReplicas) } } + +func (provider *DockerClassicProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) { + msgs, errs := provider.Client.Events(ctx, types.EventsOptions{ + Filters: filters.NewArgs( + filters.Arg("scope", "local"), + filters.Arg("type", "container"), + filters.Arg("event", "die"), + ), + }) + + go func() { + for { + select { + case msg := <-msgs: + // Send the container that has died to the channel + instance <- msg.From + case err := <-errs: + if errors.Is(err, io.EOF) { + log.Debug("provider event stream closed") + close(instance) + return + } + case <-ctx.Done(): + close(instance) + return + } + } + }() +} diff --git a/app/providers/docker_classic_test.go b/app/providers/docker_classic_test.go index 39aafa9..b6ea558 100644 --- a/app/providers/docker_classic_test.go +++ b/app/providers/docker_classic_test.go @@ -1,6 +1,7 @@ package providers import ( + "context" "fmt" "reflect" "testing" @@ -8,12 +9,13 @@ import ( "github.com/acouvreur/sablier/app/instance" "github.com/acouvreur/sablier/app/providers/mocks" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" "github.com/stretchr/testify/mock" ) func TestDockerClassicProvider_GetState(t *testing.T) { type fields struct { - Client *mocks.ContainerAPIClientMock + Client *mocks.DockerAPIClientMock } type args struct { name string @@ -30,7 +32,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx created container state", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -47,7 +49,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx running container state without healthcheck", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -64,7 +66,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx running container state with \"starting\" health", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -81,7 +83,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx running container state with \"unhealthy\" health", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -99,7 +101,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx running container state with \"healthy\" health", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -116,7 +118,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx paused container state", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -133,7 +135,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx restarting container state", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -150,7 +152,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx removing container state", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -167,7 +169,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx exited container state with status code 0", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -184,7 +186,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx exited container state with status code 137", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -202,7 +204,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "nginx dead container state", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -220,7 +222,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { { name: "container inspect has an error", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -260,7 +262,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) { func TestDockerClassicProvider_Stop(t *testing.T) { type fields struct { - Client *mocks.ContainerAPIClientMock + Client *mocks.DockerAPIClientMock } type args struct { name string @@ -276,7 +278,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) { { name: "container stop has an error", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -294,7 +296,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) { { name: "container stop as expected", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -332,7 +334,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) { func TestDockerClassicProvider_Start(t *testing.T) { type fields struct { - Client *mocks.ContainerAPIClientMock + Client *mocks.DockerAPIClientMock } type args struct { name string @@ -348,7 +350,7 @@ func TestDockerClassicProvider_Start(t *testing.T) { { name: "container start has an error", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -366,7 +368,7 @@ func TestDockerClassicProvider_Start(t *testing.T) { { name: "container start as expected", fields: fields{ - Client: mocks.NewContainerAPIClientMock(), + Client: mocks.NewDockerAPIClientMock(), }, args: args{ name: "nginx", @@ -401,3 +403,43 @@ func TestDockerClassicProvider_Start(t *testing.T) { }) } } + +func TestDockerClassicProvider_NotifyInsanceStopped(t *testing.T) { + tests := []struct { + name string + want []string + events []events.Message + errors []error + }{ + { + name: "container nginx is stopped", + want: []string{"nginx"}, + events: []events.Message{ + mocks.ContainerStoppedEvent("nginx"), + }, + errors: []error{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider := &DockerClassicProvider{ + Client: mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors), + desiredReplicas: 1, + } + + instanceC := make(chan string) + + provider.NotifyInsanceStopped(context.Background(), instanceC) + + var got []string + + for i := range instanceC { + got = append(got, i) + } + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NotifyInsanceStopped() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/app/providers/docker_swarm.go b/app/providers/docker_swarm.go index c1135f0..2d53e42 100644 --- a/app/providers/docker_swarm.go +++ b/app/providers/docker_swarm.go @@ -137,3 +137,6 @@ func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm. return fmt.Sprintf("%s (%s)", name, service.Spec.Name) } + +func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) { +} diff --git a/app/providers/kubernetes.go b/app/providers/kubernetes.go index 3006ed7..05add0e 100644 --- a/app/providers/kubernetes.go +++ b/app/providers/kubernetes.go @@ -165,3 +165,6 @@ func (provider *KubernetesProvider) getStatefulsetState(config *Config) (instanc return instance.NotReadyInstanceState(config.OriginalName, int(ss.Status.ReadyReplicas), int(config.Replicas)) } + +func (provider *KubernetesProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) { +} diff --git a/app/providers/mocks/client_mock.go b/app/providers/mocks/client_mock.go index bd25ec7..7f0fddc 100644 --- a/app/providers/mocks/client_mock.go +++ b/app/providers/mocks/client_mock.go @@ -2,9 +2,11 @@ package mocks import ( "context" + "io" "time" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/client" "github.com/stretchr/testify/mock" @@ -15,30 +17,55 @@ import ( v1 "k8s.io/client-go/kubernetes/typed/apps/v1" ) -type ContainerAPIClientMock struct { - client.ContainerAPIClient +type DockerAPIClientMock struct { + // Will be sent through events + messages []events.Message + errors []error + + client.APIClient mock.Mock } -func NewContainerAPIClientMock() *ContainerAPIClientMock { - return &ContainerAPIClientMock{} +func NewDockerAPIClientMock() *DockerAPIClientMock { + return &DockerAPIClientMock{} } -func (client *ContainerAPIClientMock) ContainerStart(ctx context.Context, container string, options types.ContainerStartOptions) error { +func NewDockerAPIClientMockWithEvents(messages []events.Message, errors []error) *DockerAPIClientMock { + return &DockerAPIClientMock{ + messages: messages, + errors: errors, + } +} + +func (client *DockerAPIClientMock) ContainerStart(ctx context.Context, container string, options types.ContainerStartOptions) error { args := client.Mock.Called(ctx, container, options) return args.Error(0) } -func (client *ContainerAPIClientMock) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error { +func (client *DockerAPIClientMock) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error { args := client.Mock.Called(ctx, container, timeout) return args.Error(0) } -func (client *ContainerAPIClientMock) ContainerInspect(ctx context.Context, container string) (types.ContainerJSON, error) { +func (client *DockerAPIClientMock) ContainerInspect(ctx context.Context, container string) (types.ContainerJSON, error) { args := client.Mock.Called(ctx, container) return args.Get(0).(types.ContainerJSON), args.Error(1) } +func (client *DockerAPIClientMock) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { + // client.Mock.Called(ctx, options) + evnts := make(chan events.Message) + errors := make(chan error) + go func() { + defer close(evnts) + for i := 0; i < len(client.messages); i++ { + evnts <- client.messages[i] + } + errors <- io.EOF + }() + return evnts, errors +} + func CreatedContainerSpec(name string) types.ContainerJSON { return types.ContainerJSON{ ContainerJSONBase: &types.ContainerJSONBase{ @@ -154,6 +181,15 @@ func DeadContainerSpec(name string) types.ContainerJSON { } } +func ContainerStoppedEvent(name string) events.Message { + return events.Message{ + From: name, + Scope: "local", + Action: "stop", + Type: "container", + } +} + type ServiceAPIClientMock struct { client.ServiceAPIClient mock.Mock @@ -327,7 +363,7 @@ func V1Scale(replicas int) *autoscalingv1.Scale { func V1Deployment(replicas int, readyReplicas int) *appsv1.Deployment { return &appsv1.Deployment{ Spec: appsv1.DeploymentSpec{ - Replicas: make(int32(replicas)), + Replicas: makeP(int32(replicas)), }, Status: appsv1.DeploymentStatus{ ReadyReplicas: int32(readyReplicas), @@ -338,7 +374,7 @@ func V1Deployment(replicas int, readyReplicas int) *appsv1.Deployment { func V1StatefulSet(replicas int, readyReplicas int) *appsv1.StatefulSet { return &appsv1.StatefulSet{ Spec: appsv1.StatefulSetSpec{ - Replicas: make(int32(replicas)), + Replicas: makeP(int32(replicas)), }, Status: appsv1.StatefulSetStatus{ ReadyReplicas: int32(readyReplicas), @@ -346,6 +382,6 @@ func V1StatefulSet(replicas int, readyReplicas int) *appsv1.StatefulSet { } } -func make(val int32) *int32 { +func makeP(val int32) *int32 { return &val } diff --git a/app/providers/provider.go b/app/providers/provider.go index 07e63e2..bb7148a 100644 --- a/app/providers/provider.go +++ b/app/providers/provider.go @@ -1,6 +1,7 @@ package providers import ( + "context" "fmt" "github.com/acouvreur/sablier/app/instance" @@ -11,6 +12,8 @@ type Provider interface { Start(name string) (instance.State, error) Stop(name string) (instance.State, error) GetState(name string) (instance.State, error) + + NotifyInsanceStopped(ctx context.Context, instance chan string) } func NewProvider(config config.Provider) (Provider, error) { diff --git a/app/sessions/mocks/provider_mock.go b/app/sessions/mocks/provider_mock.go new file mode 100644 index 0000000..2cf3e9e --- /dev/null +++ b/app/sessions/mocks/provider_mock.go @@ -0,0 +1,68 @@ +package mocks + +import ( + "context" + "sync" + + "github.com/acouvreur/sablier/app/instance" + "github.com/acouvreur/sablier/app/providers" + "github.com/acouvreur/sablier/pkg/tinykv" + "github.com/stretchr/testify/mock" +) + +type ProviderMock struct { + stoppedInstances []string + + wg sync.WaitGroup + + providers.Provider + mock.Mock +} + +func NewProviderMock(stoppedInstances []string) *ProviderMock { + return &ProviderMock{ + stoppedInstances: stoppedInstances, + } +} + +func (provider *ProviderMock) NotifyInsanceStopped(ctx context.Context, instance chan string) { + go func() { + defer close(instance) + for i := 0; i < len(provider.stoppedInstances); i++ { + instance <- provider.stoppedInstances[i] + } + provider.wg.Done() + }() +} + +func (provider *ProviderMock) Add(count int) { + provider.wg.Add(count) +} + +func (provider *ProviderMock) Wait() { + provider.wg.Wait() +} + +type KVMock[T any] struct { + wg sync.WaitGroup + + tinykv.KV[T] + mock.Mock +} + +func NewKVMock() *KVMock[instance.State] { + return &KVMock[instance.State]{} +} + +func (kv *KVMock[T]) Delete(k string) { + kv.Mock.Called(k) + kv.wg.Done() +} + +func (kv *KVMock[T]) Add(count int) { + kv.wg.Add(count) +} + +func (kv *KVMock[T]) Wait() { + kv.wg.Wait() +} diff --git a/app/sessions/sessions_manager.go b/app/sessions/sessions_manager.go index eac4a62..77db733 100644 --- a/app/sessions/sessions_manager.go +++ b/app/sessions/sessions_manager.go @@ -1,6 +1,7 @@ package sessions import ( + "context" "encoding/json" "fmt" "io" @@ -22,14 +23,30 @@ type Manager interface { } type SessionsManager struct { - store tinykv.KV[instance.State] - provider providers.Provider + store tinykv.KV[instance.State] + provider providers.Provider + insanceStopped chan string } func NewSessionsManager(store tinykv.KV[instance.State], provider providers.Provider) Manager { + + instanceStopped := make(chan string) + + go func() { + for instance := range instanceStopped { + // Will delete from the store containers that have been stop either by external sources + // or by the internal expiration loop, if the deleted entry does not exist, it doesn't matter + log.Debugf("received event instance %s is stopped, removing from store", instance) + store.Delete(instance) + } + }() + + provider.NotifyInsanceStopped(context.Background(), instanceStopped) + return &SessionsManager{ - store: store, - provider: provider, + store: store, + provider: provider, + insanceStopped: instanceStopped, } } diff --git a/app/sessions/sessions_manager_test.go b/app/sessions/sessions_manager_test.go index f302073..b884f02 100644 --- a/app/sessions/sessions_manager_test.go +++ b/app/sessions/sessions_manager_test.go @@ -5,6 +5,8 @@ import ( "testing" "github.com/acouvreur/sablier/app/instance" + "github.com/acouvreur/sablier/app/sessions/mocks" + "github.com/stretchr/testify/mock" ) func TestSessionState_IsReady(t *testing.T) { @@ -79,3 +81,40 @@ func createMap(instances []*instance.State) (store *sync.Map) { return } + +func TestNewSessionsManagerEvents(t *testing.T) { + tests := []struct { + name string + stoppedInstances []string + }{ + { + name: "when nginx is stopped it is removed from the store", + stoppedInstances: []string{"nginx"}, + }, + { + name: "when nginx, apache and whoami is stopped it is removed from the store", + stoppedInstances: []string{"nginx", "apache", "whoami"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider := mocks.NewProviderMock(tt.stoppedInstances) + provider.Add(1) + + kv := mocks.NewKVMock() + kv.Add(len(tt.stoppedInstances)) + kv.Mock.On("Delete", mock.AnythingOfType("string")).Return() + + NewSessionsManager(kv, provider) + + // The provider watches notifications from a Goroutine, must wait + provider.Wait() + // The key is deleted inside a Goroutine by the session manager, must wait + kv.Wait() + + for _, instance := range tt.stoppedInstances { + kv.AssertCalled(t, "Delete", instance) + } + }) + } +}