feat(docker): listens for container stopped event

When a container is shutdown manually while it's registered as `ready` in Sablier it will remove it from the store.
Meaning externally events are now handled for docker.
This commit is contained in:
Alexis Couvreur
2022-11-04 16:04:44 +00:00
parent eb83d39652
commit 1ca1934b1c
9 changed files with 277 additions and 34 deletions

View File

@@ -2,16 +2,19 @@ package providers
import (
"context"
"errors"
"fmt"
"io"
"github.com/acouvreur/sablier/app/instance"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
log "github.com/sirupsen/logrus"
)
type DockerClassicProvider struct {
Client client.ContainerAPIClient
Client client.APIClient
desiredReplicas int
}
@@ -103,3 +106,32 @@ func (provider *DockerClassicProvider) GetState(name string) (instance.State, er
return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.State.Status), provider.desiredReplicas)
}
}
func (provider *DockerClassicProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
Filters: filters.NewArgs(
filters.Arg("scope", "local"),
filters.Arg("type", "container"),
filters.Arg("event", "die"),
),
})
go func() {
for {
select {
case msg := <-msgs:
// Send the container that has died to the channel
instance <- msg.From
case err := <-errs:
if errors.Is(err, io.EOF) {
log.Debug("provider event stream closed")
close(instance)
return
}
case <-ctx.Done():
close(instance)
return
}
}
}()
}

View File

