mirror of
https://github.com/sablierapp/sablier.git
synced 2026-01-04 03:54:56 +01:00
remove old scaler
This commit is contained in:
@@ -1,106 +0,0 @@
|
||||
package scaler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/client"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type DockerClassicScaler struct {
|
||||
Client client.ContainerAPIClient
|
||||
}
|
||||
|
||||
func NewDockerClassicScaler(client client.ContainerAPIClient) *DockerClassicScaler {
|
||||
return &DockerClassicScaler{
|
||||
Client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (scaler *DockerClassicScaler) ScaleUp(name string) error {
|
||||
log.Infof("Scaling up %s to %d", name, onereplicas)
|
||||
ctx := context.Background()
|
||||
container, err := scaler.GetContainerByName(name, ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
err = scaler.Client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{})
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scaler *DockerClassicScaler) ScaleDown(name string) error {
|
||||
log.Infof("Scaling down %s to 0", name)
|
||||
ctx := context.Background()
|
||||
container, err := scaler.GetContainerByName(name, ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
err = scaler.Client.ContainerStop(ctx, container.ID, nil)
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scaler *DockerClassicScaler) IsUp(name string) bool {
|
||||
ctx := context.Background()
|
||||
container, err := scaler.GetContainerByName(name, ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
spec, err := scaler.Client.ContainerInspect(ctx, container.ID)
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
if spec.State.Health != nil {
|
||||
return spec.State.Health.Status == "healthy"
|
||||
}
|
||||
|
||||
return spec.State.Running
|
||||
}
|
||||
|
||||
func (scaler *DockerClassicScaler) GetContainerByName(name string, ctx context.Context) (*types.Container, error) {
|
||||
opts := types.ContainerListOptions{
|
||||
All: true,
|
||||
Filters: filters.NewArgs(),
|
||||
}
|
||||
opts.Filters.Add("name", name)
|
||||
|
||||
containers, err := scaler.Client.ContainerList(ctx, opts)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(containers) == 0 {
|
||||
return nil, fmt.Errorf(fmt.Sprintf("container with name %s was not found", name))
|
||||
}
|
||||
|
||||
if len(containers) > 1 {
|
||||
return nil, fmt.Errorf("multiple containers (%d) with name %s were found: %v", len(containers), name, containers)
|
||||
}
|
||||
|
||||
return &containers[0], nil
|
||||
}
|
||||
@@ -1,363 +0,0 @@
|
||||
package scaler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/acouvreur/sablier/pkg/scaler/mocks"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
func TestDockerClassicScaler_ScaleUp(t *testing.T) {
|
||||
type fields struct {
|
||||
Client *mocks.ContainerAPIClientMock
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
containerList []types.Container
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "start nginx container",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx"},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "container nginx was not found",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{},
|
||||
err: errors.New("container with name nginx was not found"),
|
||||
},
|
||||
{
|
||||
name: "multiple containers with name nginx were found",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx1"},
|
||||
},
|
||||
{
|
||||
Names: []string{"nginx2"},
|
||||
},
|
||||
},
|
||||
err: errors.New("multiple containers (2) with name nginx were found: [{ [nginx1] 0 [] 0 0 map[] {} <nil> []} { [nginx2] 0 [] 0 0 map[] {} <nil> []}]"),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
scaler := &DockerClassicScaler{
|
||||
Client: tt.fields.Client,
|
||||
}
|
||||
|
||||
tt.fields.Client.On("ContainerList", mock.Anything, mock.Anything).Return(tt.containerList, nil)
|
||||
tt.fields.Client.On("ContainerStart", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
err := scaler.ScaleUp(tt.args.name)
|
||||
|
||||
assert.EqualValues(t, tt.err, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerClassicScaler_ScaleDown(t *testing.T) {
|
||||
type fields struct {
|
||||
Client *mocks.ContainerAPIClientMock
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
containerList []types.Container
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "start nginx container",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx"},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "container nginx was not found",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{},
|
||||
err: errors.New("container with name nginx was not found"),
|
||||
},
|
||||
{
|
||||
name: "multiple containers with name nginx were found",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx1"},
|
||||
},
|
||||
{
|
||||
Names: []string{"nginx2"},
|
||||
},
|
||||
},
|
||||
err: errors.New("multiple containers (2) with name nginx were found: [{ [nginx1] 0 [] 0 0 map[] {} <nil> []} { [nginx2] 0 [] 0 0 map[] {} <nil> []}]"),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
scaler := &DockerClassicScaler{
|
||||
Client: tt.fields.Client,
|
||||
}
|
||||
|
||||
tt.fields.Client.On("ContainerList", mock.Anything, mock.Anything).Return(tt.containerList, nil)
|
||||
tt.fields.Client.On("ContainerStop", mock.Anything, mock.Anything, mock.Anything).Return(nil)
|
||||
|
||||
err := scaler.ScaleDown(tt.args.name)
|
||||
|
||||
assert.EqualValues(t, tt.err, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerClassicScaler_IsUp(t *testing.T) {
|
||||
type fields struct {
|
||||
Client *mocks.ContainerAPIClientMock
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
containerList []types.Container
|
||||
containerSpec types.ContainerJSON
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "nginx container is started without healthcheck",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx"},
|
||||
},
|
||||
},
|
||||
containerSpec: types.ContainerJSON{
|
||||
ContainerJSONBase: &types.ContainerJSONBase{
|
||||
State: &types.ContainerState{
|
||||
Running: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "nginx container is not running without healthcheck",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx"},
|
||||
},
|
||||
},
|
||||
containerSpec: types.ContainerJSON{
|
||||
ContainerJSONBase: &types.ContainerJSONBase{
|
||||
State: &types.ContainerState{
|
||||
Running: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "nginx container is started but not healthy",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx"},
|
||||
},
|
||||
},
|
||||
containerSpec: types.ContainerJSON{
|
||||
ContainerJSONBase: &types.ContainerJSONBase{
|
||||
State: &types.ContainerState{
|
||||
Running: true,
|
||||
Health: &types.Health{
|
||||
Status: "starting",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "nginx container is started and healthy",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx"},
|
||||
},
|
||||
},
|
||||
containerSpec: types.ContainerJSON{
|
||||
ContainerJSONBase: &types.ContainerJSONBase{
|
||||
State: &types.ContainerState{
|
||||
Running: true,
|
||||
Health: &types.Health{
|
||||
Status: "healthy",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
scaler := &DockerClassicScaler{
|
||||
Client: tt.fields.Client,
|
||||
}
|
||||
|
||||
tt.fields.Client.On("ContainerList", mock.Anything, mock.Anything).Return(tt.containerList, nil)
|
||||
tt.fields.Client.On("ContainerInspect", mock.Anything, mock.Anything).Return(tt.containerSpec, nil)
|
||||
|
||||
got := scaler.IsUp(tt.args.name)
|
||||
|
||||
assert.EqualValues(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerClassicScaler_GetContainerByName(t *testing.T) {
|
||||
type fields struct {
|
||||
Client *mocks.ContainerAPIClientMock
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
ctx context.Context
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
containerList []types.Container
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "start nginx container",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx"},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
name: "container nginx was not found",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{},
|
||||
err: errors.New("container with name nginx was not found"),
|
||||
},
|
||||
{
|
||||
name: "multiple containers with name nginx were found",
|
||||
fields: fields{
|
||||
Client: mocks.NewContainerAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
containerList: []types.Container{
|
||||
{
|
||||
Names: []string{"nginx1"},
|
||||
},
|
||||
{
|
||||
Names: []string{"nginx2"},
|
||||
},
|
||||
},
|
||||
err: errors.New("multiple containers (2) with name nginx were found: [{ [nginx1] 0 [] 0 0 map[] {} <nil> []} { [nginx2] 0 [] 0 0 map[] {} <nil> []}]"),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
scaler := &DockerClassicScaler{
|
||||
Client: tt.fields.Client,
|
||||
}
|
||||
|
||||
tt.fields.Client.On("ContainerList", mock.Anything, mock.Anything).Return(tt.containerList, nil)
|
||||
|
||||
_, err := scaler.GetContainerByName(tt.args.name, tt.args.ctx)
|
||||
|
||||
assert.EqualValues(t, tt.err, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,191 +0,0 @@
|
||||
package scaler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// Delimiter is used to split name into kind,namespace,name,replicacount
|
||||
const Delimiter = "_"
|
||||
|
||||
type Config struct {
|
||||
Kind string // deployment or statefulset
|
||||
Namespace string
|
||||
Name string
|
||||
Replicas int
|
||||
}
|
||||
|
||||
type Workload interface {
|
||||
GetScale(ctx context.Context, workloadName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
|
||||
UpdateScale(ctx context.Context, workloadName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
|
||||
}
|
||||
|
||||
func convertName(name string) (*Config, error) {
|
||||
// name format kind_namespace_name_replicas
|
||||
s := strings.Split(name, Delimiter)
|
||||
if len(s) < 4 {
|
||||
return nil, errors.New("invalid name should be: kind" + Delimiter + "namespace" + Delimiter + "name" + Delimiter + "replicas")
|
||||
}
|
||||
replicas, err := strconv.Atoi(s[3])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Config{
|
||||
Kind: s[0],
|
||||
Namespace: s[1],
|
||||
Name: s[2],
|
||||
Replicas: replicas,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type KubernetesScaler struct {
|
||||
Client *kubernetes.Clientset
|
||||
}
|
||||
|
||||
func NewKubernetesScaler(client *kubernetes.Clientset) *KubernetesScaler {
|
||||
return &KubernetesScaler{
|
||||
Client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (scaler *KubernetesScaler) ScaleUp(name string) error {
|
||||
config, err := convertName(name)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Scaling up %s %s in namespace %s to %d", config.Kind, config.Name, config.Namespace, config.Replicas)
|
||||
ctx := context.Background()
|
||||
|
||||
var workload Workload
|
||||
|
||||
switch config.Kind {
|
||||
case "deployment":
|
||||
workload = scaler.Client.AppsV1().Deployments(config.Namespace)
|
||||
case "statefulset":
|
||||
workload = scaler.Client.AppsV1().StatefulSets(config.Namespace)
|
||||
default:
|
||||
return fmt.Errorf("unsupported kind %s", config.Kind)
|
||||
}
|
||||
|
||||
s, err := workload.GetScale(ctx, config.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
sc := *s
|
||||
if sc.Spec.Replicas == 0 {
|
||||
sc.Spec.Replicas = int32(config.Replicas)
|
||||
} else {
|
||||
log.Infof("Replicas for %s %s in namespace %s are already: %d", config.Kind, config.Name, config.Namespace, sc.Spec.Replicas)
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = workload.UpdateScale(ctx, config.Name, &sc, metav1.UpdateOptions{})
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scaler *KubernetesScaler) ScaleDown(name string) error {
|
||||
config, err := convertName(name)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Scaling down %s %s in namespace %s to 0", config.Kind, config.Name, config.Namespace)
|
||||
ctx := context.Background()
|
||||
|
||||
var workload Workload
|
||||
|
||||
switch config.Kind {
|
||||
case "deployment":
|
||||
workload = scaler.Client.AppsV1().Deployments(config.Namespace)
|
||||
case "statefulset":
|
||||
workload = scaler.Client.AppsV1().StatefulSets(config.Namespace)
|
||||
default:
|
||||
return fmt.Errorf("unsupported kind %s", config.Kind)
|
||||
}
|
||||
|
||||
s, err := workload.GetScale(ctx, config.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
sc := *s
|
||||
if sc.Spec.Replicas != 0 {
|
||||
sc.Spec.Replicas = 0
|
||||
} else {
|
||||
log.Infof("Replicas for %s %s in namespace %s are already: 0", config.Kind, config.Name, config.Namespace)
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = workload.UpdateScale(ctx, config.Name, &sc, metav1.UpdateOptions{})
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scaler *KubernetesScaler) IsUp(name string) bool {
|
||||
ctx := context.Background()
|
||||
|
||||
config, err := convertName(name)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
switch config.Kind {
|
||||
case "deployment":
|
||||
d, err := scaler.Client.AppsV1().Deployments(config.Namespace).
|
||||
Get(ctx, config.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return false
|
||||
}
|
||||
log.Infof("Status for %s %s in namespace %s is: AvailableReplicas %d, ReadyReplicas: %d ", config.Kind, config.Name, config.Namespace, d.Status.AvailableReplicas, d.Status.ReadyReplicas)
|
||||
|
||||
if d.Status.ReadyReplicas > 0 {
|
||||
return true
|
||||
}
|
||||
case "statefulset":
|
||||
d, err := scaler.Client.AppsV1().StatefulSets(config.Namespace).
|
||||
Get(ctx, config.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return false
|
||||
}
|
||||
log.Infof("Status for %s %s in namespace %s is: AvailableReplicas %d, ReadyReplicas: %d ", config.Kind, config.Name, config.Namespace, d.Status.AvailableReplicas, d.Status.ReadyReplicas)
|
||||
|
||||
if d.Status.ReadyReplicas > 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
default:
|
||||
log.Error(fmt.Errorf("unsupported kind %s", config.Kind))
|
||||
return false
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
@@ -1,154 +0,0 @@
|
||||
package scaler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
"github.com/docker/docker/client"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type DockerSwarmScaler struct {
|
||||
Client client.ServiceAPIClient
|
||||
}
|
||||
|
||||
func NewDockerSwarmScaler() *DockerSwarmScaler {
|
||||
return &DockerSwarmScaler{}
|
||||
}
|
||||
|
||||
func (scaler *DockerSwarmScaler) ScaleUp(name string) error {
|
||||
ctx := context.Background()
|
||||
service, err := scaler.GetServiceByName(name, ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if *service.Spec.Mode.Replicated.Replicas == onereplicas {
|
||||
log.Infof("%s already scaled up to %d", name, onereplicas)
|
||||
return nil
|
||||
}
|
||||
log.Infof("scaling up %s to %d", name, onereplicas)
|
||||
|
||||
service.Spec.Mode.Replicated = &swarm.ReplicatedService{
|
||||
Replicas: &onereplicas,
|
||||
}
|
||||
response, err := scaler.Client.ServiceUpdate(ctx, service.ID, service.Meta.Version, service.Spec, types.ServiceUpdateOptions{})
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if len(response.Warnings) > 0 {
|
||||
log.Warnf("received scaling up service %s: %v", name, response.Warnings)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scaler *DockerSwarmScaler) ScaleDown(name string) error {
|
||||
ctx := context.Background()
|
||||
service, err := scaler.GetServiceByName(name, ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
replicas := uint64(0)
|
||||
|
||||
if *service.Spec.Mode.Replicated.Replicas == replicas {
|
||||
log.Infof("%s already scaled down to %d", name, replicas)
|
||||
return nil
|
||||
}
|
||||
log.Infof("scaling down %s to %d", name, replicas)
|
||||
|
||||
service.Spec.Mode.Replicated = &swarm.ReplicatedService{
|
||||
Replicas: &replicas,
|
||||
}
|
||||
response, err := scaler.Client.ServiceUpdate(ctx, service.ID, service.Meta.Version, service.Spec, types.ServiceUpdateOptions{})
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if len(response.Warnings) > 0 {
|
||||
log.Warnf("received scaling up service %s: %v", name, response.Warnings)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scaler *DockerSwarmScaler) IsUp(name string) bool {
|
||||
ctx := context.Background()
|
||||
service, err := scaler.GetServiceByName(name, ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
return scaler.isServiceRunningFor(service, 5*time.Second)
|
||||
}
|
||||
|
||||
func (scaler *DockerSwarmScaler) isServiceRunningFor(service *swarm.Service, duration time.Duration) bool {
|
||||
|
||||
if service.ServiceStatus.DesiredTasks == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
if service.ServiceStatus.DesiredTasks != service.ServiceStatus.RunningTasks {
|
||||
return false
|
||||
}
|
||||
|
||||
opts := types.TaskListOptions{
|
||||
Filters: filters.NewArgs(),
|
||||
}
|
||||
opts.Filters.Add("desired-state", "running")
|
||||
opts.Filters.Add("service", service.Spec.Name)
|
||||
|
||||
ctx := context.Background()
|
||||
tasks, err := scaler.Client.TaskList(ctx, opts)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
if len(tasks) == 0 {
|
||||
log.Error("No task found with filter desired-state=running and service=", service.Spec.Name)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (scaler *DockerSwarmScaler) GetServiceByName(name string, ctx context.Context) (*swarm.Service, error) {
|
||||
opts := types.ServiceListOptions{
|
||||
Filters: filters.NewArgs(),
|
||||
Status: true,
|
||||
}
|
||||
opts.Filters.Add("name", name)
|
||||
|
||||
services, err := scaler.Client.ServiceList(ctx, opts)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(services) == 0 {
|
||||
return nil, fmt.Errorf(fmt.Sprintf("service with name %s was not found", name))
|
||||
}
|
||||
|
||||
if len(services) > 1 {
|
||||
return nil, fmt.Errorf("multiple services (%d) with name %s were found: %v", len(services), name, services)
|
||||
}
|
||||
|
||||
return &services[0], nil
|
||||
}
|
||||
@@ -1,404 +0,0 @@
|
||||
package scaler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/acouvreur/sablier/pkg/scaler/mocks"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
func TestDockerSwarmScaler_ScaleUp(t *testing.T) {
|
||||
type fields struct {
|
||||
Client *mocks.ServiceAPIClientMock
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
serviceList []swarm.Service
|
||||
want swarm.Service
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "scale nginx service to 1 replica",
|
||||
fields: fields{
|
||||
Client: mocks.NewServiceAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
serviceList: []swarm.Service{
|
||||
{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &zeroreplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: swarm.Service{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &onereplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
scaler := &DockerSwarmScaler{
|
||||
Client: tt.fields.Client,
|
||||
}
|
||||
|
||||
tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
|
||||
tt.fields.Client.On("ServiceUpdate", mock.Anything, tt.want.ID, tt.want.Meta.Version, tt.want.Spec, mock.Anything).Return(types.ServiceUpdateResponse{
|
||||
Warnings: []string{},
|
||||
}, nil)
|
||||
|
||||
scaler.ScaleUp(tt.args.name)
|
||||
|
||||
tt.fields.Client.AssertExpectations(t)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("scale nginx service to 1 replica twice", func(t *testing.T) {
|
||||
swarmMock := mocks.NewServiceAPIClientMock()
|
||||
|
||||
serviceList := []swarm.Service{
|
||||
{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &zeroreplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
swarmMock.On("ServiceList", mock.Anything, mock.Anything).Return(serviceList, nil)
|
||||
swarmMock.On("ServiceUpdate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.ServiceUpdateResponse{
|
||||
Warnings: []string{},
|
||||
}, nil)
|
||||
|
||||
scaler := &DockerSwarmScaler{
|
||||
Client: swarmMock,
|
||||
}
|
||||
|
||||
scaler.ScaleUp("nginx")
|
||||
scaler.ScaleUp("nginx")
|
||||
|
||||
swarmMock.AssertNumberOfCalls(t, "ServiceUpdate", 1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDockerSwarmScaler_ScaleDown(t *testing.T) {
|
||||
type fields struct {
|
||||
Client *mocks.ServiceAPIClientMock
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
serviceList []swarm.Service
|
||||
want swarm.Service
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "scale nginx service to 1 replica",
|
||||
fields: fields{
|
||||
Client: mocks.NewServiceAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
serviceList: []swarm.Service{
|
||||
{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &onereplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: swarm.Service{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &zeroreplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
scaler := &DockerSwarmScaler{
|
||||
Client: tt.fields.Client,
|
||||
}
|
||||
|
||||
tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
|
||||
tt.fields.Client.On("ServiceUpdate", mock.Anything, tt.want.ID, tt.want.Meta.Version, tt.want.Spec, mock.Anything).Return(types.ServiceUpdateResponse{
|
||||
Warnings: []string{},
|
||||
}, nil)
|
||||
|
||||
scaler.ScaleDown(tt.args.name)
|
||||
|
||||
tt.fields.Client.AssertExpectations(t)
|
||||
})
|
||||
}
|
||||
t.Run("scale nginx service to 0 replica twice", func(t *testing.T) {
|
||||
swarmMock := mocks.NewServiceAPIClientMock()
|
||||
|
||||
serviceList := []swarm.Service{
|
||||
{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &onereplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
swarmMock.On("ServiceList", mock.Anything, mock.Anything).Return(serviceList, nil)
|
||||
swarmMock.On("ServiceUpdate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(types.ServiceUpdateResponse{
|
||||
Warnings: []string{},
|
||||
}, nil)
|
||||
|
||||
scaler := &DockerSwarmScaler{
|
||||
Client: swarmMock,
|
||||
}
|
||||
|
||||
scaler.ScaleDown("nginx")
|
||||
scaler.ScaleDown("nginx")
|
||||
|
||||
swarmMock.AssertNumberOfCalls(t, "ServiceUpdate", 1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDockerSwarmScaler_IsUp(t *testing.T) {
|
||||
type fields struct {
|
||||
Client *mocks.ServiceAPIClientMock
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
serviceList []swarm.Service
|
||||
taskList []swarm.Task
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "service nginx is 0/0",
|
||||
fields: fields{
|
||||
Client: mocks.NewServiceAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
serviceList: []swarm.Service{
|
||||
{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &zeroreplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
ServiceStatus: &swarm.ServiceStatus{
|
||||
RunningTasks: 0,
|
||||
DesiredTasks: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "service nginx is 1/1 since 10 seconds",
|
||||
fields: fields{
|
||||
Client: mocks.NewServiceAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
serviceList: []swarm.Service{
|
||||
{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &zeroreplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
ServiceStatus: &swarm.ServiceStatus{
|
||||
RunningTasks: 1,
|
||||
DesiredTasks: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
taskList: []swarm.Task{
|
||||
{
|
||||
Status: swarm.TaskStatus{
|
||||
Timestamp: time.Now().Add(-10 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "service nginx is 0/1",
|
||||
fields: fields{
|
||||
Client: mocks.NewServiceAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
serviceList: []swarm.Service{
|
||||
{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &zeroreplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
ServiceStatus: &swarm.ServiceStatus{
|
||||
RunningTasks: 0,
|
||||
DesiredTasks: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
taskList: []swarm.Task{},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "service nginx is 1/1 since 2 seconds",
|
||||
fields: fields{
|
||||
Client: mocks.NewServiceAPIClientMock(),
|
||||
},
|
||||
args: args{
|
||||
name: "nginx",
|
||||
},
|
||||
serviceList: []swarm.Service{
|
||||
{
|
||||
ID: "nginx_service",
|
||||
Meta: swarm.Meta{Version: swarm.Version{}},
|
||||
Spec: swarm.ServiceSpec{
|
||||
Mode: swarm.ServiceMode{
|
||||
Replicated: &swarm.ReplicatedService{
|
||||
Replicas: &zeroreplicas,
|
||||
},
|
||||
},
|
||||
},
|
||||
ServiceStatus: &swarm.ServiceStatus{
|
||||
RunningTasks: 0,
|
||||
DesiredTasks: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
taskList: []swarm.Task{
|
||||
{
|
||||
Status: swarm.TaskStatus{
|
||||
Timestamp: time.Now().Add(-2 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
scaler := &DockerSwarmScaler{
|
||||
Client: tt.fields.Client,
|
||||
}
|
||||
|
||||
tt.fields.Client.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
|
||||
tt.fields.Client.On("TaskList", mock.Anything, mock.Anything).Return(tt.taskList, nil)
|
||||
|
||||
got := scaler.IsUp(tt.args.name)
|
||||
|
||||
assert.EqualValues(t, tt.want, got)
|
||||
//tt.fields.Client.AssertExpectations(t)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerSwarmScaler_GetServiceByName(t *testing.T) {
|
||||
type fields struct {
|
||||
Client client.ServiceAPIClient
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
ctx context.Context
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want *swarm.Service
|
||||
wantErr bool
|
||||
}{
|
||||
// TODO: Add test cases.
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
scaler := &DockerSwarmScaler{
|
||||
Client: tt.fields.Client,
|
||||
}
|
||||
got, err := scaler.GetServiceByName(tt.args.name, tt.args.ctx)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("DockerSwarmScaler.GetServiceByName() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("DockerSwarmScaler.GetServiceByName() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,64 +0,0 @@
|
||||
package mocks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
type ContainerAPIClientMock struct {
|
||||
client.ContainerAPIClient
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func NewContainerAPIClientMock() *ContainerAPIClientMock {
|
||||
return &ContainerAPIClientMock{}
|
||||
}
|
||||
|
||||
func (client *ContainerAPIClientMock) ContainerStart(ctx context.Context, container string, options types.ContainerStartOptions) error {
|
||||
args := client.Mock.Called(ctx, container, options)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (client *ContainerAPIClientMock) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
|
||||
args := client.Mock.Called(ctx, container, timeout)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (client *ContainerAPIClientMock) ContainerInspect(ctx context.Context, container string) (types.ContainerJSON, error) {
|
||||
args := client.Mock.Called(ctx, container)
|
||||
return args.Get(0).(types.ContainerJSON), args.Error(1)
|
||||
}
|
||||
|
||||
func (client *ContainerAPIClientMock) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
|
||||
args := client.Mock.Called(ctx, options)
|
||||
return args.Get(0).([]types.Container), args.Error(1)
|
||||
}
|
||||
|
||||
type ServiceAPIClientMock struct {
|
||||
client.ServiceAPIClient
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func NewServiceAPIClientMock() *ServiceAPIClientMock {
|
||||
return &ServiceAPIClientMock{}
|
||||
}
|
||||
|
||||
func (client *ServiceAPIClientMock) ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) (types.ServiceUpdateResponse, error) {
|
||||
args := client.Mock.Called(ctx, serviceID, version, service, options)
|
||||
return args.Get(0).(types.ServiceUpdateResponse), args.Error(1)
|
||||
}
|
||||
|
||||
func (client *ServiceAPIClientMock) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
|
||||
args := client.Mock.Called(ctx, options)
|
||||
return args.Get(0).([]swarm.Service), args.Error(1)
|
||||
}
|
||||
|
||||
func (client *ServiceAPIClientMock) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
|
||||
args := client.Mock.Called(ctx, options)
|
||||
return args.Get(0).([]swarm.Task), args.Error(1)
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
package scaler
|
||||
|
||||
var onereplicas = uint64(1)
|
||||
var zeroreplicas = uint64(0)
|
||||
|
||||
type Scaler interface {
|
||||
ScaleUp(name string) error
|
||||
ScaleDown(name string) error
|
||||
IsUp(name string) bool
|
||||
}
|
||||
Reference in New Issue
Block a user