feat: add swarm event listener

This commit is contained in:
Alexis Couvreur
2022-11-06 04:53:32 +00:00
parent 8dc7198b54
commit a62f098d42
3 changed files with 101 additions and 74 deletions

View File

@@ -2,7 +2,9 @@ package providers
import (
"context"
"errors"
"fmt"
"io"
"strings"
"github.com/acouvreur/sablier/app/instance"
@@ -14,7 +16,7 @@ import (
)
type DockerSwarmProvider struct {
Client client.ServiceAPIClient
Client client.APIClient
desiredReplicas int
}
@@ -139,4 +141,31 @@ func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm.
}
func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
Filters: filters.NewArgs(
filters.Arg("scope", "swarm"),
filters.Arg("type", "service"),
),
})
go func() {
for {
select {
case msg := <-msgs:
// Send the container that has died to the channel
if msg.Actor.Attributes["replicas.new"] == "0" {
instance <- msg.Actor.Attributes["name"]
}
case err := <-errs:
if errors.Is(err, io.EOF) {
log.Debug("provider event stream closed")
close(instance)
return
}
case <-ctx.Done():
close(instance)
return
}
}
}()
}

View File

@@ -1,26 +1,24 @@
package providers
import (
"context"
"reflect"
"testing"
"github.com/acouvreur/sablier/app/instance"
"github.com/acouvreur/sablier/app/providers/mocks"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/swarm"
"github.com/stretchr/testify/mock"
)
func TestDockerSwarmProvider_Start(t *testing.T) {
type fields struct {
Client *mocks.ServiceAPIClientMock
}
type args struct {
name string
}
tests := []struct {
name string
fields fields
args args
want instance.State
serviceList []swarm.Service
@@ -30,9 +28,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
}{
{
name: "scale nginx service to 1 replica",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -53,9 +48,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
},
{
name: "ambiguous service name",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -78,9 +70,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
},
{
name: "exact match service name",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -103,9 +92,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
},
{
name: "service match on suffix",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -127,9 +113,6 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
},
{
name: "nginx is not a replicated service",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -152,13 +135,14 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clientMock := mocks.NewDockerAPIClientMock()
provider := &DockerSwarmProvider{
Client: tt.fields.Client,
Client: clientMock,
desiredReplicas: 1,
}
tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
tt.fields.Client.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
clientMock.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)
got, err := provider.Start(tt.args.name)
if (err != nil) != tt.wantErr {
@@ -173,15 +157,11 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
}
func TestDockerSwarmProvider_Stop(t *testing.T) {
type fields struct {
Client *mocks.ServiceAPIClientMock
}
type args struct {
name string
}
tests := []struct {
name string
fields fields
args args
want instance.State
serviceList []swarm.Service
@@ -191,9 +171,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
}{
{
name: "scale nginx service to 0 replica",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -214,9 +191,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
},
{
name: "ambiguous service name",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -239,9 +213,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
},
{
name: "exact match service name",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -264,9 +235,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
},
{
name: "service match on suffix",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -288,9 +256,6 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
},
{
name: "nginx is not a replicated service",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -313,13 +278,14 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clientMock := mocks.NewDockerAPIClientMock()
provider := &DockerSwarmProvider{
Client: tt.fields.Client,
Client: clientMock,
desiredReplicas: 1,
}
tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
tt.fields.Client.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
clientMock.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)
got, err := provider.Stop(tt.args.name)
if (err != nil) != tt.wantErr {
@@ -334,15 +300,11 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
}
func TestDockerSwarmProvider_GetState(t *testing.T) {
type fields struct {
Client *mocks.ServiceAPIClientMock
}
type args struct {
name string
}
tests := []struct {
name string
fields fields
args args
want instance.State
serviceList []swarm.Service
@@ -350,9 +312,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
}{
{
name: "nginx service is ready",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -369,9 +328,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
},
{
name: "nginx service is not ready",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -388,9 +344,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
},
{
name: "nginx service is not ready",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -407,9 +360,6 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
},
{
name: "nginx is not a replicated service",
fields: fields{
Client: mocks.NewServiceAPIClientMock(),
},
args: args{
name: "nginx",
},
@@ -428,12 +378,13 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clientMock := mocks.NewDockerAPIClientMock()
provider := &DockerSwarmProvider{
Client: tt.fields.Client,
Client: clientMock,
desiredReplicas: 1,
}
tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
got, err := provider.GetState(tt.args.name)
if (err != nil) != tt.wantErr {
@@ -446,3 +397,43 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
})
}
}
func TestDockerSwarmProvider_NotifyInsanceStopped(t *testing.T) {
tests := []struct {
name string
want []string
events []events.Message
errors []error
}{
{
name: "service nginx is scaled to 0",
want: []string{"nginx"},
events: []events.Message{
mocks.SeviceScaledEvent("nginx", "1", "0"),
},
errors: []error{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := &DockerSwarmProvider{
Client: mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors),
desiredReplicas: 1,
}
instanceC := make(chan string)
provider.NotifyInsanceStopped(context.Background(), instanceC)
var got []string
for i := range instanceC {
got = append(got, i)
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NotifyInsanceStopped() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -190,21 +190,12 @@ func ContainerStoppedEvent(name string) events.Message {
}
}
type ServiceAPIClientMock struct {
client.ServiceAPIClient
mock.Mock
}
func NewServiceAPIClientMock() *ServiceAPIClientMock {
return &ServiceAPIClientMock{}
}
func (client *ServiceAPIClientMock) ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error) {
func (client *DockerAPIClientMock) ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error) {
args := client.Mock.Called(ctx, serviceID, version, service, options)
return args.Get(0).(types.ServiceUpdateResponse), args.Error(1)
}
func (client *ServiceAPIClientMock) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
func (client *DockerAPIClientMock) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
args := client.Mock.Called(ctx, options)
return args.Get(0).([]swarm.Service), args.Error(1)
}
@@ -266,6 +257,22 @@ func ServiceGlobal(name string) swarm.Service {
}
}
func SeviceScaledEvent(name string, oldReplicas string, newReplicas string) events.Message {
return events.Message{
Scope: "swarm",
Action: "update",
Type: "service",
Actor: events.Actor{
ID: "randomid",
Attributes: map[string]string{
"name": name,
"replicas.new": newReplicas,
"replicas.old": oldReplicas,
},
},
}
}
type KubernetesAPIClientMock struct {
mockv1 AppsV1InterfaceMock