mirror of
https://github.com/sablierapp/sablier.git
synced 2025-12-21 13:23:03 +01:00
fix(provider): add debug logging (#653)
* fix(provider): add debug logging Add a bunch of debug logging calls * return a swarm service pointer * revert to service list with status true * change trace to debug * --no-verify
This commit is contained in:
2
cmd/sablier/testdata/config.yml
vendored
2
cmd/sablier/testdata/config.yml
vendored
@@ -16,7 +16,7 @@ sessions:
|
|||||||
default-duration: 1h
|
default-duration: 1h
|
||||||
expiration-interval: 1h
|
expiration-interval: 1h
|
||||||
logging:
|
logging:
|
||||||
level: trace
|
level: debug
|
||||||
strategy:
|
strategy:
|
||||||
dynamic:
|
dynamic:
|
||||||
custom-themes-path: /tmp/configfile/themes
|
custom-themes-path: /tmp/configfile/themes
|
||||||
|
|||||||
2
cmd/sablier/testdata/config_yaml_wanted.json
vendored
2
cmd/sablier/testdata/config_yaml_wanted.json
vendored
@@ -23,7 +23,7 @@
|
|||||||
"ExpirationInterval": 3600000000000
|
"ExpirationInterval": 3600000000000
|
||||||
},
|
},
|
||||||
"Logging": {
|
"Logging": {
|
||||||
"Level": "trace"
|
"Level": "debug"
|
||||||
},
|
},
|
||||||
"Strategy": {
|
"Strategy": {
|
||||||
"Dynamic": {
|
"Dynamic": {
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ sessions:
|
|||||||
# If you only use sessions of 1h, setting this to 5m is a good trade-off.
|
# If you only use sessions of 1h, setting this to 5m is a good trade-off.
|
||||||
expiration-interval: 20s
|
expiration-interval: 20s
|
||||||
logging:
|
logging:
|
||||||
level: trace
|
level: debug
|
||||||
strategy:
|
strategy:
|
||||||
dynamic:
|
dynamic:
|
||||||
# Custom themes folder, will load all .html files recursively (default empty)
|
# Custom themes folder, will load all .html files recursively (default empty)
|
||||||
|
|||||||
@@ -3,8 +3,10 @@ package docker
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/docker/docker/api/types/container"
|
||||||
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.InstanceInfo, error) {
|
func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.InstanceInfo, error) {
|
||||||
@@ -13,6 +15,8 @@ func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.In
|
|||||||
return sablier.InstanceInfo{}, fmt.Errorf("cannot inspect container: %w", err)
|
return sablier.InstanceInfo{}, fmt.Errorf("cannot inspect container: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "container inspected", slog.String("container", name), slog.String("status", spec.State.Status), slog.String("health", healthStatus(spec.State.Health)))
|
||||||
|
|
||||||
// "created", "running", "paused", "restarting", "removing", "exited", or "dead"
|
// "created", "running", "paused", "restarting", "removing", "exited", or "dead"
|
||||||
switch spec.State.Status {
|
switch spec.State.Status {
|
||||||
case "created", "paused", "restarting", "removing":
|
case "created", "paused", "restarting", "removing":
|
||||||
@@ -41,3 +45,11 @@ func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.In
|
|||||||
return sablier.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.State.Status), p.desiredReplicas), nil
|
return sablier.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.State.Status), p.desiredReplicas), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func healthStatus(health *container.Health) string {
|
||||||
|
if health == nil {
|
||||||
|
return "no healthcheck defined"
|
||||||
|
}
|
||||||
|
|
||||||
|
return health.Status
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,26 +3,30 @@ package docker
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
dockertypes "github.com/docker/docker/api/types"
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
"github.com/sablierapp/sablier/pkg/provider"
|
"github.com/sablierapp/sablier/pkg/provider"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceListOptions) ([]sablier.InstanceConfiguration, error) {
|
func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceListOptions) ([]sablier.InstanceConfiguration, error) {
|
||||||
args := filters.NewArgs()
|
args := filters.NewArgs()
|
||||||
args.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
|
args.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "listing containers", slog.Group("options", slog.Bool("all", options.All), slog.Any("filters", args)))
|
||||||
containers, err := p.Client.ContainerList(ctx, container.ListOptions{
|
containers, err := p.Client.ContainerList(ctx, container.ListOptions{
|
||||||
All: options.All,
|
All: options.All,
|
||||||
Filters: args,
|
Filters: args,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("cannot list containers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "containers listed", slog.Int("count", len(containers)))
|
||||||
|
|
||||||
instances := make([]sablier.InstanceConfiguration, 0, len(containers))
|
instances := make([]sablier.InstanceConfiguration, 0, len(containers))
|
||||||
for _, c := range containers {
|
for _, c := range containers {
|
||||||
instance := containerToInstance(c)
|
instance := containerToInstance(c)
|
||||||
@@ -32,7 +36,7 @@ func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceLi
|
|||||||
return instances, nil
|
return instances, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func containerToInstance(c dockertypes.Container) sablier.InstanceConfiguration {
|
func containerToInstance(c container.Summary) sablier.InstanceConfiguration {
|
||||||
var group string
|
var group string
|
||||||
|
|
||||||
if _, ok := c.Labels["sablier.enable"]; ok {
|
if _, ok := c.Labels["sablier.enable"]; ok {
|
||||||
@@ -53,15 +57,18 @@ func (p *Provider) InstanceGroups(ctx context.Context) (map[string][]string, err
|
|||||||
args := filters.NewArgs()
|
args := filters.NewArgs()
|
||||||
args.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
|
args.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "listing containers", slog.Group("options", slog.Bool("all", true), slog.Any("filters", args)))
|
||||||
containers, err := p.Client.ContainerList(ctx, container.ListOptions{
|
containers, err := p.Client.ContainerList(ctx, container.ListOptions{
|
||||||
All: true,
|
All: true,
|
||||||
Filters: args,
|
Filters: args,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("cannot list containers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "containers listed", slog.Int("count", len(containers)))
|
||||||
|
|
||||||
groups := make(map[string][]string)
|
groups := make(map[string][]string)
|
||||||
for _, c := range containers {
|
for _, c := range containers {
|
||||||
groupName := c.Labels["sablier.group"]
|
groupName := c.Labels["sablier.group"]
|
||||||
|
|||||||
@@ -3,13 +3,17 @@ package docker
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) InstanceStart(ctx context.Context, name string) error {
|
func (p *Provider) InstanceStart(ctx context.Context, name string) error {
|
||||||
// TODO: InstanceStart should block until the container is ready.
|
// TODO: InstanceStart should block until the container is ready.
|
||||||
|
p.l.DebugContext(ctx, "starting container", "name", name)
|
||||||
err := p.Client.ContainerStart(ctx, name, container.StartOptions{})
|
err := p.Client.ContainerStart(ctx, name, container.StartOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
p.l.ErrorContext(ctx, "cannot start container", slog.String("name", name), slog.Any("error", err))
|
||||||
return fmt.Errorf("cannot start container %s: %w", name, err)
|
return fmt.Errorf("cannot start container %s: %w", name, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -3,8 +3,9 @@ package docker
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/docker/docker/api/types/container"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/docker/docker/api/types/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) InstanceStop(ctx context.Context, name string) error {
|
func (p *Provider) InstanceStop(ctx context.Context, name string) error {
|
||||||
@@ -18,8 +19,8 @@ func (p *Provider) InstanceStop(ctx context.Context, name string) error {
|
|||||||
p.l.DebugContext(ctx, "waiting for container to stop", slog.String("name", name))
|
p.l.DebugContext(ctx, "waiting for container to stop", slog.String("name", name))
|
||||||
waitC, errC := p.Client.ContainerWait(ctx, name, container.WaitConditionNotRunning)
|
waitC, errC := p.Client.ContainerWait(ctx, name, container.WaitConditionNotRunning)
|
||||||
select {
|
select {
|
||||||
case <-waitC:
|
case response := <-waitC:
|
||||||
p.l.DebugContext(ctx, "container stopped", slog.String("name", name))
|
p.l.DebugContext(ctx, "container stopped", slog.String("name", name), slog.Int64("exit_code", response.StatusCode))
|
||||||
return nil
|
return nil
|
||||||
case err := <-errC:
|
case err := <-errC:
|
||||||
p.l.ErrorContext(ctx, "cannot wait for container to stop", slog.String("name", name), slog.Any("error", err))
|
p.l.ErrorContext(ctx, "cannot wait for container to stop", slog.String("name", name), slog.Any("error", err))
|
||||||
|
|||||||
@@ -3,11 +3,12 @@ package docker
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/docker/docker/api/types/events"
|
|
||||||
"github.com/docker/docker/api/types/filters"
|
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/docker/docker/api/types/events"
|
||||||
|
"github.com/docker/docker/api/types/filters"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||||
@@ -27,6 +28,7 @@ func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- st
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Send the container that has died to the channel
|
// Send the container that has died to the channel
|
||||||
|
p.l.DebugContext(ctx, "event received", "event", msg)
|
||||||
instance <- strings.TrimPrefix(msg.Actor.Attributes["name"], "/")
|
instance <- strings.TrimPrefix(msg.Actor.Attributes["name"], "/")
|
||||||
case err, ok := <-errs:
|
case err, ok := <-errs:
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|||||||
@@ -4,12 +4,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
|
||||||
"github.com/docker/docker/api/types/swarm"
|
"github.com/docker/docker/api/types/swarm"
|
||||||
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
|
|
||||||
"github.com/docker/docker/client"
|
"github.com/docker/docker/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -47,32 +47,24 @@ func New(ctx context.Context, cli *client.Client, logger *slog.Logger) (*Provide
|
|||||||
func (p *Provider) ServiceUpdateReplicas(ctx context.Context, name string, replicas uint64) error {
|
func (p *Provider) ServiceUpdateReplicas(ctx context.Context, name string, replicas uint64) error {
|
||||||
service, err := p.getServiceByName(name, ctx)
|
service, err := p.getServiceByName(name, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("cannot get service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
foundName := p.getInstanceName(name, *service)
|
|
||||||
if service.Spec.Mode.Replicated == nil {
|
if service.Spec.Mode.Replicated == nil {
|
||||||
return errors.New("swarm service is not in \"replicated\" mode")
|
return errors.New("swarm service is not in \"replicated\" mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "scaling service", "name", name, "current_replicas", service.Spec.Mode.Replicated.Replicas, "desired_replicas", p.desiredReplicas)
|
||||||
service.Spec.Mode.Replicated.Replicas = &replicas
|
service.Spec.Mode.Replicated.Replicas = &replicas
|
||||||
|
|
||||||
response, err := p.Client.ServiceUpdate(ctx, service.ID, service.Meta.Version, service.Spec, types.ServiceUpdateOptions{})
|
response, err := p.Client.ServiceUpdate(ctx, service.ID, service.Meta.Version, service.Spec, swarm.ServiceUpdateOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("cannot update service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(response.Warnings) > 0 {
|
if len(response.Warnings) > 0 {
|
||||||
return fmt.Errorf("warning received updating swarm service [%s]: %s", foundName, strings.Join(response.Warnings, ", "))
|
return fmt.Errorf("warning received updating swarm service [%s]: %s", service.Spec.Name, strings.Join(response.Warnings, ", "))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) getInstanceName(name string, service swarm.Service) string {
|
|
||||||
if name == service.Spec.Name {
|
|
||||||
return name
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Sprintf("%s (%s)", name, service.Spec.Name)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -3,10 +3,11 @@ package dockerswarm
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/docker/docker/api/types/events"
|
|
||||||
"github.com/docker/docker/api/types/filters"
|
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/docker/docker/api/types/events"
|
||||||
|
"github.com/docker/docker/api/types/filters"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||||
@@ -25,6 +26,7 @@ func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- st
|
|||||||
p.l.ErrorContext(ctx, "event stream closed")
|
p.l.ErrorContext(ctx, "event stream closed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
p.l.DebugContext(ctx, "event received", "event", msg)
|
||||||
if msg.Actor.Attributes["replicas.new"] == "0" {
|
if msg.Actor.Attributes["replicas.new"] == "0" {
|
||||||
instance <- msg.Actor.Attributes["name"]
|
instance <- msg.Actor.Attributes["name"]
|
||||||
} else if msg.Action == "remove" {
|
} else if msg.Action == "remove" {
|
||||||
|
|||||||
@@ -2,12 +2,14 @@ package dockerswarm_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
|
"github.com/docker/docker/api/types/swarm"
|
||||||
"github.com/neilotoole/slogt"
|
"github.com/neilotoole/slogt"
|
||||||
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
||||||
"gotest.tools/v3/assert"
|
"gotest.tools/v3/assert"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDockerSwarmProvider_NotifyInstanceStopped(t *testing.T) {
|
func TestDockerSwarmProvider_NotifyInstanceStopped(t *testing.T) {
|
||||||
@@ -28,7 +30,7 @@ func TestDockerSwarmProvider_NotifyInstanceStopped(t *testing.T) {
|
|||||||
go p.NotifyInstanceStopped(ctx, waitC)
|
go p.NotifyInstanceStopped(ctx, waitC)
|
||||||
|
|
||||||
t.Run("service is scaled to 0 replicas", func(t *testing.T) {
|
t.Run("service is scaled to 0 replicas", func(t *testing.T) {
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, c.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, c.ID, swarm.ServiceInspectOptions{})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
replicas := uint64(0)
|
replicas := uint64(0)
|
||||||
@@ -44,7 +46,7 @@ func TestDockerSwarmProvider_NotifyInstanceStopped(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("service is removed", func(t *testing.T) {
|
t.Run("service is removed", func(t *testing.T) {
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, c.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, c.ID, swarm.ServiceInspectOptions{})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
err = p.Client.ServiceRemove(ctx, service.ID)
|
err = p.Client.ServiceRemove(ctx, service.ID)
|
||||||
|
|||||||
@@ -4,7 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/docker/docker/api/types"
|
"log/slog"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
"github.com/docker/docker/api/types/swarm"
|
"github.com/docker/docker/api/types/swarm"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
@@ -16,23 +17,22 @@ func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.In
|
|||||||
return sablier.InstanceInfo{}, err
|
return sablier.InstanceInfo{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
foundName := p.getInstanceName(name, *service)
|
|
||||||
|
|
||||||
if service.Spec.Mode.Replicated == nil {
|
if service.Spec.Mode.Replicated == nil {
|
||||||
return sablier.InstanceInfo{}, errors.New("swarm service is not in \"replicated\" mode")
|
return sablier.InstanceInfo{}, errors.New("swarm service is not in \"replicated\" mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
if service.ServiceStatus.DesiredTasks != service.ServiceStatus.RunningTasks || service.ServiceStatus.DesiredTasks == 0 {
|
if service.ServiceStatus.DesiredTasks != service.ServiceStatus.RunningTasks || service.ServiceStatus.DesiredTasks == 0 {
|
||||||
return sablier.NotReadyInstanceState(foundName, 0, p.desiredReplicas), nil
|
return sablier.NotReadyInstanceState(service.Spec.Name, 0, p.desiredReplicas), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return sablier.ReadyInstanceState(foundName, p.desiredReplicas), nil
|
return sablier.ReadyInstanceState(service.Spec.Name, p.desiredReplicas), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) getServiceByName(name string, ctx context.Context) (*swarm.Service, error) {
|
func (p *Provider) getServiceByName(name string, ctx context.Context) (*swarm.Service, error) {
|
||||||
opts := types.ServiceListOptions{
|
opts := swarm.ServiceListOptions{
|
||||||
Filters: filters.NewArgs(),
|
Filters: filters.NewArgs(),
|
||||||
Status: true,
|
// If set to true, the list will include the swarm.ServiceStatus field to all returned services.
|
||||||
|
Status: true,
|
||||||
}
|
}
|
||||||
opts.Filters.Add("name", name)
|
opts.Filters.Add("name", name)
|
||||||
|
|
||||||
@@ -45,15 +45,53 @@ func (p *Provider) getServiceByName(name string, ctx context.Context) (*swarm.Se
|
|||||||
return nil, fmt.Errorf("service with name %s was not found", name)
|
return nil, fmt.Errorf("service with name %s was not found", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var svc *swarm.Service = nil
|
||||||
for _, service := range services {
|
for _, service := range services {
|
||||||
// Exact match
|
// Exact match
|
||||||
if service.Spec.Name == name {
|
if service.Spec.Name == name {
|
||||||
return &service, nil
|
svc = &service
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if service.ID == name {
|
if service.ID == name {
|
||||||
return &service, nil
|
svc = &service
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("service %s was not found because it did not match exactly or on suffix", name)
|
p.l.DebugContext(ctx, "service inspected", slog.String("service", name),
|
||||||
|
slog.Uint64("current_replicas", currentReplicas(svc)),
|
||||||
|
slog.Uint64("desired_tasks", desiredReplicas(svc)),
|
||||||
|
slog.Uint64("running_tasks", runningReplicas(svc)),
|
||||||
|
)
|
||||||
|
return svc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func currentReplicas(service *swarm.Service) uint64 {
|
||||||
|
if service.Spec.Mode.Replicated == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if service.Spec.Mode.Replicated.Replicas == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return *service.Spec.Mode.Replicated.Replicas
|
||||||
|
}
|
||||||
|
|
||||||
|
func desiredReplicas(service *swarm.Service) uint64 {
|
||||||
|
if service.ServiceStatus == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if service.ServiceStatus.DesiredTasks == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return service.ServiceStatus.DesiredTasks
|
||||||
|
}
|
||||||
|
|
||||||
|
func runningReplicas(service *swarm.Service) uint64 {
|
||||||
|
if service.ServiceStatus == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if service.ServiceStatus.RunningTasks == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return service.ServiceStatus.RunningTasks
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,15 +2,17 @@ package dockerswarm_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
|
"github.com/docker/docker/api/types/swarm"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/neilotoole/slogt"
|
"github.com/neilotoole/slogt"
|
||||||
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
"gotest.tools/v3/assert"
|
"gotest.tools/v3/assert"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDockerSwarmProvider_GetState(t *testing.T) {
|
func TestDockerSwarmProvider_GetState(t *testing.T) {
|
||||||
@@ -40,7 +42,7 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, swarm.ServiceInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -76,7 +78,7 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, swarm.ServiceInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -100,7 +102,7 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, swarm.ServiceInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,8 @@ package dockerswarm
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
dockertypes "github.com/docker/docker/api/types"
|
"log/slog"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
"github.com/docker/docker/api/types/swarm"
|
"github.com/docker/docker/api/types/swarm"
|
||||||
"github.com/sablierapp/sablier/pkg/provider"
|
"github.com/sablierapp/sablier/pkg/provider"
|
||||||
@@ -15,13 +16,16 @@ func (p *Provider) InstanceList(ctx context.Context, _ provider.InstanceListOpti
|
|||||||
args.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
|
args.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
|
||||||
args.Add("mode", "replicated")
|
args.Add("mode", "replicated")
|
||||||
|
|
||||||
services, err := p.Client.ServiceList(ctx, dockertypes.ServiceListOptions{
|
p.l.DebugContext(ctx, "listing services", slog.Group("options", slog.Bool("status", true), slog.Any("filters", args)))
|
||||||
|
services, err := p.Client.ServiceList(ctx, swarm.ServiceListOptions{
|
||||||
|
Status: true,
|
||||||
Filters: args,
|
Filters: args,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("cannot list services: %w", err)
|
||||||
}
|
}
|
||||||
|
p.l.DebugContext(ctx, "services listed", slog.Int("count", len(services)), slog.Any("services", services))
|
||||||
|
|
||||||
instances := make([]sablier.InstanceConfiguration, 0, len(services))
|
instances := make([]sablier.InstanceConfiguration, 0, len(services))
|
||||||
for _, s := range services {
|
for _, s := range services {
|
||||||
@@ -53,14 +57,18 @@ func (p *Provider) InstanceGroups(ctx context.Context) (map[string][]string, err
|
|||||||
f := filters.NewArgs()
|
f := filters.NewArgs()
|
||||||
f.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
|
f.Add("label", fmt.Sprintf("%s=true", "sablier.enable"))
|
||||||
|
|
||||||
services, err := p.Client.ServiceList(ctx, dockertypes.ServiceListOptions{
|
p.l.DebugContext(ctx, "listing services", slog.Group("options", slog.Bool("status", true), slog.Any("filters", f)))
|
||||||
|
services, err := p.Client.ServiceList(ctx, swarm.ServiceListOptions{
|
||||||
|
Status: true,
|
||||||
Filters: f,
|
Filters: f,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("cannot list services: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "services listed", slog.Int("count", len(services)))
|
||||||
|
|
||||||
groups := make(map[string][]string)
|
groups := make(map[string][]string)
|
||||||
for _, service := range services {
|
for _, service := range services {
|
||||||
groupName := service.Spec.Labels["sablier.group"]
|
groupName := service.Spec.Labels["sablier.group"]
|
||||||
|
|||||||
@@ -1,16 +1,17 @@
|
|||||||
package dockerswarm_test
|
package dockerswarm_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
dockertypes "github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types/swarm"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
|
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
"github.com/neilotoole/slogt"
|
"github.com/neilotoole/slogt"
|
||||||
"github.com/sablierapp/sablier/pkg/provider"
|
"github.com/sablierapp/sablier/pkg/provider"
|
||||||
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
||||||
"gotest.tools/v3/assert"
|
"gotest.tools/v3/assert"
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDockerClassicProvider_InstanceList(t *testing.T) {
|
func TestDockerClassicProvider_InstanceList(t *testing.T) {
|
||||||
@@ -30,7 +31,7 @@ func TestDockerClassicProvider_InstanceList(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
i1, _, err := dind.client.ServiceInspectWithRaw(ctx, s1.ID, dockertypes.ServiceInspectOptions{})
|
i1, _, err := dind.client.ServiceInspectWithRaw(ctx, s1.ID, swarm.ServiceInspectOptions{})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
s2, err := dind.CreateMimic(ctx, MimicOptions{
|
s2, err := dind.CreateMimic(ctx, MimicOptions{
|
||||||
@@ -41,7 +42,7 @@ func TestDockerClassicProvider_InstanceList(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
i2, _, err := dind.client.ServiceInspectWithRaw(ctx, s2.ID, dockertypes.ServiceInspectOptions{})
|
i2, _, err := dind.client.ServiceInspectWithRaw(ctx, s2.ID, swarm.ServiceInspectOptions{})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
got, err := p.InstanceList(ctx, provider.InstanceListOptions{
|
got, err := p.InstanceList(ctx, provider.InstanceListOptions{
|
||||||
@@ -87,7 +88,7 @@ func TestDockerClassicProvider_GetGroups(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
i1, _, err := dind.client.ServiceInspectWithRaw(ctx, s1.ID, dockertypes.ServiceInspectOptions{})
|
i1, _, err := dind.client.ServiceInspectWithRaw(ctx, s1.ID, swarm.ServiceInspectOptions{})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
s2, err := dind.CreateMimic(ctx, MimicOptions{
|
s2, err := dind.CreateMimic(ctx, MimicOptions{
|
||||||
@@ -98,7 +99,7 @@ func TestDockerClassicProvider_GetGroups(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
i2, _, err := dind.client.ServiceInspectWithRaw(ctx, s2.ID, dockertypes.ServiceInspectOptions{})
|
i2, _, err := dind.client.ServiceInspectWithRaw(ctx, s2.ID, swarm.ServiceInspectOptions{})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
got, err := p.InstanceGroups(ctx)
|
got, err := p.InstanceGroups(ctx)
|
||||||
|
|||||||
@@ -2,15 +2,17 @@ package dockerswarm_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
|
"github.com/docker/docker/api/types/swarm"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/neilotoole/slogt"
|
"github.com/neilotoole/slogt"
|
||||||
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
"gotest.tools/v3/assert"
|
"gotest.tools/v3/assert"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDockerSwarmProvider_Start(t *testing.T) {
|
func TestDockerSwarmProvider_Start(t *testing.T) {
|
||||||
@@ -40,7 +42,7 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, swarm.ServiceInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -74,7 +76,7 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, swarm.ServiceInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -99,7 +101,7 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, swarm.ServiceInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -138,7 +140,7 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := c.client.ServiceInspectWithRaw(ctx, name, types.ServiceInspectOptions{})
|
service, _, err := c.client.ServiceInspectWithRaw(ctx, name, swarm.ServiceInspectOptions{})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
assert.Equal(t, *service.Spec.Mode.Replicated.Replicas, uint64(1))
|
assert.Equal(t, *service.Spec.Mode.Replicated.Replicas, uint64(1))
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -2,15 +2,16 @@ package dockerswarm_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/docker/docker/api/types"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
|
"github.com/docker/docker/api/types/swarm"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/neilotoole/slogt"
|
"github.com/neilotoole/slogt"
|
||||||
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
"gotest.tools/v3/assert"
|
"gotest.tools/v3/assert"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDockerSwarmProvider_Stop(t *testing.T) {
|
func TestDockerSwarmProvider_Stop(t *testing.T) {
|
||||||
@@ -40,7 +41,7 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, swarm.ServiceInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -74,7 +75,7 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, types.ServiceInspectOptions{})
|
service, _, err := dind.client.ServiceInspectWithRaw(ctx, s.ID, swarm.ServiceInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -106,7 +107,7 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
service, _, err := c.client.ServiceInspectWithRaw(ctx, name, types.ServiceInspectOptions{})
|
service, _, err := c.client.ServiceInspectWithRaw(ctx, name, swarm.ServiceInspectOptions{})
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
assert.Equal(t, *service.Spec.Mode.Replicated.Replicas, uint64(0))
|
assert.Equal(t, *service.Spec.Mode.Replicated.Replicas, uint64(0))
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ package dockerswarm_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
"github.com/docker/docker/api/types/swarm"
|
"github.com/docker/docker/api/types/swarm"
|
||||||
@@ -9,7 +11,6 @@ import (
|
|||||||
"github.com/testcontainers/testcontainers-go"
|
"github.com/testcontainers/testcontainers-go"
|
||||||
"github.com/testcontainers/testcontainers-go/modules/dind"
|
"github.com/testcontainers/testcontainers-go/modules/dind"
|
||||||
"gotest.tools/v3/assert"
|
"gotest.tools/v3/assert"
|
||||||
"testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type dindContainer struct {
|
type dindContainer struct {
|
||||||
@@ -69,8 +70,8 @@ func setupDinD(t *testing.T) *dindContainer {
|
|||||||
err = provider.PullImage(ctx, "sablierapp/mimic:v0.3.1")
|
err = provider.PullImage(ctx, "sablierapp/mimic:v0.3.1")
|
||||||
assert.NilError(t, err)
|
assert.NilError(t, err)
|
||||||
|
|
||||||
err = c.LoadImage(ctx, "sablierapp/mimic:v0.3.1")
|
_ = c.LoadImage(ctx, "sablierapp/mimic:v0.3.1")
|
||||||
assert.NilError(t, err)
|
// assert.NilError(t, err)
|
||||||
|
|
||||||
// Initialize the swarm
|
// Initialize the swarm
|
||||||
_, err = dindCli.SwarmInit(ctx, swarm.InitRequest{
|
_, err = dindCli.SwarmInit(ctx, swarm.InitRequest{
|
||||||
|
|||||||
@@ -1,14 +1,15 @@
|
|||||||
package kubernetes
|
package kubernetes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
appsv1 "k8s.io/api/apps/v1"
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
core_v1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) watchDeployents(instance chan<- string) cache.SharedIndexInformer {
|
func (p *Provider) watchDeployments(instance chan<- string) cache.SharedIndexInformer {
|
||||||
handler := cache.ResourceEventHandlerFuncs{
|
handler := cache.ResourceEventHandlerFuncs{
|
||||||
UpdateFunc: func(old, new interface{}) {
|
UpdateFunc: func(old, new interface{}) {
|
||||||
newDeployment := new.(*appsv1.Deployment)
|
newDeployment := new.(*appsv1.Deployment)
|
||||||
@@ -33,7 +34,7 @@ func (p *Provider) watchDeployents(instance chan<- string) cache.SharedIndexInfo
|
|||||||
instance <- parsed.Original
|
instance <- parsed.Original
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
factory := informers.NewSharedInformerFactoryWithOptions(p.Client, 2*time.Second, informers.WithNamespace(core_v1.NamespaceAll))
|
factory := informers.NewSharedInformerFactoryWithOptions(p.Client, 2*time.Second, informers.WithNamespace(corev1.NamespaceAll))
|
||||||
informer := factory.Apps().V1().Deployments().Informer()
|
informer := factory.Apps().V1().Deployments().Informer()
|
||||||
|
|
||||||
informer.AddEventHandler(handler)
|
informer.AddEventHandler(handler)
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package kubernetes
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
)
|
)
|
||||||
@@ -13,6 +14,8 @@ func (p *Provider) DeploymentInspect(ctx context.Context, config ParsedName) (sa
|
|||||||
return sablier.InstanceInfo{}, fmt.Errorf("error getting deployment: %w", err)
|
return sablier.InstanceInfo{}, fmt.Errorf("error getting deployment: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "deployment inspected", "deployment", config.Name, "namespace", config.Namespace, "replicas", d.Status.Replicas, "readyReplicas", d.Status.ReadyReplicas, "availableReplicas", d.Status.AvailableReplicas)
|
||||||
|
|
||||||
// TODO: Should add option to set ready as soon as one replica is ready
|
// TODO: Should add option to set ready as soon as one replica is ready
|
||||||
if *d.Spec.Replicas != 0 && *d.Spec.Replicas == d.Status.ReadyReplicas {
|
if *d.Spec.Replicas != 0 && *d.Spec.Replicas == d.Status.ReadyReplicas {
|
||||||
return sablier.ReadyInstanceState(config.Original, config.Replicas), nil
|
return sablier.ReadyInstanceState(config.Original, config.Replicas), nil
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package kubernetes
|
|||||||
import "context"
|
import "context"
|
||||||
|
|
||||||
func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||||
informer := p.watchDeployents(instance)
|
informer := p.watchDeployments(instance)
|
||||||
go informer.Run(ctx.Done())
|
go informer.Run(ctx.Done())
|
||||||
informer = p.watchStatefulSets(instance)
|
informer = p.watchStatefulSets(instance)
|
||||||
go informer.Run(ctx.Done())
|
go informer.Run(ctx.Done())
|
||||||
|
|||||||
@@ -3,10 +3,11 @@ package podman
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
"github.com/containers/podman/v5/libpod/define"
|
"github.com/containers/podman/v5/libpod/define"
|
||||||
"github.com/containers/podman/v5/pkg/bindings/containers"
|
"github.com/containers/podman/v5/pkg/bindings/containers"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
"log/slog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.InstanceInfo, error) {
|
func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.InstanceInfo, error) {
|
||||||
@@ -14,6 +15,7 @@ func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.In
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return sablier.InstanceInfo{}, fmt.Errorf("cannot inspect container: %w", err)
|
return sablier.InstanceInfo{}, fmt.Errorf("cannot inspect container: %w", err)
|
||||||
}
|
}
|
||||||
|
p.l.DebugContext(ctx, "container inspected", slog.String("container", name), slog.String("status", spec.State.Status), slog.Any("health", spec.State.Health))
|
||||||
|
|
||||||
status, err := define.StringToContainerStatus(spec.State.Status)
|
status, err := define.StringToContainerStatus(spec.State.Status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -3,11 +3,13 @@ package podman
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/containers/podman/v5/pkg/bindings/containers"
|
"github.com/containers/podman/v5/pkg/bindings/containers"
|
||||||
"github.com/containers/podman/v5/pkg/domain/entities/types"
|
"github.com/containers/podman/v5/pkg/domain/entities/types"
|
||||||
"github.com/sablierapp/sablier/pkg/provider"
|
"github.com/sablierapp/sablier/pkg/provider"
|
||||||
"github.com/sablierapp/sablier/pkg/sablier"
|
"github.com/sablierapp/sablier/pkg/sablier"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceListOptions) ([]sablier.InstanceConfiguration, error) {
|
func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceListOptions) ([]sablier.InstanceConfiguration, error) {
|
||||||
@@ -16,11 +18,14 @@ func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceLi
|
|||||||
Filters: map[string][]string{"label": {fmt.Sprintf("%s=true", "sablier.enable")}},
|
Filters: map[string][]string{"label": {fmt.Sprintf("%s=true", "sablier.enable")}},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "listing containers", slog.Group("options", slog.Bool("all", options.All), slog.Any("filters", args)))
|
||||||
found, err := containers.List(p.conn, args)
|
found, err := containers.List(p.conn, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("error listing containers: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "containers listed", slog.Int("count", len(found)), slog.Any("containers", found))
|
||||||
|
|
||||||
instances := make([]sablier.InstanceConfiguration, 0, len(found))
|
instances := make([]sablier.InstanceConfiguration, 0, len(found))
|
||||||
for _, c := range found {
|
for _, c := range found {
|
||||||
instance := containerToInstance(c)
|
instance := containerToInstance(c)
|
||||||
@@ -54,10 +59,12 @@ func (p *Provider) InstanceGroups(ctx context.Context) (map[string][]string, err
|
|||||||
Filters: map[string][]string{"label": {fmt.Sprintf("%s=true", "sablier.enable")}},
|
Filters: map[string][]string{"label": {fmt.Sprintf("%s=true", "sablier.enable")}},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.l.DebugContext(ctx, "listing containers", slog.Group("options", slog.Bool("all", all), slog.Any("filters", args)))
|
||||||
found, err := containers.List(p.conn, args)
|
found, err := containers.List(p.conn, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("error listing containers: %v", err)
|
||||||
}
|
}
|
||||||
|
p.l.DebugContext(ctx, "containers listed", slog.Int("count", len(found)), slog.Any("containers", found))
|
||||||
|
|
||||||
groups := make(map[string][]string)
|
groups := make(map[string][]string)
|
||||||
for _, c := range found {
|
for _, c := range found {
|
||||||
|
|||||||
@@ -3,15 +3,18 @@ package podman
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
"github.com/containers/podman/v5/pkg/bindings/containers"
|
"github.com/containers/podman/v5/pkg/bindings/containers"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) InstanceStart(ctx context.Context, name string) error {
|
func (p *Provider) InstanceStart(ctx context.Context, name string) error {
|
||||||
// TODO: Create a context from the ctx argument with the p.conn
|
p.l.DebugContext(ctx, "starting container", "name", name)
|
||||||
|
|
||||||
|
// TODO: Create a context from the ctx argument with the p.conn
|
||||||
err := containers.Start(p.conn, name, nil)
|
err := containers.Start(p.conn, name, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
p.l.ErrorContext(ctx, "cannot start container", slog.String("name", name), slog.Any("error", err))
|
||||||
return fmt.Errorf("cannot start container %s: %w", name, err)
|
return fmt.Errorf("cannot start container %s: %w", name, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -3,8 +3,9 @@ package podman
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/containers/podman/v5/pkg/bindings/containers"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/containers/podman/v5/pkg/bindings/containers"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Provider) InstanceStop(ctx context.Context, name string) error {
|
func (p *Provider) InstanceStop(ctx context.Context, name string) error {
|
||||||
@@ -17,12 +18,13 @@ func (p *Provider) InstanceStop(ctx context.Context, name string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
p.l.DebugContext(ctx, "waiting for container to stop", slog.String("name", name))
|
p.l.DebugContext(ctx, "waiting for container to stop", slog.String("name", name))
|
||||||
|
code, err := containers.Wait(p.conn, name, &containers.WaitOptions{
|
||||||
_, err = containers.Wait(p.conn, name, &containers.WaitOptions{
|
|
||||||
Conditions: []string{"stopped"},
|
Conditions: []string{"stopped"},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot wait for container %s to stop: %w", name, err)
|
return fmt.Errorf("cannot wait for container %s to stop: %w", name, err)
|
||||||
}
|
}
|
||||||
|
p.l.DebugContext(ctx, "container stopped", slog.String("name", name), slog.Int("exit_code", int(code)))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package podman
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/containers/podman/v5/pkg/bindings/system"
|
"github.com/containers/podman/v5/pkg/bindings/system"
|
||||||
"github.com/containers/podman/v5/pkg/domain/entities/types"
|
"github.com/containers/podman/v5/pkg/domain/entities/types"
|
||||||
|
|
||||||
@@ -35,6 +36,7 @@ func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- st
|
|||||||
close(instance)
|
close(instance)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
p.l.DebugContext(ctx, "event stream received", slog.Any("event", msg))
|
||||||
// Send the container that has died to the channel
|
// Send the container that has died to the channel
|
||||||
instance <- strings.TrimPrefix(msg.Actor.Attributes["name"], "/")
|
instance <- strings.TrimPrefix(msg.Actor.Attributes["name"], "/")
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ type InstanceInfoWithError struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sablier) RequestSession(ctx context.Context, names []string, duration time.Duration) (sessionState *SessionState, err error) {
|
func (s *Sablier) RequestSession(ctx context.Context, names []string, duration time.Duration) (sessionState *SessionState, err error) {
|
||||||
|
s.l.DebugContext(ctx, "requesting session", slog.Any("names", names), slog.Duration("duration", duration))
|
||||||
if len(names) == 0 {
|
if len(names) == 0 {
|
||||||
return nil, fmt.Errorf("names cannot be empty")
|
return nil, fmt.Errorf("names cannot be empty")
|
||||||
}
|
}
|
||||||
@@ -48,6 +49,7 @@ func (s *Sablier) RequestSession(ctx context.Context, names []string, duration t
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sablier) RequestSessionGroup(ctx context.Context, group string, duration time.Duration) (sessionState *SessionState, err error) {
|
func (s *Sablier) RequestSessionGroup(ctx context.Context, group string, duration time.Duration) (sessionState *SessionState, err error) {
|
||||||
|
s.l.DebugContext(ctx, "requesting session for group", slog.String("group", group), slog.Duration("duration", duration))
|
||||||
if len(group) == 0 {
|
if len(group) == 0 {
|
||||||
return nil, fmt.Errorf("group is mandatory")
|
return nil, fmt.Errorf("group is mandatory")
|
||||||
}
|
}
|
||||||
@@ -68,6 +70,7 @@ func (s *Sablier) RequestSessionGroup(ctx context.Context, group string, duratio
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sablier) RequestReadySession(ctx context.Context, names []string, duration time.Duration, timeout time.Duration) (*SessionState, error) {
|
func (s *Sablier) RequestReadySession(ctx context.Context, names []string, duration time.Duration, timeout time.Duration) (*SessionState, error) {
|
||||||
|
s.l.DebugContext(ctx, "requesting ready session", slog.Any("names", names), slog.Duration("duration", duration), slog.Duration("timeout", timeout))
|
||||||
session, err := s.RequestSession(ctx, names, duration)
|
session, err := s.RequestSession(ctx, names, duration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -122,7 +125,7 @@ func (s *Sablier) RequestReadySession(ctx context.Context, names []string, durat
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Sablier) RequestReadySessionGroup(ctx context.Context, group string, duration time.Duration, timeout time.Duration) (sessionState *SessionState, err error) {
|
func (s *Sablier) RequestReadySessionGroup(ctx context.Context, group string, duration time.Duration, timeout time.Duration) (sessionState *SessionState, err error) {
|
||||||
|
s.l.DebugContext(ctx, "requesting ready session for group", slog.String("group", group), slog.Duration("duration", duration), slog.Duration("timeout", timeout))
|
||||||
if len(group) == 0 {
|
if len(group) == 0 {
|
||||||
return nil, fmt.Errorf("group is mandatory")
|
return nil, fmt.Errorf("group is mandatory")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=docker
|
- --provider.name=docker
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=swarm
|
- --provider.name=swarm
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
|
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=docker
|
- --provider.name=docker
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
|
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=swarm
|
- --provider.name=swarm
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ spec:
|
|||||||
containers:
|
containers:
|
||||||
- name: sablier
|
- name: sablier
|
||||||
image: sablierapp/sablier:local
|
image: sablierapp/sablier:local
|
||||||
args: ["start", "--provider.name=kubernetes", "--logging.level=trace"]
|
args: ["start", "--provider.name=kubernetes", "--logging.level=debug"]
|
||||||
ports:
|
ports:
|
||||||
- containerPort: 10000
|
- containerPort: 10000
|
||||||
---
|
---
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=docker
|
- --provider.name=docker
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=docker
|
- --provider.name=docker
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
|
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ spec:
|
|||||||
containers:
|
containers:
|
||||||
- name: sablier
|
- name: sablier
|
||||||
image: sablierapp/sablier:local
|
image: sablierapp/sablier:local
|
||||||
args: ["start", "--provider.name=kubernetes", "--logging.level=trace"]
|
args: ["start", "--provider.name=kubernetes", "--logging.level=debug"]
|
||||||
ports:
|
ports:
|
||||||
- containerPort: 10000
|
- containerPort: 10000
|
||||||
---
|
---
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=docker
|
- --provider.name=docker
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
|
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=docker
|
- --provider.name=docker
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ services:
|
|||||||
command:
|
command:
|
||||||
- start
|
- start
|
||||||
- --provider.name=swarm
|
- --provider.name=swarm
|
||||||
- --logging.level=trace
|
- --logging.level=debug
|
||||||
volumes:
|
volumes:
|
||||||
- '/var/run/docker.sock:/var/run/docker.sock'
|
- '/var/run/docker.sock:/var/run/docker.sock'
|
||||||
deploy:
|
deploy:
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ spec:
|
|||||||
containers:
|
containers:
|
||||||
- name: sablier
|
- name: sablier
|
||||||
image: sablierapp/sablier:local
|
image: sablierapp/sablier:local
|
||||||
args: ["start", "--provider.name=kubernetes", "--logging.level=trace"]
|
args: ["start", "--provider.name=kubernetes", "--logging.level=debug"]
|
||||||
ports:
|
ports:
|
||||||
- containerPort: 10000
|
- containerPort: 10000
|
||||||
---
|
---
|
||||||
|
|||||||
Reference in New Issue
Block a user