From a62f098d42a3860bfc841e6e008a3eba3da1362e Mon Sep 17 00:00:00 2001 From: Alexis Couvreur Date: Sun, 6 Nov 2022 04:53:32 +0000 Subject: [PATCH] feat: add `swarm` event listener --- app/providers/docker_swarm.go | 31 +++++++- app/providers/docker_swarm_test.go | 115 +++++++++++++---------------- app/providers/mocks/client_mock.go | 29 +++++--- 3 files changed, 101 insertions(+), 74 deletions(-) diff --git a/app/providers/docker_swarm.go b/app/providers/docker_swarm.go index 2d53e42..02284bc 100644 --- a/app/providers/docker_swarm.go +++ b/app/providers/docker_swarm.go @@ -2,7 +2,9 @@ package providers import ( "context" + "errors" "fmt" + "io" "strings" "github.com/acouvreur/sablier/app/instance" @@ -14,7 +16,7 @@ import ( ) type DockerSwarmProvider struct { - Client client.ServiceAPIClient + Client client.APIClient desiredReplicas int } @@ -139,4 +141,31 @@ func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm. } func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) { + msgs, errs := provider.Client.Events(ctx, types.EventsOptions{ + Filters: filters.NewArgs( + filters.Arg("scope", "swarm"), + filters.Arg("type", "service"), + ), + }) + + go func() { + for { + select { + case msg := <-msgs: + // Send the container that has died to the channel + if msg.Actor.Attributes["replicas.new"] == "0" { + instance <- msg.Actor.Attributes["name"] + } + case err := <-errs: + if errors.Is(err, io.EOF) { + log.Debug("provider event stream closed") + close(instance) + return + } + case <-ctx.Done(): + close(instance) + return + } + } + }() } diff --git a/app/providers/docker_swarm_test.go b/app/providers/docker_swarm_test.go index 628d0f4..7b4f698 100644 --- a/app/providers/docker_swarm_test.go +++ b/app/providers/docker_swarm_test.go @@ -1,26 +1,24 @@ package providers import ( + "context" "reflect" "testing" "github.com/acouvreur/sablier/app/instance" "github.com/acouvreur/sablier/app/providers/mocks" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/swarm" "github.com/stretchr/testify/mock" ) func TestDockerSwarmProvider_Start(t *testing.T) { - type fields struct { - Client *mocks.ServiceAPIClientMock - } type args struct { name string } tests := []struct { name string - fields fields args args want instance.State serviceList []swarm.Service @@ -30,9 +28,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) { }{ { name: "scale nginx service to 1 replica", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -53,9 +48,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) { }, { name: "ambiguous service name", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -78,9 +70,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) { }, { name: "exact match service name", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -103,9 +92,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) { }, { name: "service match on suffix", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -127,9 +113,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) { }, { name: "nginx is not a replicated service", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -152,13 +135,14 @@ func TestDockerSwarmProvider_Start(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + clientMock := mocks.NewDockerAPIClientMock() provider := &DockerSwarmProvider{ - Client: tt.fields.Client, + Client: clientMock, desiredReplicas: 1, } - tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil) - tt.fields.Client.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil) + clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil) + clientMock.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil) got, err := provider.Start(tt.args.name) if (err != nil) != tt.wantErr { @@ -173,15 +157,11 @@ func TestDockerSwarmProvider_Start(t *testing.T) { } func TestDockerSwarmProvider_Stop(t *testing.T) { - type fields struct { - Client *mocks.ServiceAPIClientMock - } type args struct { name string } tests := []struct { name string - fields fields args args want instance.State serviceList []swarm.Service @@ -191,9 +171,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) { }{ { name: "scale nginx service to 0 replica", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -214,9 +191,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) { }, { name: "ambiguous service name", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -239,9 +213,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) { }, { name: "exact match service name", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -264,9 +235,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) { }, { name: "service match on suffix", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -288,9 +256,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) { }, { name: "nginx is not a replicated service", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -313,13 +278,14 @@ func TestDockerSwarmProvider_Stop(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + clientMock := mocks.NewDockerAPIClientMock() provider := &DockerSwarmProvider{ - Client: tt.fields.Client, + Client: clientMock, desiredReplicas: 1, } - tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil) - tt.fields.Client.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil) + clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil) + clientMock.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil) got, err := provider.Stop(tt.args.name) if (err != nil) != tt.wantErr { @@ -334,15 +300,11 @@ func TestDockerSwarmProvider_Stop(t *testing.T) { } func TestDockerSwarmProvider_GetState(t *testing.T) { - type fields struct { - Client *mocks.ServiceAPIClientMock - } type args struct { name string } tests := []struct { name string - fields fields args args want instance.State serviceList []swarm.Service @@ -350,9 +312,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) { }{ { name: "nginx service is ready", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -369,9 +328,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) { }, { name: "nginx service is not ready", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -388,9 +344,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) { }, { name: "nginx service is not ready", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -407,9 +360,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) { }, { name: "nginx is not a replicated service", - fields: fields{ - Client: mocks.NewServiceAPIClientMock(), - }, args: args{ name: "nginx", }, @@ -428,12 +378,13 @@ func TestDockerSwarmProvider_GetState(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + clientMock := mocks.NewDockerAPIClientMock() provider := &DockerSwarmProvider{ - Client: tt.fields.Client, + Client: clientMock, desiredReplicas: 1, } - tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil) + clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil) got, err := provider.GetState(tt.args.name) if (err != nil) != tt.wantErr { @@ -446,3 +397,43 @@ func TestDockerSwarmProvider_GetState(t *testing.T) { }) } } + +func TestDockerSwarmProvider_NotifyInsanceStopped(t *testing.T) { + tests := []struct { + name string + want []string + events []events.Message + errors []error + }{ + { + name: "service nginx is scaled to 0", + want: []string{"nginx"}, + events: []events.Message{ + mocks.SeviceScaledEvent("nginx", "1", "0"), + }, + errors: []error{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider := &DockerSwarmProvider{ + Client: mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors), + desiredReplicas: 1, + } + + instanceC := make(chan string) + + provider.NotifyInsanceStopped(context.Background(), instanceC) + + var got []string + + for i := range instanceC { + got = append(got, i) + } + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NotifyInsanceStopped() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/app/providers/mocks/client_mock.go b/app/providers/mocks/client_mock.go index 7f0fddc..0126cd0 100644 --- a/app/providers/mocks/client_mock.go +++ b/app/providers/mocks/client_mock.go @@ -190,21 +190,12 @@ func ContainerStoppedEvent(name string) events.Message { } } -type ServiceAPIClientMock struct { - client.ServiceAPIClient - mock.Mock -} - -func NewServiceAPIClientMock() *ServiceAPIClientMock { - return &ServiceAPIClientMock{} -} - -func (client *ServiceAPIClientMock) ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error) { +func (client *DockerAPIClientMock) ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error) { args := client.Mock.Called(ctx, serviceID, version, service, options) return args.Get(0).(types.ServiceUpdateResponse), args.Error(1) } -func (client *ServiceAPIClientMock) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) { +func (client *DockerAPIClientMock) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) { args := client.Mock.Called(ctx, options) return args.Get(0).([]swarm.Service), args.Error(1) } @@ -266,6 +257,22 @@ func ServiceGlobal(name string) swarm.Service { } } +func SeviceScaledEvent(name string, oldReplicas string, newReplicas string) events.Message { + return events.Message{ + Scope: "swarm", + Action: "update", + Type: "service", + Actor: events.Actor{ + ID: "randomid", + Attributes: map[string]string{ + "name": name, + "replicas.new": newReplicas, + "replicas.old": oldReplicas, + }, + }, + } +} + type KubernetesAPIClientMock struct { mockv1 AppsV1InterfaceMock