@@ -1,6 +1,7 @@
package providers
import (
"context"
"fmt"
"reflect"
"testing"
@@ -8,12 +9,13 @@ import (
"github.com/acouvreur/sablier/app/instance"
"github.com/acouvreur/sablier/app/providers/mocks"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/stretchr/testify/mock"
)
func TestDockerClassicProvider_GetState(t *testing.T) {
type fields struct {
Client *mocks.ContainerAPIClientMock
Client *mocks.DockerAPIClientMock
}
type args struct {
name string
@@ -30,7 +32,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx created container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -47,7 +49,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx running container state without healthcheck",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -64,7 +66,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx running container state with \"starting\" health",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -81,7 +83,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx running container state with \"unhealthy\" health",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -99,7 +101,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx running container state with \"healthy\" health",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -116,7 +118,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx paused container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -133,7 +135,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx restarting container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -150,7 +152,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx removing container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -167,7 +169,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx exited container state with status code 0",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -184,7 +186,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx exited container state with status code 137",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -202,7 +204,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "nginx dead container state",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -220,7 +222,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
{
name: "container inspect has an error",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -260,7 +262,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
func TestDockerClassicProvider_Stop(t *testing.T) {
type fields struct {
Client *mocks.ContainerAPIClientMock
Client *mocks.DockerAPIClientMock
}
type args struct {
name string
@@ -276,7 +278,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) {
{
name: "container stop has an error",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -294,7 +296,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) {
{
name: "container stop as expected",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -332,7 +334,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) {
func TestDockerClassicProvider_Start(t *testing.T) {
type fields struct {
Client *mocks.ContainerAPIClientMock
Client *mocks.DockerAPIClientMock
}
type args struct {
name string
@@ -348,7 +350,7 @@ func TestDockerClassicProvider_Start(t *testing.T) {
{
name: "container start has an error",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -366,7 +368,7 @@ func TestDockerClassicProvider_Start(t *testing.T) {
{
name: "container start as expected",
fields: fields{
Client: mocks.NewContainerAPIClientMock(),
Client: mocks.NewDockerAPIClientMock(),
},
args: args{
name: "nginx",
@@ -401,3 +403,43 @@ func TestDockerClassicProvider_Start(t *testing.T) {
})
}
}
func TestDockerClassicProvider_NotifyInsanceStopped(t *testing.T) {
tests := []struct {
name string
want []string
events []events.Message
errors []error
}{
{
name: "container nginx is stopped",
want: []string{"nginx"},
events: []events.Message{
mocks.ContainerStoppedEvent("nginx"),
},
errors: []error{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := &DockerClassicProvider{
Client: mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors),
desiredReplicas: 1,
}
instanceC := make(chan string)
provider.NotifyInsanceStopped(context.Background(), instanceC)
var got []string
for i := range instanceC {
got = append(got, i)
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NotifyInsanceStopped() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -137,3 +137,6 @@ func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm.
return fmt.Sprintf("%s (%s)", name, service.Spec.Name)
}
func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
}

View File

@@ -165,3 +165,6 @@ func (provider *KubernetesProvider) getStatefulsetState(config *Config) (instanc
return instance.NotReadyInstanceState(config.OriginalName, int(ss.Status.ReadyReplicas), int(config.Replicas))
}
func (provider *KubernetesProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
}

View File

@@ -2,9 +2,11 @@ package mocks
import (
"context"
"io"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/stretchr/testify/mock"
@@ -15,30 +17,55 @@ import (
v1 "k8s.io/client-go/kubernetes/typed/apps/v1"
)
type ContainerAPIClientMock struct {
client.ContainerAPIClient
type DockerAPIClientMock struct {
// Will be sent through events
messages []events.Message
errors []error
client.APIClient
mock.Mock
}
func NewContainerAPIClientMock() *ContainerAPIClientMock {
return &ContainerAPIClientMock{}
func NewDockerAPIClientMock() *DockerAPIClientMock {
return &DockerAPIClientMock{}
}
func (client *ContainerAPIClientMock) ContainerStart(ctx context.Context, container string, options types.ContainerStartOptions) error {
func NewDockerAPIClientMockWithEvents(messages []events.Message, errors []error) *DockerAPIClientMock {
return &DockerAPIClientMock{
messages: messages,
errors: errors,
}
}
func (client *DockerAPIClientMock) ContainerStart(ctx context.Context, container string, options types.ContainerStartOptions) error {
args := client.Mock.Called(ctx, container, options)
return args.Error(0)
}
func (client *ContainerAPIClientMock) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
func (client *DockerAPIClientMock) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
args := client.Mock.Called(ctx, container, timeout)
return args.Error(0)
}
func (client *ContainerAPIClientMock) ContainerInspect(ctx context.Context, container string) (types.ContainerJSON, error) {
func (client *DockerAPIClientMock) ContainerInspect(ctx context.Context, container string) (types.ContainerJSON, error) {
args := client.Mock.Called(ctx, container)
return args.Get(0).(types.ContainerJSON), args.Error(1)
}
func (client *DockerAPIClientMock) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
// client.Mock.Called(ctx, options)
evnts := make(chan events.Message)
errors := make(chan error)
go func() {
defer close(evnts)
for i := 0; i < len(client.messages); i++ {
evnts <- client.messages[i]
}
errors <- io.EOF
}()
return evnts, errors
}
func CreatedContainerSpec(name string) types.ContainerJSON {
return types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
@@ -154,6 +181,15 @@ func DeadContainerSpec(name string) types.ContainerJSON {
}
}
func ContainerStoppedEvent(name string) events.Message {
return events.Message{
From: name,
Scope: "local",
Action: "stop",
Type: "container",
}
}
type ServiceAPIClientMock struct {
client.ServiceAPIClient
mock.Mock
@@ -327,7 +363,7 @@ func V1Scale(replicas int) *autoscalingv1.Scale {
func V1Deployment(replicas int, readyReplicas int) *appsv1.Deployment {
return &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Replicas: make(int32(replicas)),
Replicas: makeP(int32(replicas)),
},
Status: appsv1.DeploymentStatus{
ReadyReplicas: int32(readyReplicas),
@@ -338,7 +374,7 @@ func V1Deployment(replicas int, readyReplicas int) *appsv1.Deployment {
func V1StatefulSet(replicas int, readyReplicas int) *appsv1.StatefulSet {
return &appsv1.StatefulSet{
Spec: appsv1.StatefulSetSpec{
Replicas: make(int32(replicas)),
Replicas: makeP(int32(replicas)),
},
Status: appsv1.StatefulSetStatus{
ReadyReplicas: int32(readyReplicas),
@@ -346,6 +382,6 @@ func V1StatefulSet(replicas int, readyReplicas int) *appsv1.StatefulSet {
}
}
func make(val int32) *int32 {
func makeP(val int32) *int32 {
return &val
}

View File

@@ -1,6 +1,7 @@
package providers
import (
"context"
"fmt"
"github.com/acouvreur/sablier/app/instance"
@@ -11,6 +12,8 @@ type Provider interface {
Start(name string) (instance.State, error)
Stop(name string) (instance.State, error)
GetState(name string) (instance.State, error)
NotifyInsanceStopped(ctx context.Context, instance chan string)
}
func NewProvider(config config.Provider) (Provider, error) {

View File

@@ -0,0 +1,68 @@
package mocks
import (
"context"
"sync"
"github.com/acouvreur/sablier/app/instance"
"github.com/acouvreur/sablier/app/providers"
"github.com/acouvreur/sablier/pkg/tinykv"
"github.com/stretchr/testify/mock"
)
type ProviderMock struct {
stoppedInstances []string
wg sync.WaitGroup
providers.Provider
mock.Mock
}
func NewProviderMock(stoppedInstances []string) *ProviderMock {
return &ProviderMock{
stoppedInstances: stoppedInstances,
}
}
func (provider *ProviderMock) NotifyInsanceStopped(ctx context.Context, instance chan string) {
go func() {
defer close(instance)
for i := 0; i < len(provider.stoppedInstances); i++ {
instance <- provider.stoppedInstances[i]
}
provider.wg.Done()
}()
}
func (provider *ProviderMock) Add(count int) {
provider.wg.Add(count)
}
func (provider *ProviderMock) Wait() {
provider.wg.Wait()
}
type KVMock[T any] struct {
wg sync.WaitGroup
tinykv.KV[T]
mock.Mock
}
func NewKVMock() *KVMock[instance.State] {
return &KVMock[instance.State]{}
}
func (kv *KVMock[T]) Delete(k string) {
kv.Mock.Called(k)
kv.wg.Done()
}
func (kv *KVMock[T]) Add(count int) {
kv.wg.Add(count)
}
func (kv *KVMock[T]) Wait() {
kv.wg.Wait()
}

View File

@@ -1,6 +1,7 @@
package sessions
import (
"context"
"encoding/json"
"fmt"
"io"
@@ -22,14 +23,30 @@ type Manager interface {
}
type SessionsManager struct {
store tinykv.KV[instance.State]
provider providers.Provider
store tinykv.KV[instance.State]
provider providers.Provider
insanceStopped chan string
}
func NewSessionsManager(store tinykv.KV[instance.State], provider providers.Provider) Manager {
instanceStopped := make(chan string)
go func() {
for instance := range instanceStopped {
// Will delete from the store containers that have been stop either by external sources
// or by the internal expiration loop, if the deleted entry does not exist, it doesn't matter
log.Debugf("received event instance %s is stopped, removing from store", instance)
store.Delete(instance)
}
}()
provider.NotifyInsanceStopped(context.Background(), instanceStopped)
return &SessionsManager{
store: store,
provider: provider,
store: store,
provider: provider,
insanceStopped: instanceStopped,
}
}

View File

@@ -5,6 +5,8 @@ import (
"testing"
"github.com/acouvreur/sablier/app/instance"
"github.com/acouvreur/sablier/app/sessions/mocks"
"github.com/stretchr/testify/mock"
)
func TestSessionState_IsReady(t *testing.T) {
@@ -79,3 +81,40 @@ func createMap(instances []*instance.State) (store *sync.Map) {
return
}
func TestNewSessionsManagerEvents(t *testing.T) {
tests := []struct {
name string
stoppedInstances []string
}{
{
name: "when nginx is stopped it is removed from the store",
stoppedInstances: []string{"nginx"},
},
{
name: "when nginx, apache and whoami is stopped it is removed from the store",
stoppedInstances: []string{"nginx", "apache", "whoami"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := mocks.NewProviderMock(tt.stoppedInstances)
provider.Add(1)
kv := mocks.NewKVMock()
kv.Add(len(tt.stoppedInstances))
kv.Mock.On("Delete", mock.AnythingOfType("string")).Return()
NewSessionsManager(kv, provider)
// The provider watches notifications from a Goroutine, must wait
provider.Wait()
// The key is deleted inside a Goroutine by the session manager, must wait
kv.Wait()
for _, instance := range tt.stoppedInstances {
kv.AssertCalled(t, "Delete", instance)
}
})
}
}