mirror of
https://github.com/sablierapp/sablier.git
synced 2025-12-24 14:31:51 +01:00
refactor(logging): use slog instead of logrus (#501)
Everything uses slog now and the logger is part of every struct
This commit is contained in:
2
Makefile
2
Makefile
@@ -18,7 +18,7 @@ $(PLATFORMS):
|
||||
CGO_ENABLED=0 GOOS=$(os) GOARCH=$(arch) go build -trimpath -tags=nomsgpack -v -ldflags="${GO_LDFLAGS}" -o 'sablier_$(VERSION)_$(os)-$(arch)' .
|
||||
|
||||
run:
|
||||
go run main.go start
|
||||
go run main.go start --storage.file=state.json --logging.level=debug
|
||||
|
||||
generate:
|
||||
go generate ./..
|
||||
|
||||
@@ -5,18 +5,15 @@ import (
|
||||
"errors"
|
||||
"github.com/sablierapp/sablier/app/providers"
|
||||
"github.com/sablierapp/sablier/pkg/store"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
// StopAllUnregisteredInstances stops all auto-discovered running instances that are not yet registered
|
||||
// as running instances by Sablier.
|
||||
// By default, Sablier does not stop all already running instances. Meaning that you need to make an
|
||||
// initial request in order to trigger the scaling to zero.
|
||||
func StopAllUnregisteredInstances(ctx context.Context, provider providers.Provider, s store.Store) error {
|
||||
log.Info("Stopping all unregistered running instances")
|
||||
|
||||
log.Tracef("Retrieving all instances with label [%v=true]", LabelEnable)
|
||||
func StopAllUnregisteredInstances(ctx context.Context, provider providers.Provider, s store.Store, logger *slog.Logger) error {
|
||||
instances, err := provider.InstanceList(ctx, providers.InstanceListOptions{
|
||||
All: false, // Only running containers
|
||||
Labels: []string{LabelEnable},
|
||||
@@ -25,7 +22,6 @@ func StopAllUnregisteredInstances(ctx context.Context, provider providers.Provid
|
||||
return err
|
||||
}
|
||||
|
||||
log.Tracef("Found %v instances with label [%v=true]", len(instances), LabelEnable)
|
||||
unregistered := make([]string, 0)
|
||||
for _, instance := range instances {
|
||||
_, err = s.Get(ctx, instance.Name)
|
||||
@@ -34,29 +30,25 @@ func StopAllUnregisteredInstances(ctx context.Context, provider providers.Provid
|
||||
}
|
||||
}
|
||||
|
||||
log.Tracef("Found %v unregistered instances ", len(instances))
|
||||
logger.DebugContext(ctx, "found instances to stop", slog.Any("instances", unregistered))
|
||||
|
||||
waitGroup := errgroup.Group{}
|
||||
|
||||
// Previously, the variables declared by a “for” loop were created once and updated by each iteration.
|
||||
// In Go 1.22, each iteration of the loop creates new variables, to avoid accidental sharing bugs.
|
||||
// The transition support tooling described in the proposal continues to work in the same way it did in Go 1.21.
|
||||
for _, name := range unregistered {
|
||||
waitGroup.Go(stopFunc(ctx, name, provider))
|
||||
waitGroup.Go(stopFunc(ctx, name, provider, logger))
|
||||
}
|
||||
|
||||
return waitGroup.Wait()
|
||||
}
|
||||
|
||||
func stopFunc(ctx context.Context, name string, provider providers.Provider) func() error {
|
||||
func stopFunc(ctx context.Context, name string, provider providers.Provider, logger *slog.Logger) func() error {
|
||||
return func() error {
|
||||
log.Tracef("Stopping %v...", name)
|
||||
err := provider.Stop(ctx, name)
|
||||
if err != nil {
|
||||
log.Errorf("Could not stop %v: %v", name, err)
|
||||
logger.ErrorContext(ctx, "failed to stop instance", slog.String("instance", name), slog.Any("error", err))
|
||||
return err
|
||||
}
|
||||
log.Tracef("Successfully stopped %v", name)
|
||||
logger.InfoContext(ctx, "stopped unregistered instance", slog.String("instance", name), slog.String("reason", "instance is enabled but not started by Sablier"))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package discovery_test
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/neilotoole/slogt"
|
||||
"github.com/sablierapp/sablier/app/discovery"
|
||||
"github.com/sablierapp/sablier/app/instance"
|
||||
"github.com/sablierapp/sablier/app/providers"
|
||||
@@ -39,7 +40,7 @@ func TestStopAllUnregisteredInstances(t *testing.T) {
|
||||
mockProvider.On("Stop", ctx, "instance3").Return(nil)
|
||||
|
||||
// Call the function under test
|
||||
err = discovery.StopAllUnregisteredInstances(ctx, mockProvider, store)
|
||||
err = discovery.StopAllUnregisteredInstances(ctx, mockProvider, store, slogt.New(t))
|
||||
assert.NilError(t, err)
|
||||
|
||||
// Check expectations
|
||||
@@ -71,7 +72,7 @@ func TestStopAllUnregisteredInstances_WithError(t *testing.T) {
|
||||
mockProvider.On("Stop", ctx, "instance3").Return(nil)
|
||||
|
||||
// Call the function under test
|
||||
err = discovery.StopAllUnregisteredInstances(ctx, mockProvider, store)
|
||||
err = discovery.StopAllUnregisteredInstances(ctx, mockProvider, store, slogt.New(t))
|
||||
assert.Error(t, err, "stop error")
|
||||
|
||||
// Check expectations
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
func TestGetVersion(t *testing.T) {
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
version.Branch = "testing"
|
||||
version.Revision = "8ffebca"
|
||||
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package instance
|
||||
|
||||
import log "github.com/sirupsen/logrus"
|
||||
|
||||
var Ready = "ready"
|
||||
var NotReady = "not-ready"
|
||||
var Unrecoverable = "unrecoverable"
|
||||
@@ -19,7 +17,6 @@ func (instance State) IsReady() bool {
|
||||
}
|
||||
|
||||
func ErrorInstanceState(name string, err error, desiredReplicas int32) (State, error) {
|
||||
log.Error(err.Error())
|
||||
return State{
|
||||
Name: name,
|
||||
CurrentReplicas: 0,
|
||||
@@ -30,7 +27,6 @@ func ErrorInstanceState(name string, err error, desiredReplicas int32) (State, e
|
||||
}
|
||||
|
||||
func UnrecoverableInstanceState(name string, message string, desiredReplicas int32) State {
|
||||
log.Warn(message)
|
||||
return State{
|
||||
Name: name,
|
||||
CurrentReplicas: 0,
|
||||
|
||||
39
app/logger.go
Normal file
39
app/logger.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"github.com/lmittmann/tint"
|
||||
"github.com/sablierapp/sablier/config"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func setupLogger(config config.Logging) *slog.Logger {
|
||||
w := os.Stderr
|
||||
level := parseLogLevel(config.Level)
|
||||
// create a new logger
|
||||
logger := slog.New(tint.NewHandler(w, &tint.Options{
|
||||
Level: level,
|
||||
TimeFormat: time.Kitchen,
|
||||
AddSource: true,
|
||||
}))
|
||||
|
||||
return logger
|
||||
}
|
||||
|
||||
func parseLogLevel(level string) slog.Level {
|
||||
switch strings.ToUpper(level) {
|
||||
case slog.LevelDebug.String():
|
||||
return slog.LevelDebug
|
||||
case slog.LevelInfo.String():
|
||||
return slog.LevelInfo
|
||||
case slog.LevelWarn.String():
|
||||
return slog.LevelWarn
|
||||
case slog.LevelError.String():
|
||||
return slog.LevelError
|
||||
default:
|
||||
slog.Warn("invalid log level, defaulting to info", slog.String("level", level))
|
||||
return slog.LevelInfo
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/sablierapp/sablier/app/discovery"
|
||||
"github.com/sablierapp/sablier/app/providers"
|
||||
"io"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/docker/api/types/container"
|
||||
@@ -16,7 +17,6 @@ import (
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/sablierapp/sablier/app/instance"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Interface guard
|
||||
@@ -25,32 +25,37 @@ var _ providers.Provider = (*DockerClassicProvider)(nil)
|
||||
type DockerClassicProvider struct {
|
||||
Client client.APIClient
|
||||
desiredReplicas int32
|
||||
l *slog.Logger
|
||||
}
|
||||
|
||||
func NewDockerClassicProvider() (*DockerClassicProvider, error) {
|
||||
func NewDockerClassicProvider(ctx context.Context, logger *slog.Logger) (*DockerClassicProvider, error) {
|
||||
logger = logger.With(slog.String("provider", "docker"))
|
||||
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create docker client: %v", err)
|
||||
}
|
||||
|
||||
serverVersion, err := cli.ServerVersion(context.Background())
|
||||
serverVersion, err := cli.ServerVersion(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot connect to docker host: %v", err)
|
||||
}
|
||||
|
||||
log.Tracef("connection established with docker %s (API %s)", serverVersion.Version, serverVersion.APIVersion)
|
||||
|
||||
logger.InfoContext(ctx, "connection established with docker",
|
||||
slog.String("version", serverVersion.Version),
|
||||
slog.String("api_version", serverVersion.APIVersion),
|
||||
)
|
||||
return &DockerClassicProvider{
|
||||
Client: cli,
|
||||
desiredReplicas: 1,
|
||||
l: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (provider *DockerClassicProvider) GetGroups(ctx context.Context) (map[string][]string, error) {
|
||||
func (p *DockerClassicProvider) GetGroups(ctx context.Context) (map[string][]string, error) {
|
||||
args := filters.NewArgs()
|
||||
args.Add("label", fmt.Sprintf("%s=true", discovery.LabelEnable))
|
||||
|
||||
containers, err := provider.Client.ContainerList(ctx, container.ListOptions{
|
||||
containers, err := p.Client.ContainerList(ctx, container.ListOptions{
|
||||
All: true,
|
||||
Filters: args,
|
||||
})
|
||||
@@ -70,21 +75,19 @@ func (provider *DockerClassicProvider) GetGroups(ctx context.Context) (map[strin
|
||||
groups[groupName] = group
|
||||
}
|
||||
|
||||
log.Debug(fmt.Sprintf("%v", groups))
|
||||
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func (provider *DockerClassicProvider) Start(ctx context.Context, name string) error {
|
||||
return provider.Client.ContainerStart(ctx, name, container.StartOptions{})
|
||||
func (p *DockerClassicProvider) Start(ctx context.Context, name string) error {
|
||||
return p.Client.ContainerStart(ctx, name, container.StartOptions{})
|
||||
}
|
||||
|
||||
func (provider *DockerClassicProvider) Stop(ctx context.Context, name string) error {
|
||||
return provider.Client.ContainerStop(ctx, name, container.StopOptions{})
|
||||
func (p *DockerClassicProvider) Stop(ctx context.Context, name string) error {
|
||||
return p.Client.ContainerStop(ctx, name, container.StopOptions{})
|
||||
}
|
||||
|
||||
func (provider *DockerClassicProvider) GetState(ctx context.Context, name string) (instance.State, error) {
|
||||
spec, err := provider.Client.ContainerInspect(ctx, name)
|
||||
func (p *DockerClassicProvider) GetState(ctx context.Context, name string) (instance.State, error) {
|
||||
spec, err := p.Client.ContainerInspect(ctx, name)
|
||||
if err != nil {
|
||||
return instance.State{}, err
|
||||
}
|
||||
@@ -92,38 +95,38 @@ func (provider *DockerClassicProvider) GetState(ctx context.Context, name string
|
||||
// "created", "running", "paused", "restarting", "removing", "exited", or "dead"
|
||||
switch spec.State.Status {
|
||||
case "created", "paused", "restarting", "removing":
|
||||
return instance.NotReadyInstanceState(name, 0, provider.desiredReplicas), nil
|
||||
return instance.NotReadyInstanceState(name, 0, p.desiredReplicas), nil
|
||||
case "running":
|
||||
if spec.State.Health != nil {
|
||||
// // "starting", "healthy" or "unhealthy"
|
||||
if spec.State.Health.Status == "healthy" {
|
||||
return instance.ReadyInstanceState(name, provider.desiredReplicas), nil
|
||||
return instance.ReadyInstanceState(name, p.desiredReplicas), nil
|
||||
} else if spec.State.Health.Status == "unhealthy" {
|
||||
if len(spec.State.Health.Log) >= 1 {
|
||||
lastLog := spec.State.Health.Log[len(spec.State.Health.Log)-1]
|
||||
return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container is unhealthy: %s (%d)", lastLog.Output, lastLog.ExitCode), provider.desiredReplicas), nil
|
||||
return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container is unhealthy: %s (%d)", lastLog.Output, lastLog.ExitCode), p.desiredReplicas), nil
|
||||
} else {
|
||||
return instance.UnrecoverableInstanceState(name, "container is unhealthy: no log available", provider.desiredReplicas), nil
|
||||
return instance.UnrecoverableInstanceState(name, "container is unhealthy: no log available", p.desiredReplicas), nil
|
||||
}
|
||||
} else {
|
||||
return instance.NotReadyInstanceState(name, 0, provider.desiredReplicas), nil
|
||||
return instance.NotReadyInstanceState(name, 0, p.desiredReplicas), nil
|
||||
}
|
||||
}
|
||||
return instance.ReadyInstanceState(name, provider.desiredReplicas), nil
|
||||
return instance.ReadyInstanceState(name, p.desiredReplicas), nil
|
||||
case "exited":
|
||||
if spec.State.ExitCode != 0 {
|
||||
return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container exited with code \"%d\"", spec.State.ExitCode), provider.desiredReplicas), nil
|
||||
return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container exited with code \"%d\"", spec.State.ExitCode), p.desiredReplicas), nil
|
||||
}
|
||||
return instance.NotReadyInstanceState(name, 0, provider.desiredReplicas), nil
|
||||
return instance.NotReadyInstanceState(name, 0, p.desiredReplicas), nil
|
||||
case "dead":
|
||||
return instance.UnrecoverableInstanceState(name, "container in \"dead\" state cannot be restarted", provider.desiredReplicas), nil
|
||||
return instance.UnrecoverableInstanceState(name, "container in \"dead\" state cannot be restarted", p.desiredReplicas), nil
|
||||
default:
|
||||
return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.State.Status), provider.desiredReplicas), nil
|
||||
return instance.UnrecoverableInstanceState(name, fmt.Sprintf("container status \"%s\" not handled", spec.State.Status), p.desiredReplicas), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *DockerClassicProvider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
|
||||
func (p *DockerClassicProvider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||
msgs, errs := p.Client.Events(ctx, types.EventsOptions{
|
||||
Filters: filters.NewArgs(
|
||||
filters.Arg("scope", "local"),
|
||||
filters.Arg("type", string(events.ContainerEventType)),
|
||||
@@ -134,21 +137,21 @@ func (provider *DockerClassicProvider) NotifyInstanceStopped(ctx context.Context
|
||||
select {
|
||||
case msg, ok := <-msgs:
|
||||
if !ok {
|
||||
log.Error("provider event stream is closed")
|
||||
p.l.ErrorContext(ctx, "event stream closed")
|
||||
return
|
||||
}
|
||||
// Send the container that has died to the channel
|
||||
instance <- strings.TrimPrefix(msg.Actor.Attributes["name"], "/")
|
||||
case err, ok := <-errs:
|
||||
if !ok {
|
||||
log.Error("provider event stream is closed", err)
|
||||
p.l.ErrorContext(ctx, "event stream closed")
|
||||
return
|
||||
}
|
||||
if errors.Is(err, io.EOF) {
|
||||
log.Debug("provider event stream closed")
|
||||
p.l.ErrorContext(ctx, "event stream closed")
|
||||
return
|
||||
}
|
||||
log.Error("provider event stream error", err)
|
||||
p.l.ErrorContext(ctx, "event stream error", slog.Any("error", err))
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ package docker
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/neilotoole/slogt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@@ -13,6 +15,15 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
func setupProvider(t *testing.T, client client.APIClient) *DockerClassicProvider {
|
||||
t.Helper()
|
||||
return &DockerClassicProvider{
|
||||
Client: client,
|
||||
desiredReplicas: 1,
|
||||
l: slogt.New(t),
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerClassicProvider_GetState(t *testing.T) {
|
||||
type fields struct {
|
||||
Client *mocks.DockerAPIClientMock
|
||||
@@ -235,10 +246,7 @@ func TestDockerClassicProvider_GetState(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider := &DockerClassicProvider{
|
||||
Client: tt.fields.Client,
|
||||
desiredReplicas: 1,
|
||||
}
|
||||
provider := setupProvider(t, tt.fields.Client)
|
||||
|
||||
tt.fields.Client.On("ContainerInspect", mock.Anything, mock.Anything).Return(tt.containerSpec, tt.err)
|
||||
|
||||
@@ -293,10 +301,7 @@ func TestDockerClassicProvider_Stop(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider := &DockerClassicProvider{
|
||||
Client: tt.fields.Client,
|
||||
desiredReplicas: 1,
|
||||
}
|
||||
provider := setupProvider(t, tt.fields.Client)
|
||||
|
||||
tt.fields.Client.On("ContainerStop", mock.Anything, mock.Anything, mock.Anything).Return(tt.err)
|
||||
|
||||
@@ -348,10 +353,7 @@ func TestDockerClassicProvider_Start(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider := &DockerClassicProvider{
|
||||
Client: tt.fields.Client,
|
||||
desiredReplicas: 1,
|
||||
}
|
||||
provider := setupProvider(t, tt.fields.Client)
|
||||
|
||||
tt.fields.Client.On("ContainerStart", mock.Anything, mock.Anything, mock.Anything).Return(tt.err)
|
||||
|
||||
@@ -382,10 +384,7 @@ func TestDockerClassicProvider_NotifyInstanceStopped(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider := &DockerClassicProvider{
|
||||
Client: mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors),
|
||||
desiredReplicas: 1,
|
||||
}
|
||||
provider := setupProvider(t, mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors))
|
||||
|
||||
instanceC := make(chan string, 1)
|
||||
|
||||
|
||||
@@ -12,14 +12,14 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (provider *DockerClassicProvider) InstanceList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
func (p *DockerClassicProvider) InstanceList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
args := filters.NewArgs()
|
||||
for _, label := range options.Labels {
|
||||
args.Add("label", label)
|
||||
args.Add("label", fmt.Sprintf("%s=true", label))
|
||||
}
|
||||
|
||||
containers, err := provider.Client.ContainerList(ctx, container.ListOptions{
|
||||
containers, err := p.Client.ContainerList(ctx, container.ListOptions{
|
||||
All: options.All,
|
||||
Filters: args,
|
||||
})
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/sablierapp/sablier/app/discovery"
|
||||
"github.com/sablierapp/sablier/app/providers"
|
||||
"io"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
@@ -14,7 +15,6 @@ import (
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/sablierapp/sablier/app/instance"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Interface guard
|
||||
@@ -23,50 +23,57 @@ var _ providers.Provider = (*DockerSwarmProvider)(nil)
|
||||
type DockerSwarmProvider struct {
|
||||
Client client.APIClient
|
||||
desiredReplicas int32
|
||||
|
||||
l *slog.Logger
|
||||
}
|
||||
|
||||
func NewDockerSwarmProvider() (*DockerSwarmProvider, error) {
|
||||
func NewDockerSwarmProvider(ctx context.Context, logger *slog.Logger) (*DockerSwarmProvider, error) {
|
||||
logger = logger.With(slog.String("provider", "swarm"))
|
||||
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create docker client: %v", err)
|
||||
return nil, fmt.Errorf("cannot create docker client: %w", err)
|
||||
}
|
||||
|
||||
serverVersion, err := cli.ServerVersion(context.Background())
|
||||
serverVersion, err := cli.ServerVersion(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot connect to docker host: %v", err)
|
||||
return nil, fmt.Errorf("cannot connect to docker host: %w", err)
|
||||
}
|
||||
|
||||
log.Trace(fmt.Sprintf("connection established with docker %s (API %s)", serverVersion.Version, serverVersion.APIVersion))
|
||||
logger.InfoContext(ctx, "connection established with docker swarm",
|
||||
slog.String("version", serverVersion.Version),
|
||||
slog.String("api_version", serverVersion.APIVersion),
|
||||
)
|
||||
|
||||
return &DockerSwarmProvider{
|
||||
Client: cli,
|
||||
desiredReplicas: 1,
|
||||
l: logger,
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
func (provider *DockerSwarmProvider) Start(ctx context.Context, name string) error {
|
||||
return provider.scale(ctx, name, uint64(provider.desiredReplicas))
|
||||
func (p *DockerSwarmProvider) Start(ctx context.Context, name string) error {
|
||||
return p.scale(ctx, name, uint64(p.desiredReplicas))
|
||||
}
|
||||
|
||||
func (provider *DockerSwarmProvider) Stop(ctx context.Context, name string) error {
|
||||
return provider.scale(ctx, name, 0)
|
||||
func (p *DockerSwarmProvider) Stop(ctx context.Context, name string) error {
|
||||
return p.scale(ctx, name, 0)
|
||||
}
|
||||
|
||||
func (provider *DockerSwarmProvider) scale(ctx context.Context, name string, replicas uint64) error {
|
||||
service, err := provider.getServiceByName(name, ctx)
|
||||
func (p *DockerSwarmProvider) scale(ctx context.Context, name string, replicas uint64) error {
|
||||
service, err := p.getServiceByName(name, ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
foundName := provider.getInstanceName(name, *service)
|
||||
foundName := p.getInstanceName(name, *service)
|
||||
if service.Spec.Mode.Replicated == nil {
|
||||
return errors.New("swarm service is not in \"replicated\" mode")
|
||||
}
|
||||
|
||||
service.Spec.Mode.Replicated.Replicas = &replicas
|
||||
|
||||
response, err := provider.Client.ServiceUpdate(ctx, service.ID, service.Meta.Version, service.Spec, types.ServiceUpdateOptions{})
|
||||
response, err := p.Client.ServiceUpdate(ctx, service.ID, service.Meta.Version, service.Spec, types.ServiceUpdateOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -78,12 +85,12 @@ func (provider *DockerSwarmProvider) scale(ctx context.Context, name string, rep
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *DockerSwarmProvider) GetGroups(ctx context.Context) (map[string][]string, error) {
|
||||
filters := filters.NewArgs()
|
||||
filters.Add("label", fmt.Sprintf("%s=true", discovery.LabelEnable))
|
||||
func (p *DockerSwarmProvider) GetGroups(ctx context.Context) (map[string][]string, error) {
|
||||
f := filters.NewArgs()
|
||||
f.Add("label", fmt.Sprintf("%s=true", discovery.LabelEnable))
|
||||
|
||||
services, err := provider.Client.ServiceList(ctx, types.ServiceListOptions{
|
||||
Filters: filters,
|
||||
services, err := p.Client.ServiceList(ctx, types.ServiceListOptions{
|
||||
Filters: f,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
@@ -105,41 +112,41 @@ func (provider *DockerSwarmProvider) GetGroups(ctx context.Context) (map[string]
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func (provider *DockerSwarmProvider) GetState(ctx context.Context, name string) (instance.State, error) {
|
||||
func (p *DockerSwarmProvider) GetState(ctx context.Context, name string) (instance.State, error) {
|
||||
|
||||
service, err := provider.getServiceByName(name, ctx)
|
||||
service, err := p.getServiceByName(name, ctx)
|
||||
if err != nil {
|
||||
return instance.State{}, err
|
||||
}
|
||||
|
||||
foundName := provider.getInstanceName(name, *service)
|
||||
foundName := p.getInstanceName(name, *service)
|
||||
|
||||
if service.Spec.Mode.Replicated == nil {
|
||||
return instance.State{}, errors.New("swarm service is not in \"replicated\" mode")
|
||||
}
|
||||
|
||||
if service.ServiceStatus.DesiredTasks != service.ServiceStatus.RunningTasks || service.ServiceStatus.DesiredTasks == 0 {
|
||||
return instance.NotReadyInstanceState(foundName, 0, provider.desiredReplicas), nil
|
||||
return instance.NotReadyInstanceState(foundName, 0, p.desiredReplicas), nil
|
||||
}
|
||||
|
||||
return instance.ReadyInstanceState(foundName, provider.desiredReplicas), nil
|
||||
return instance.ReadyInstanceState(foundName, p.desiredReplicas), nil
|
||||
}
|
||||
|
||||
func (provider *DockerSwarmProvider) getServiceByName(name string, ctx context.Context) (*swarm.Service, error) {
|
||||
func (p *DockerSwarmProvider) getServiceByName(name string, ctx context.Context) (*swarm.Service, error) {
|
||||
opts := types.ServiceListOptions{
|
||||
Filters: filters.NewArgs(),
|
||||
Status: true,
|
||||
}
|
||||
opts.Filters.Add("name", name)
|
||||
|
||||
services, err := provider.Client.ServiceList(ctx, opts)
|
||||
services, err := p.Client.ServiceList(ctx, opts)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(services) == 0 {
|
||||
return nil, fmt.Errorf(fmt.Sprintf("service with name %s was not found", name))
|
||||
return nil, fmt.Errorf("service with name %s was not found", name)
|
||||
}
|
||||
|
||||
for _, service := range services {
|
||||
@@ -149,10 +156,10 @@ func (provider *DockerSwarmProvider) getServiceByName(name string, ctx context.C
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf(fmt.Sprintf("service %s was not found because it did not match exactly or on suffix", name))
|
||||
return nil, fmt.Errorf("service %s was not found because it did not match exactly or on suffix", name)
|
||||
}
|
||||
|
||||
func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm.Service) string {
|
||||
func (p *DockerSwarmProvider) getInstanceName(name string, service swarm.Service) string {
|
||||
if name == service.Spec.Name {
|
||||
return name
|
||||
}
|
||||
@@ -160,8 +167,8 @@ func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm.
|
||||
return fmt.Sprintf("%s (%s)", name, service.Spec.Name)
|
||||
}
|
||||
|
||||
func (provider *DockerSwarmProvider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
|
||||
func (p *DockerSwarmProvider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||
msgs, errs := p.Client.Events(ctx, types.EventsOptions{
|
||||
Filters: filters.NewArgs(
|
||||
filters.Arg("scope", "swarm"),
|
||||
filters.Arg("type", "service"),
|
||||
@@ -173,7 +180,7 @@ func (provider *DockerSwarmProvider) NotifyInstanceStopped(ctx context.Context,
|
||||
select {
|
||||
case msg, ok := <-msgs:
|
||||
if !ok {
|
||||
log.Error("provider event stream is closed")
|
||||
p.l.ErrorContext(ctx, "event stream closed")
|
||||
return
|
||||
}
|
||||
if msg.Actor.Attributes["replicas.new"] == "0" {
|
||||
@@ -183,14 +190,14 @@ func (provider *DockerSwarmProvider) NotifyInstanceStopped(ctx context.Context,
|
||||
}
|
||||
case err, ok := <-errs:
|
||||
if !ok {
|
||||
log.Error("provider event stream is closed", err)
|
||||
p.l.ErrorContext(ctx, "event stream closed")
|
||||
return
|
||||
}
|
||||
if errors.Is(err, io.EOF) {
|
||||
log.Debug("provider event stream closed")
|
||||
p.l.ErrorContext(ctx, "event stream closed")
|
||||
return
|
||||
}
|
||||
log.Error("provider event stream error", err)
|
||||
p.l.ErrorContext(ctx, "event stream error", slog.Any("error", err))
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ package dockerswarm
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/neilotoole/slogt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@@ -12,6 +14,15 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
func setupProvider(t *testing.T, client client.APIClient) *DockerSwarmProvider {
|
||||
t.Helper()
|
||||
return &DockerSwarmProvider{
|
||||
Client: client,
|
||||
desiredReplicas: 1,
|
||||
l: slogt.New(t),
|
||||
}
|
||||
}
|
||||
|
||||
func TestDockerSwarmProvider_Start(t *testing.T) {
|
||||
type args struct {
|
||||
name string
|
||||
@@ -72,10 +83,7 @@ func TestDockerSwarmProvider_Start(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
clientMock := mocks.NewDockerAPIClientMock()
|
||||
provider := &DockerSwarmProvider{
|
||||
Client: clientMock,
|
||||
desiredReplicas: 1,
|
||||
}
|
||||
provider := setupProvider(t, clientMock)
|
||||
|
||||
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
|
||||
clientMock.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)
|
||||
@@ -149,10 +157,7 @@ func TestDockerSwarmProvider_Stop(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
clientMock := mocks.NewDockerAPIClientMock()
|
||||
provider := &DockerSwarmProvider{
|
||||
Client: clientMock,
|
||||
desiredReplicas: 1,
|
||||
}
|
||||
provider := setupProvider(t, clientMock)
|
||||
|
||||
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
|
||||
clientMock.On("ServiceUpdate", mock.Anything, tt.wantService.ID, tt.wantService.Meta.Version, tt.wantService.Spec, mock.Anything).Return(tt.response, nil)
|
||||
@@ -224,10 +229,7 @@ func TestDockerSwarmProvider_GetState(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
clientMock := mocks.NewDockerAPIClientMock()
|
||||
provider := &DockerSwarmProvider{
|
||||
Client: clientMock,
|
||||
desiredReplicas: 1,
|
||||
}
|
||||
provider := setupProvider(t, clientMock)
|
||||
|
||||
clientMock.On("ServiceList", mock.Anything, mock.Anything).Return(tt.serviceList, nil)
|
||||
|
||||
@@ -268,10 +270,7 @@ func TestDockerSwarmProvider_NotifyInstanceStopped(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
provider := &DockerSwarmProvider{
|
||||
Client: mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors),
|
||||
desiredReplicas: 1,
|
||||
}
|
||||
provider := setupProvider(t, mocks.NewDockerAPIClientMockWithEvents(tt.events, tt.errors))
|
||||
|
||||
instanceC := make(chan string)
|
||||
|
||||
|
||||
@@ -9,18 +9,18 @@ import (
|
||||
"github.com/sablierapp/sablier/app/discovery"
|
||||
"github.com/sablierapp/sablier/app/providers"
|
||||
"github.com/sablierapp/sablier/app/types"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"log/slog"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func (provider *DockerSwarmProvider) InstanceList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
func (p *DockerSwarmProvider) InstanceList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
args := filters.NewArgs()
|
||||
for _, label := range options.Labels {
|
||||
args.Add("label", label)
|
||||
args.Add("label", fmt.Sprintf("%s=true", label))
|
||||
}
|
||||
|
||||
services, err := provider.Client.ServiceList(ctx, dockertypes.ServiceListOptions{
|
||||
services, err := p.Client.ServiceList(ctx, dockertypes.ServiceListOptions{
|
||||
Filters: args,
|
||||
})
|
||||
|
||||
@@ -30,14 +30,14 @@ func (provider *DockerSwarmProvider) InstanceList(ctx context.Context, options p
|
||||
|
||||
instances := make([]types.Instance, 0, len(services))
|
||||
for _, s := range services {
|
||||
instance := serviceToInstance(s)
|
||||
instance := p.serviceToInstance(s)
|
||||
instances = append(instances, instance)
|
||||
}
|
||||
|
||||
return instances, nil
|
||||
}
|
||||
|
||||
func serviceToInstance(s swarm.Service) (i types.Instance) {
|
||||
func (p *DockerSwarmProvider) serviceToInstance(s swarm.Service) (i types.Instance) {
|
||||
var group string
|
||||
var replicas uint64
|
||||
|
||||
@@ -51,7 +51,7 @@ func serviceToInstance(s swarm.Service) (i types.Instance) {
|
||||
if r, ok := s.Spec.Labels[discovery.LabelReplicas]; ok {
|
||||
atoi, err := strconv.Atoi(r)
|
||||
if err != nil {
|
||||
log.Warnf("Defaulting to default replicas value, could not convert value \"%v\" to int: %v", r, err)
|
||||
p.l.Warn("invalid replicas label value, using default replicas value", slog.Any("error", err), slog.String("instance", s.Spec.Name), slog.String("value", r))
|
||||
replicas = discovery.LabelReplicasDefaultValue
|
||||
} else {
|
||||
replicas = uint64(atoi)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/sablierapp/sablier/app/discovery"
|
||||
"github.com/sablierapp/sablier/app/providers"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
@@ -12,7 +13,6 @@ import (
|
||||
|
||||
"github.com/sablierapp/sablier/app/instance"
|
||||
providerConfig "github.com/sablierapp/sablier/config"
|
||||
log "github.com/sirupsen/logrus"
|
||||
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/informers"
|
||||
@@ -32,53 +32,64 @@ type Workload interface {
|
||||
type KubernetesProvider struct {
|
||||
Client kubernetes.Interface
|
||||
delimiter string
|
||||
l *slog.Logger
|
||||
}
|
||||
|
||||
func NewKubernetesProvider(providerConfig providerConfig.Kubernetes) (*KubernetesProvider, error) {
|
||||
kubeclientConfig, err := rest.InClusterConfig()
|
||||
func NewKubernetesProvider(ctx context.Context, logger *slog.Logger, providerConfig providerConfig.Kubernetes) (*KubernetesProvider, error) {
|
||||
logger = logger.With(slog.String("provider", "kubernetes"))
|
||||
|
||||
kubeclientConfig, err := rest.InClusterConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kubeclientConfig.QPS = providerConfig.QPS
|
||||
kubeclientConfig.Burst = providerConfig.Burst
|
||||
|
||||
log.Debug(fmt.Sprintf("Provider configuration: QPS=%v, Burst=%v", kubeclientConfig.QPS, kubeclientConfig.Burst))
|
||||
|
||||
client, err := kubernetes.NewForConfig(kubeclientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info, err := client.ServerVersion()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger.InfoContext(ctx, "connection established with kubernetes",
|
||||
slog.String("version", info.String()),
|
||||
slog.Float64("config.qps", float64(kubeclientConfig.QPS)),
|
||||
slog.Int("config.burst", kubeclientConfig.Burst),
|
||||
)
|
||||
|
||||
return &KubernetesProvider{
|
||||
Client: client,
|
||||
delimiter: providerConfig.Delimiter,
|
||||
l: logger,
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) Start(ctx context.Context, name string) error {
|
||||
parsed, err := ParseName(name, ParseOptions{Delimiter: provider.delimiter})
|
||||
func (p *KubernetesProvider) Start(ctx context.Context, name string) error {
|
||||
parsed, err := ParseName(name, ParseOptions{Delimiter: p.delimiter})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return provider.scale(ctx, parsed, parsed.Replicas)
|
||||
return p.scale(ctx, parsed, parsed.Replicas)
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) Stop(ctx context.Context, name string) error {
|
||||
parsed, err := ParseName(name, ParseOptions{Delimiter: provider.delimiter})
|
||||
func (p *KubernetesProvider) Stop(ctx context.Context, name string) error {
|
||||
parsed, err := ParseName(name, ParseOptions{Delimiter: p.delimiter})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return provider.scale(ctx, parsed, 0)
|
||||
return p.scale(ctx, parsed, 0)
|
||||
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) GetGroups(ctx context.Context) (map[string][]string, error) {
|
||||
deployments, err := provider.Client.AppsV1().Deployments(core_v1.NamespaceAll).List(ctx, metav1.ListOptions{
|
||||
func (p *KubernetesProvider) GetGroups(ctx context.Context) (map[string][]string, error) {
|
||||
deployments, err := p.Client.AppsV1().Deployments(core_v1.NamespaceAll).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: discovery.LabelEnable,
|
||||
})
|
||||
|
||||
@@ -94,12 +105,12 @@ func (provider *KubernetesProvider) GetGroups(ctx context.Context) (map[string][
|
||||
}
|
||||
|
||||
group := groups[groupName]
|
||||
parsed := DeploymentName(deployment, ParseOptions{Delimiter: provider.delimiter})
|
||||
parsed := DeploymentName(deployment, ParseOptions{Delimiter: p.delimiter})
|
||||
group = append(group, parsed.Original)
|
||||
groups[groupName] = group
|
||||
}
|
||||
|
||||
statefulSets, err := provider.Client.AppsV1().StatefulSets(core_v1.NamespaceAll).List(ctx, metav1.ListOptions{
|
||||
statefulSets, err := p.Client.AppsV1().StatefulSets(core_v1.NamespaceAll).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: discovery.LabelEnable,
|
||||
})
|
||||
|
||||
@@ -114,7 +125,7 @@ func (provider *KubernetesProvider) GetGroups(ctx context.Context) (map[string][
|
||||
}
|
||||
|
||||
group := groups[groupName]
|
||||
parsed := StatefulSetName(statefulSet, ParseOptions{Delimiter: provider.delimiter})
|
||||
parsed := StatefulSetName(statefulSet, ParseOptions{Delimiter: p.delimiter})
|
||||
group = append(group, parsed.Original)
|
||||
groups[groupName] = group
|
||||
}
|
||||
@@ -122,14 +133,14 @@ func (provider *KubernetesProvider) GetGroups(ctx context.Context) (map[string][
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) scale(ctx context.Context, config ParsedName, replicas int32) error {
|
||||
func (p *KubernetesProvider) scale(ctx context.Context, config ParsedName, replicas int32) error {
|
||||
var workload Workload
|
||||
|
||||
switch config.Kind {
|
||||
case "deployment":
|
||||
workload = provider.Client.AppsV1().Deployments(config.Namespace)
|
||||
workload = p.Client.AppsV1().Deployments(config.Namespace)
|
||||
case "statefulset":
|
||||
workload = provider.Client.AppsV1().StatefulSets(config.Namespace)
|
||||
workload = p.Client.AppsV1().StatefulSets(config.Namespace)
|
||||
default:
|
||||
return fmt.Errorf("unsupported kind \"%s\" must be one of \"deployment\", \"statefulset\"", config.Kind)
|
||||
}
|
||||
@@ -145,24 +156,24 @@ func (provider *KubernetesProvider) scale(ctx context.Context, config ParsedName
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) GetState(ctx context.Context, name string) (instance.State, error) {
|
||||
parsed, err := ParseName(name, ParseOptions{Delimiter: provider.delimiter})
|
||||
func (p *KubernetesProvider) GetState(ctx context.Context, name string) (instance.State, error) {
|
||||
parsed, err := ParseName(name, ParseOptions{Delimiter: p.delimiter})
|
||||
if err != nil {
|
||||
return instance.State{}, err
|
||||
}
|
||||
|
||||
switch parsed.Kind {
|
||||
case "deployment":
|
||||
return provider.getDeploymentState(ctx, parsed)
|
||||
return p.getDeploymentState(ctx, parsed)
|
||||
case "statefulset":
|
||||
return provider.getStatefulsetState(ctx, parsed)
|
||||
return p.getStatefulsetState(ctx, parsed)
|
||||
default:
|
||||
return instance.State{}, fmt.Errorf("unsupported kind \"%s\" must be one of \"deployment\", \"statefulset\"", parsed.Kind)
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) getDeploymentState(ctx context.Context, config ParsedName) (instance.State, error) {
|
||||
d, err := provider.Client.AppsV1().Deployments(config.Namespace).Get(ctx, config.Name, metav1.GetOptions{})
|
||||
func (p *KubernetesProvider) getDeploymentState(ctx context.Context, config ParsedName) (instance.State, error) {
|
||||
d, err := p.Client.AppsV1().Deployments(config.Namespace).Get(ctx, config.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return instance.State{}, err
|
||||
}
|
||||
@@ -174,8 +185,8 @@ func (provider *KubernetesProvider) getDeploymentState(ctx context.Context, conf
|
||||
return instance.NotReadyInstanceState(config.Original, d.Status.ReadyReplicas, config.Replicas), nil
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) getStatefulsetState(ctx context.Context, config ParsedName) (instance.State, error) {
|
||||
ss, err := provider.Client.AppsV1().StatefulSets(config.Namespace).Get(ctx, config.Name, metav1.GetOptions{})
|
||||
func (p *KubernetesProvider) getStatefulsetState(ctx context.Context, config ParsedName) (instance.State, error) {
|
||||
ss, err := p.Client.AppsV1().StatefulSets(config.Namespace).Get(ctx, config.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return instance.State{}, err
|
||||
}
|
||||
@@ -187,15 +198,15 @@ func (provider *KubernetesProvider) getStatefulsetState(ctx context.Context, con
|
||||
return instance.NotReadyInstanceState(config.Original, ss.Status.ReadyReplicas, *ss.Spec.Replicas), nil
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||
func (p *KubernetesProvider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
|
||||
|
||||
informer := provider.watchDeployents(instance)
|
||||
informer := p.watchDeployents(instance)
|
||||
go informer.Run(ctx.Done())
|
||||
informer = provider.watchStatefulSets(instance)
|
||||
informer = p.watchStatefulSets(instance)
|
||||
go informer.Run(ctx.Done())
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) watchDeployents(instance chan<- string) cache.SharedIndexInformer {
|
||||
func (p *KubernetesProvider) watchDeployents(instance chan<- string) cache.SharedIndexInformer {
|
||||
handler := cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
newDeployment := new.(*appsv1.Deployment)
|
||||
@@ -206,24 +217,24 @@ func (provider *KubernetesProvider) watchDeployents(instance chan<- string) cach
|
||||
}
|
||||
|
||||
if *newDeployment.Spec.Replicas == 0 {
|
||||
parsed := DeploymentName(*newDeployment, ParseOptions{Delimiter: provider.delimiter})
|
||||
parsed := DeploymentName(*newDeployment, ParseOptions{Delimiter: p.delimiter})
|
||||
instance <- parsed.Original
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
deletedDeployment := obj.(*appsv1.Deployment)
|
||||
parsed := DeploymentName(*deletedDeployment, ParseOptions{Delimiter: provider.delimiter})
|
||||
parsed := DeploymentName(*deletedDeployment, ParseOptions{Delimiter: p.delimiter})
|
||||
instance <- parsed.Original
|
||||
},
|
||||
}
|
||||
factory := informers.NewSharedInformerFactoryWithOptions(provider.Client, 2*time.Second, informers.WithNamespace(core_v1.NamespaceAll))
|
||||
factory := informers.NewSharedInformerFactoryWithOptions(p.Client, 2*time.Second, informers.WithNamespace(core_v1.NamespaceAll))
|
||||
informer := factory.Apps().V1().Deployments().Informer()
|
||||
|
||||
informer.AddEventHandler(handler)
|
||||
return informer
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) watchStatefulSets(instance chan<- string) cache.SharedIndexInformer {
|
||||
func (p *KubernetesProvider) watchStatefulSets(instance chan<- string) cache.SharedIndexInformer {
|
||||
handler := cache.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
newStatefulSet := new.(*appsv1.StatefulSet)
|
||||
@@ -234,17 +245,17 @@ func (provider *KubernetesProvider) watchStatefulSets(instance chan<- string) ca
|
||||
}
|
||||
|
||||
if *newStatefulSet.Spec.Replicas == 0 {
|
||||
parsed := StatefulSetName(*newStatefulSet, ParseOptions{Delimiter: provider.delimiter})
|
||||
parsed := StatefulSetName(*newStatefulSet, ParseOptions{Delimiter: p.delimiter})
|
||||
instance <- parsed.Original
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
deletedStatefulSet := obj.(*appsv1.StatefulSet)
|
||||
parsed := StatefulSetName(*deletedStatefulSet, ParseOptions{Delimiter: provider.delimiter})
|
||||
parsed := StatefulSetName(*deletedStatefulSet, ParseOptions{Delimiter: p.delimiter})
|
||||
instance <- parsed.Original
|
||||
},
|
||||
}
|
||||
factory := informers.NewSharedInformerFactoryWithOptions(provider.Client, 2*time.Second, informers.WithNamespace(core_v1.NamespaceAll))
|
||||
factory := informers.NewSharedInformerFactoryWithOptions(p.Client, 2*time.Second, informers.WithNamespace(core_v1.NamespaceAll))
|
||||
informer := factory.Apps().V1().StatefulSets().Informer()
|
||||
|
||||
informer.AddEventHandler(handler)
|
||||
|
||||
@@ -2,6 +2,8 @@ package kubernetes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/neilotoole/slogt"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@@ -13,6 +15,15 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func setupProvider(t *testing.T, client kubernetes.Interface) *KubernetesProvider {
|
||||
t.Helper()
|
||||
return &KubernetesProvider{
|
||||
Client: client,
|
||||
delimiter: "_",
|
||||
l: slogt.New(t),
|
||||
}
|
||||
}
|
||||
|
||||
func TestKubernetesProvider_Start(t *testing.T) {
|
||||
type data struct {
|
||||
name string
|
||||
@@ -70,10 +81,7 @@ func TestKubernetesProvider_Start(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
deploymentAPI := mocks.DeploymentMock{}
|
||||
statefulsetAPI := mocks.StatefulSetsMock{}
|
||||
provider := KubernetesProvider{
|
||||
Client: mocks.NewKubernetesAPIClientMock(&deploymentAPI, &statefulsetAPI),
|
||||
delimiter: "_",
|
||||
}
|
||||
provider := setupProvider(t, mocks.NewKubernetesAPIClientMock(&deploymentAPI, &statefulsetAPI))
|
||||
|
||||
deploymentAPI.On("GetScale", mock.Anything, tt.data.name, metav1.GetOptions{}).Return(tt.data.get, nil)
|
||||
deploymentAPI.On("UpdateScale", mock.Anything, tt.data.name, tt.data.update, metav1.UpdateOptions{}).Return(nil, nil)
|
||||
@@ -147,10 +155,7 @@ func TestKubernetesProvider_Stop(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
deploymentAPI := mocks.DeploymentMock{}
|
||||
statefulsetAPI := mocks.StatefulSetsMock{}
|
||||
provider := KubernetesProvider{
|
||||
Client: mocks.NewKubernetesAPIClientMock(&deploymentAPI, &statefulsetAPI),
|
||||
delimiter: "_",
|
||||
}
|
||||
provider := setupProvider(t, mocks.NewKubernetesAPIClientMock(&deploymentAPI, &statefulsetAPI))
|
||||
|
||||
deploymentAPI.On("GetScale", mock.Anything, tt.data.name, metav1.GetOptions{}).Return(tt.data.get, nil)
|
||||
deploymentAPI.On("UpdateScale", mock.Anything, tt.data.name, tt.data.update, metav1.UpdateOptions{}).Return(nil, nil)
|
||||
@@ -267,10 +272,7 @@ func TestKubernetesProvider_GetState(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
deploymentAPI := mocks.DeploymentMock{}
|
||||
statefulsetAPI := mocks.StatefulSetsMock{}
|
||||
provider := KubernetesProvider{
|
||||
Client: mocks.NewKubernetesAPIClientMock(&deploymentAPI, &statefulsetAPI),
|
||||
delimiter: "_",
|
||||
}
|
||||
provider := setupProvider(t, mocks.NewKubernetesAPIClientMock(&deploymentAPI, &statefulsetAPI))
|
||||
|
||||
deploymentAPI.On("Get", mock.Anything, tt.data.name, metav1.GetOptions{}).Return(tt.data.getDeployment, nil)
|
||||
statefulsetAPI.On("Get", mock.Anything, tt.data.name, metav1.GetOptions{}).Return(tt.data.getStatefulSet, nil)
|
||||
|
||||
@@ -5,21 +5,21 @@ import (
|
||||
"github.com/sablierapp/sablier/app/discovery"
|
||||
"github.com/sablierapp/sablier/app/providers"
|
||||
"github.com/sablierapp/sablier/app/types"
|
||||
log "github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/apps/v1"
|
||||
core_v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"log/slog"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (provider *KubernetesProvider) InstanceList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
deployments, err := provider.deploymentList(ctx, options)
|
||||
func (p *KubernetesProvider) InstanceList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
deployments, err := p.deploymentList(ctx, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
statefulSets, err := provider.statefulSetList(ctx, options)
|
||||
statefulSets, err := p.statefulSetList(ctx, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -27,8 +27,8 @@ func (provider *KubernetesProvider) InstanceList(ctx context.Context, options pr
|
||||
return append(deployments, statefulSets...), nil
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) deploymentList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
deployments, err := provider.Client.AppsV1().Deployments(core_v1.NamespaceAll).List(ctx, metav1.ListOptions{
|
||||
func (p *KubernetesProvider) deploymentList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
deployments, err := p.Client.AppsV1().Deployments(core_v1.NamespaceAll).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: strings.Join(options.Labels, ","),
|
||||
})
|
||||
|
||||
@@ -38,14 +38,14 @@ func (provider *KubernetesProvider) deploymentList(ctx context.Context, options
|
||||
|
||||
instances := make([]types.Instance, 0, len(deployments.Items))
|
||||
for _, d := range deployments.Items {
|
||||
instance := provider.deploymentToInstance(d)
|
||||
instance := p.deploymentToInstance(d)
|
||||
instances = append(instances, instance)
|
||||
}
|
||||
|
||||
return instances, nil
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) deploymentToInstance(d v1.Deployment) types.Instance {
|
||||
func (p *KubernetesProvider) deploymentToInstance(d v1.Deployment) types.Instance {
|
||||
var group string
|
||||
var replicas uint64
|
||||
|
||||
@@ -59,7 +59,7 @@ func (provider *KubernetesProvider) deploymentToInstance(d v1.Deployment) types.
|
||||
if r, ok := d.Labels[discovery.LabelReplicas]; ok {
|
||||
atoi, err := strconv.Atoi(r)
|
||||
if err != nil {
|
||||
log.Warnf("Defaulting to default replicas value, could not convert value \"%v\" to int: %v", r, err)
|
||||
p.l.Warn("invalid replicas label value, using default replicas value", slog.Any("error", err), slog.String("instance", d.Name), slog.String("value", r))
|
||||
replicas = discovery.LabelReplicasDefaultValue
|
||||
} else {
|
||||
replicas = uint64(atoi)
|
||||
@@ -69,7 +69,7 @@ func (provider *KubernetesProvider) deploymentToInstance(d v1.Deployment) types.
|
||||
}
|
||||
}
|
||||
|
||||
parsed := DeploymentName(d, ParseOptions{Delimiter: provider.delimiter})
|
||||
parsed := DeploymentName(d, ParseOptions{Delimiter: p.delimiter})
|
||||
|
||||
return types.Instance{
|
||||
Name: parsed.Original,
|
||||
@@ -82,8 +82,8 @@ func (provider *KubernetesProvider) deploymentToInstance(d v1.Deployment) types.
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) statefulSetList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
statefulSets, err := provider.Client.AppsV1().StatefulSets(core_v1.NamespaceAll).List(ctx, metav1.ListOptions{
|
||||
func (p *KubernetesProvider) statefulSetList(ctx context.Context, options providers.InstanceListOptions) ([]types.Instance, error) {
|
||||
statefulSets, err := p.Client.AppsV1().StatefulSets(core_v1.NamespaceAll).List(ctx, metav1.ListOptions{
|
||||
LabelSelector: strings.Join(options.Labels, ","),
|
||||
})
|
||||
|
||||
@@ -93,14 +93,14 @@ func (provider *KubernetesProvider) statefulSetList(ctx context.Context, options
|
||||
|
||||
instances := make([]types.Instance, 0, len(statefulSets.Items))
|
||||
for _, ss := range statefulSets.Items {
|
||||
instance := provider.statefulSetToInstance(ss)
|
||||
instance := p.statefulSetToInstance(ss)
|
||||
instances = append(instances, instance)
|
||||
}
|
||||
|
||||
return instances, nil
|
||||
}
|
||||
|
||||
func (provider *KubernetesProvider) statefulSetToInstance(ss v1.StatefulSet) types.Instance {
|
||||
func (p *KubernetesProvider) statefulSetToInstance(ss v1.StatefulSet) types.Instance {
|
||||
var group string
|
||||
var replicas uint64
|
||||
|
||||
@@ -114,7 +114,7 @@ func (provider *KubernetesProvider) statefulSetToInstance(ss v1.StatefulSet) typ
|
||||
if r, ok := ss.Labels[discovery.LabelReplicas]; ok {
|
||||
atoi, err := strconv.Atoi(r)
|
||||
if err != nil {
|
||||
log.Warnf("Defaulting to default replicas value, could not convert value \"%v\" to int: %v", r, err)
|
||||
p.l.Warn("invalid replicas label value, using default replicas value", slog.Any("error", err), slog.String("instance", ss.Name), slog.String("value", r))
|
||||
replicas = discovery.LabelReplicasDefaultValue
|
||||
} else {
|
||||
replicas = uint64(atoi)
|
||||
@@ -124,7 +124,7 @@ func (provider *KubernetesProvider) statefulSetToInstance(ss v1.StatefulSet) typ
|
||||
}
|
||||
}
|
||||
|
||||
parsed := StatefulSetName(ss, ParseOptions{Delimiter: provider.delimiter})
|
||||
parsed := StatefulSetName(ss, ParseOptions{Delimiter: p.delimiter})
|
||||
|
||||
return types.Instance{
|
||||
Name: parsed.Original,
|
||||
|
||||
@@ -22,56 +22,48 @@ import (
|
||||
"github.com/sablierapp/sablier/config"
|
||||
"github.com/sablierapp/sablier/internal/server"
|
||||
"github.com/sablierapp/sablier/version"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func Start(ctx context.Context, conf config.Config) error {
|
||||
// Create context that listens for the interrupt signal from the OS.
|
||||
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
logLevel, err := log.ParseLevel(conf.Logging.Level)
|
||||
logger := setupLogger(conf.Logging)
|
||||
|
||||
if err != nil {
|
||||
log.Warnf("unrecognized log level \"%s\" must be one of [panic, fatal, error, warn, info, debug, trace]", conf.Logging.Level)
|
||||
logLevel = log.InfoLevel
|
||||
}
|
||||
logger.Info("running Sablier version " + version.Info())
|
||||
|
||||
logger := slog.Default()
|
||||
|
||||
log.SetLevel(logLevel)
|
||||
|
||||
log.Info(version.Info())
|
||||
|
||||
provider, err := NewProvider(conf.Provider)
|
||||
provider, err := NewProvider(ctx, logger, conf.Provider)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("using provider \"%s\"", conf.Provider.Name)
|
||||
|
||||
store := inmemory.NewInMemory()
|
||||
err = store.OnExpire(ctx, onSessionExpires(provider))
|
||||
err = store.OnExpire(ctx, onSessionExpires(ctx, provider, logger))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
storage, err := storage.NewFileStorage(conf.Storage)
|
||||
sessionsManager := sessions.NewSessionsManager(logger, store, provider)
|
||||
|
||||
if conf.Storage.File != "" {
|
||||
storage, err := storage.NewFileStorage(conf.Storage, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sessionsManager := sessions.NewSessionsManager(store, provider)
|
||||
defer sessionsManager.Stop()
|
||||
defer saveSessions(storage, sessionsManager, logger)
|
||||
loadSessions(storage, sessionsManager, logger)
|
||||
}
|
||||
|
||||
groups, err := provider.GetGroups(ctx)
|
||||
if err != nil {
|
||||
log.Warn("could not get groups", err)
|
||||
logger.WarnContext(ctx, "initial group scan failed", slog.Any("reason", err))
|
||||
} else {
|
||||
sessionsManager.SetGroups(groups)
|
||||
}
|
||||
|
||||
updateGroups := make(chan map[string][]string)
|
||||
go WatchGroups(ctx, provider, 2*time.Second, updateGroups)
|
||||
go WatchGroups(ctx, provider, 2*time.Second, updateGroups, logger)
|
||||
go func() {
|
||||
for groups := range updateGroups {
|
||||
sessionsManager.SetGroups(groups)
|
||||
@@ -89,30 +81,25 @@ func Start(ctx context.Context, conf config.Config) error {
|
||||
}
|
||||
}()
|
||||
|
||||
if storage.Enabled() {
|
||||
defer saveSessions(storage, sessionsManager)
|
||||
loadSessions(storage, sessionsManager)
|
||||
}
|
||||
|
||||
if conf.Provider.AutoStopOnStartup {
|
||||
err := discovery.StopAllUnregisteredInstances(context.Background(), provider, store)
|
||||
err := discovery.StopAllUnregisteredInstances(ctx, provider, store, logger)
|
||||
if err != nil {
|
||||
log.Warnf("Stopping unregistered instances had an error: %v", err)
|
||||
logger.ErrorContext(ctx, "unable to stop unregistered instances", slog.Any("reason", err))
|
||||
}
|
||||
}
|
||||
|
||||
var t *theme.Themes
|
||||
|
||||
if conf.Strategy.Dynamic.CustomThemesPath != "" {
|
||||
log.Tracef("loading themes with custom theme path: %s", conf.Strategy.Dynamic.CustomThemesPath)
|
||||
logger.DebugContext(ctx, "loading themes from custom theme path", slog.String("path", conf.Strategy.Dynamic.CustomThemesPath))
|
||||
custom := os.DirFS(conf.Strategy.Dynamic.CustomThemesPath)
|
||||
t, err = theme.NewWithCustomThemes(custom)
|
||||
t, err = theme.NewWithCustomThemes(custom, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
log.Trace("loading themes without custom themes")
|
||||
t, err = theme.New()
|
||||
logger.DebugContext(ctx, "loading themes without custom theme path", slog.String("reason", "--strategy.dynamic.custom-themes-path is empty"))
|
||||
t, err = theme.New(logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -130,77 +117,71 @@ func Start(ctx context.Context, conf config.Config) error {
|
||||
// Listen for the interrupt signal.
|
||||
<-ctx.Done()
|
||||
|
||||
// Restore default behavior on the interrupt signal and notify user of shutdown.
|
||||
stop()
|
||||
log.Println("shutting down gracefully, press Ctrl+C again to force")
|
||||
logger.InfoContext(ctx, "shutting down gracefully, press Ctrl+C again to force")
|
||||
|
||||
// The context is used to inform the server it has 5 seconds to finish
|
||||
// the request it is currently handling
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
log.Println("Server exiting")
|
||||
logger.InfoContext(ctx, "Server exiting")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func onSessionExpires(provider providers.Provider) func(key string) {
|
||||
func onSessionExpires(ctx context.Context, provider providers.Provider, logger *slog.Logger) func(key string) {
|
||||
return func(_key string) {
|
||||
go func(key string) {
|
||||
log.Debugf("stopping %s...", key)
|
||||
err := provider.Stop(context.Background(), key)
|
||||
|
||||
logger.InfoContext(ctx, "instance expired", slog.String("instance", key))
|
||||
err := provider.Stop(ctx, key)
|
||||
if err != nil {
|
||||
log.Warnf("error stopping %s: %s", key, err.Error())
|
||||
} else {
|
||||
log.Debugf("stopped %s", key)
|
||||
logger.ErrorContext(ctx, "instance expired could not be stopped from provider", slog.String("instance", key), slog.Any("error", err))
|
||||
}
|
||||
}(_key)
|
||||
}
|
||||
}
|
||||
|
||||
func loadSessions(storage storage.Storage, sessions sessions.Manager) {
|
||||
slog.Info("loading sessions from storage")
|
||||
func loadSessions(storage storage.Storage, sessions sessions.Manager, logger *slog.Logger) {
|
||||
logger.Info("loading sessions from storage")
|
||||
reader, err := storage.Reader()
|
||||
if err != nil {
|
||||
log.Error("error loading sessions", err)
|
||||
logger.Error("error loading sessions from storage", slog.Any("reason", err))
|
||||
}
|
||||
err = sessions.LoadSessions(reader)
|
||||
if err != nil {
|
||||
log.Error("error loading sessions", err)
|
||||
logger.Error("error loading sessions into Sablier", slog.Any("reason", err))
|
||||
}
|
||||
}
|
||||
|
||||
func saveSessions(storage storage.Storage, sessions sessions.Manager) {
|
||||
slog.Info("writing sessions to storage")
|
||||
func saveSessions(storage storage.Storage, sessions sessions.Manager, logger *slog.Logger) {
|
||||
logger.Info("writing sessions to storage")
|
||||
writer, err := storage.Writer()
|
||||
if err != nil {
|
||||
log.Error("error saving sessions", err)
|
||||
logger.Error("error saving sessions to storage", slog.Any("reason", err))
|
||||
return
|
||||
}
|
||||
err = sessions.SaveSessions(writer)
|
||||
if err != nil {
|
||||
log.Error("error saving sessions", err)
|
||||
logger.Error("error saving sessions from Sablier", slog.Any("reason", err))
|
||||
}
|
||||
}
|
||||
|
||||
func NewProvider(config config.Provider) (providers.Provider, error) {
|
||||
func NewProvider(ctx context.Context, logger *slog.Logger, config config.Provider) (providers.Provider, error) {
|
||||
if err := config.IsValid(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch config.Name {
|
||||
case "swarm", "docker_swarm":
|
||||
return dockerswarm.NewDockerSwarmProvider()
|
||||
return dockerswarm.NewDockerSwarmProvider(ctx, logger)
|
||||
case "docker":
|
||||
return docker.NewDockerClassicProvider()
|
||||
return docker.NewDockerClassicProvider(ctx, logger)
|
||||
case "kubernetes":
|
||||
return kubernetes.NewKubernetesProvider(config.Kubernetes)
|
||||
return kubernetes.NewKubernetesProvider(ctx, logger, config.Kubernetes)
|
||||
}
|
||||
return nil, fmt.Errorf("unimplemented provider %s", config.Name)
|
||||
}
|
||||
|
||||
func WatchGroups(ctx context.Context, provider providers.Provider, frequency time.Duration, send chan<- map[string][]string) {
|
||||
func WatchGroups(ctx context.Context, provider providers.Provider, frequency time.Duration, send chan<- map[string][]string, logger *slog.Logger) {
|
||||
ticker := time.NewTicker(frequency)
|
||||
for {
|
||||
select {
|
||||
@@ -209,7 +190,7 @@ func WatchGroups(ctx context.Context, provider providers.Provider, frequency tim
|
||||
case <-ticker.C:
|
||||
groups, err := provider.GetGroups(ctx)
|
||||
if err != nil {
|
||||
log.Warn("could not get groups", err)
|
||||
logger.Error("cannot retrieve group from provider", slog.Any("reason", err))
|
||||
} else if groups != nil {
|
||||
send <- groups
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/sablierapp/sablier/pkg/store"
|
||||
"io"
|
||||
"log/slog"
|
||||
@@ -15,14 +16,13 @@ import (
|
||||
|
||||
"github.com/sablierapp/sablier/app/instance"
|
||||
"github.com/sablierapp/sablier/app/providers"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
//go:generate mockgen -package sessionstest -source=sessions_manager.go -destination=sessionstest/mocks_sessions_manager.go *
|
||||
|
||||
type Manager interface {
|
||||
RequestSession(names []string, duration time.Duration) (*SessionState, error)
|
||||
RequestSessionGroup(group string, duration time.Duration) (*SessionState, error)
|
||||
RequestSession(ctx context.Context, names []string, duration time.Duration) (*SessionState, error)
|
||||
RequestSessionGroup(ctx context.Context, group string, duration time.Duration) (*SessionState, error)
|
||||
RequestReadySession(ctx context.Context, names []string, duration time.Duration, timeout time.Duration) (*SessionState, error)
|
||||
RequestReadySessionGroup(ctx context.Context, group string, duration time.Duration, timeout time.Duration) (*SessionState, error)
|
||||
|
||||
@@ -31,47 +31,44 @@ type Manager interface {
|
||||
|
||||
RemoveInstance(name string) error
|
||||
SetGroups(groups map[string][]string)
|
||||
|
||||
Stop()
|
||||
}
|
||||
|
||||
type SessionsManager struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
store store.Store
|
||||
provider providers.Provider
|
||||
groups map[string][]string
|
||||
|
||||
l *slog.Logger
|
||||
}
|
||||
|
||||
func NewSessionsManager(store store.Store, provider providers.Provider) Manager {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
func NewSessionsManager(logger *slog.Logger, store store.Store, provider providers.Provider) Manager {
|
||||
sm := &SessionsManager{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
store: store,
|
||||
provider: provider,
|
||||
groups: map[string][]string{},
|
||||
l: logger,
|
||||
}
|
||||
|
||||
return sm
|
||||
}
|
||||
|
||||
func (sm *SessionsManager) SetGroups(groups map[string][]string) {
|
||||
func (s *SessionsManager) SetGroups(groups map[string][]string) {
|
||||
if groups == nil {
|
||||
groups = map[string][]string{}
|
||||
}
|
||||
slog.Info("set groups", slog.Any("old", sm.groups), slog.Any("new", groups))
|
||||
sm.groups = groups
|
||||
if diff := cmp.Diff(s.groups, groups); diff != "" {
|
||||
// TODO: Change this log for a friendly logging, groups rarely change, so we can put some effort on displaying what changed
|
||||
s.l.Info("set groups", slog.Any("old", s.groups), slog.Any("new", groups), slog.Any("diff", diff))
|
||||
s.groups = groups
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *SessionsManager) RemoveInstance(name string) error {
|
||||
return sm.store.Delete(context.Background(), name)
|
||||
func (s *SessionsManager) RemoveInstance(name string) error {
|
||||
return s.store.Delete(context.Background(), name)
|
||||
}
|
||||
|
||||
func (sm *SessionsManager) LoadSessions(reader io.ReadCloser) error {
|
||||
unmarshaler, ok := sm.store.(json.Unmarshaler)
|
||||
func (s *SessionsManager) LoadSessions(reader io.ReadCloser) error {
|
||||
unmarshaler, ok := s.store.(json.Unmarshaler)
|
||||
defer reader.Close()
|
||||
if ok {
|
||||
return json.NewDecoder(reader).Decode(unmarshaler)
|
||||
@@ -79,8 +76,8 @@ func (sm *SessionsManager) LoadSessions(reader io.ReadCloser) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sm *SessionsManager) SaveSessions(writer io.WriteCloser) error {
|
||||
marshaler, ok := sm.store.(json.Marshaler)
|
||||
func (s *SessionsManager) SaveSessions(writer io.WriteCloser) error {
|
||||
marshaler, ok := s.store.(json.Marshaler)
|
||||
defer writer.Close()
|
||||
if ok {
|
||||
encoder := json.NewEncoder(writer)
|
||||
@@ -93,7 +90,7 @@ func (sm *SessionsManager) SaveSessions(writer io.WriteCloser) error {
|
||||
}
|
||||
|
||||
type InstanceState struct {
|
||||
Instance *instance.State `json:"instance"`
|
||||
Instance instance.State `json:"instance"`
|
||||
Error error `json:"error"`
|
||||
}
|
||||
|
||||
@@ -123,13 +120,14 @@ func (s *SessionState) Status() string {
|
||||
return "not-ready"
|
||||
}
|
||||
|
||||
func (s *SessionsManager) RequestSession(names []string, duration time.Duration) (sessionState *SessionState, err error) {
|
||||
func (s *SessionsManager) RequestSession(ctx context.Context, names []string, duration time.Duration) (sessionState *SessionState, err error) {
|
||||
if len(names) == 0 {
|
||||
return nil, fmt.Errorf("names cannot be empty")
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
mx := sync.Mutex{}
|
||||
sessionState = &SessionState{
|
||||
Instances: map[string]InstanceState{},
|
||||
}
|
||||
@@ -139,8 +137,9 @@ func (s *SessionsManager) RequestSession(names []string, duration time.Duration)
|
||||
for i := 0; i < len(names); i++ {
|
||||
go func(name string) {
|
||||
defer wg.Done()
|
||||
state, err := s.requestSessionInstance(name, duration)
|
||||
|
||||
state, err := s.requestInstance(ctx, name, duration)
|
||||
mx.Lock()
|
||||
defer mx.Unlock()
|
||||
sessionState.Instances[name] = InstanceState{
|
||||
Instance: state,
|
||||
Error: err,
|
||||
@@ -153,7 +152,7 @@ func (s *SessionsManager) RequestSession(names []string, duration time.Duration)
|
||||
return sessionState, nil
|
||||
}
|
||||
|
||||
func (s *SessionsManager) RequestSessionGroup(group string, duration time.Duration) (sessionState *SessionState, err error) {
|
||||
func (s *SessionsManager) RequestSessionGroup(ctx context.Context, group string, duration time.Duration) (sessionState *SessionState, err error) {
|
||||
if len(group) == 0 {
|
||||
return nil, fmt.Errorf("group is mandatory")
|
||||
}
|
||||
@@ -170,60 +169,48 @@ func (s *SessionsManager) RequestSessionGroup(group string, duration time.Durati
|
||||
return nil, fmt.Errorf("group has no member")
|
||||
}
|
||||
|
||||
return s.RequestSession(names, duration)
|
||||
return s.RequestSession(ctx, names, duration)
|
||||
}
|
||||
|
||||
func (s *SessionsManager) requestSessionInstance(name string, duration time.Duration) (*instance.State, error) {
|
||||
func (s *SessionsManager) requestInstance(ctx context.Context, name string, duration time.Duration) (instance.State, error) {
|
||||
if name == "" {
|
||||
return nil, errors.New("instance name cannot be empty")
|
||||
return instance.State{}, errors.New("instance name cannot be empty")
|
||||
}
|
||||
|
||||
requestState, err := s.store.Get(context.TODO(), name)
|
||||
state, err := s.store.Get(ctx, name)
|
||||
if errors.Is(err, store.ErrKeyNotFound) {
|
||||
log.Debugf("starting [%s]...", name)
|
||||
s.l.DebugContext(ctx, "request to start instance received", slog.String("instance", name))
|
||||
|
||||
err := s.provider.Start(s.ctx, name)
|
||||
err := s.provider.Start(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return instance.State{}, err
|
||||
}
|
||||
|
||||
state, err := s.provider.GetState(s.ctx, name)
|
||||
state, err = s.provider.GetState(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return instance.State{}, err
|
||||
}
|
||||
|
||||
requestState.Name = name
|
||||
requestState.CurrentReplicas = state.CurrentReplicas
|
||||
requestState.DesiredReplicas = state.DesiredReplicas
|
||||
requestState.Status = state.Status
|
||||
requestState.Message = state.Message
|
||||
|
||||
log.Debugf("status for [%s]=[%s]", name, requestState.Status)
|
||||
s.l.DebugContext(ctx, "request to start instance status completed", slog.String("instance", name), slog.String("status", state.Status))
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("cannot retrieve instance from store: %w", err)
|
||||
} else if requestState.Status != instance.Ready {
|
||||
log.Debugf("checking [%s]...", name)
|
||||
state, err := s.provider.GetState(s.ctx, name)
|
||||
s.l.ErrorContext(ctx, "request to start instance failed", slog.String("instance", name), slog.Any("error", err))
|
||||
return instance.State{}, fmt.Errorf("cannot retrieve instance from store: %w", err)
|
||||
} else if state.Status != instance.Ready {
|
||||
s.l.DebugContext(ctx, "request to check instance status received", slog.String("instance", name), slog.String("current_status", state.Status))
|
||||
state, err = s.provider.GetState(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return instance.State{}, err
|
||||
}
|
||||
s.l.DebugContext(ctx, "request to check instance status completed", slog.String("instance", name), slog.String("new_status", state.Status))
|
||||
}
|
||||
|
||||
requestState.Name = state.Name
|
||||
requestState.CurrentReplicas = state.CurrentReplicas
|
||||
requestState.DesiredReplicas = state.DesiredReplicas
|
||||
requestState.Status = state.Status
|
||||
requestState.Message = state.Message
|
||||
log.Debugf("status for %s=%s", name, requestState.Status)
|
||||
}
|
||||
|
||||
log.Debugf("expiring %+v in %v", requestState, duration)
|
||||
s.l.DebugContext(ctx, "set expiration for instance", slog.String("instance", name), slog.Duration("expiration", duration))
|
||||
// Refresh the duration
|
||||
s.ExpiresAfter(&requestState, duration)
|
||||
return &requestState, nil
|
||||
s.expiresAfter(ctx, state, duration)
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func (s *SessionsManager) RequestReadySession(ctx context.Context, names []string, duration time.Duration, timeout time.Duration) (*SessionState, error) {
|
||||
session, err := s.RequestSession(names, duration)
|
||||
session, err := s.RequestSession(ctx, names, duration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -241,7 +228,7 @@ func (s *SessionsManager) RequestReadySession(ctx context.Context, names []strin
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
session, err := s.RequestSession(names, duration)
|
||||
session, err := s.RequestSession(ctx, names, duration)
|
||||
if err != nil {
|
||||
errch <- err
|
||||
return
|
||||
@@ -258,7 +245,7 @@ func (s *SessionsManager) RequestReadySession(ctx context.Context, names []strin
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Debug("request cancelled by user, stopping timeout")
|
||||
s.l.DebugContext(ctx, "request cancelled", slog.Any("reason", ctx.Err()))
|
||||
close(quit)
|
||||
if ctx.Err() != nil {
|
||||
return nil, fmt.Errorf("request cancelled by user: %w", ctx.Err())
|
||||
@@ -297,18 +284,13 @@ func (s *SessionsManager) RequestReadySessionGroup(ctx context.Context, group st
|
||||
return s.RequestReadySession(ctx, names, duration, timeout)
|
||||
}
|
||||
|
||||
func (s *SessionsManager) ExpiresAfter(instance *instance.State, duration time.Duration) {
|
||||
err := s.store.Put(context.TODO(), *instance, duration)
|
||||
func (s *SessionsManager) expiresAfter(ctx context.Context, instance instance.State, duration time.Duration) {
|
||||
err := s.store.Put(ctx, instance, duration)
|
||||
if err != nil {
|
||||
slog.Default().Warn("could not put instance to store, will not expire", slog.Any("error", err), slog.String("instance", instance.Name))
|
||||
s.l.Error("could not put instance to store, will not expire", slog.Any("error", err), slog.String("instance", instance.Name))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SessionsManager) Stop() {
|
||||
// Stop event listeners
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
func (s *SessionState) MarshalJSON() ([]byte, error) {
|
||||
instances := maps.Values(s.Instances)
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package sessions
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/neilotoole/slogt"
|
||||
"github.com/sablierapp/sablier/pkg/store/storetest"
|
||||
"go.uber.org/mock/gomock"
|
||||
"testing"
|
||||
@@ -26,7 +27,7 @@ func TestSessionState_IsReady(t *testing.T) {
|
||||
{
|
||||
name: "all instances are ready",
|
||||
fields: fields{
|
||||
Instances: createMap([]*instance.State{
|
||||
Instances: createMap([]instance.State{
|
||||
{Name: "nginx", Status: instance.Ready},
|
||||
{Name: "apache", Status: instance.Ready},
|
||||
}),
|
||||
@@ -36,7 +37,7 @@ func TestSessionState_IsReady(t *testing.T) {
|
||||
{
|
||||
name: "one instance is not ready",
|
||||
fields: fields{
|
||||
Instances: createMap([]*instance.State{
|
||||
Instances: createMap([]instance.State{
|
||||
{Name: "nginx", Status: instance.Ready},
|
||||
{Name: "apache", Status: instance.NotReady},
|
||||
}),
|
||||
@@ -46,14 +47,14 @@ func TestSessionState_IsReady(t *testing.T) {
|
||||
{
|
||||
name: "no instances specified",
|
||||
fields: fields{
|
||||
Instances: createMap([]*instance.State{}),
|
||||
Instances: createMap([]instance.State{}),
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "one instance has an error",
|
||||
fields: fields{
|
||||
Instances: createMap([]*instance.State{
|
||||
Instances: createMap([]instance.State{
|
||||
{Name: "nginx-error", Status: instance.Unrecoverable, Message: "connection timeout"},
|
||||
{Name: "apache", Status: instance.Ready},
|
||||
}),
|
||||
@@ -73,7 +74,7 @@ func TestSessionState_IsReady(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func createMap(instances []*instance.State) map[string]InstanceState {
|
||||
func createMap(instances []instance.State) map[string]InstanceState {
|
||||
states := make(map[string]InstanceState)
|
||||
|
||||
for _, v := range instances {
|
||||
@@ -93,7 +94,7 @@ func setupSessionManager(t *testing.T) (Manager, *storetest.MockStore, *mocks.Pr
|
||||
p := mocks.NewProviderMock()
|
||||
s := storetest.NewMockStore(ctrl)
|
||||
|
||||
m := NewSessionsManager(s, p)
|
||||
m := NewSessionsManager(slogt.New(t), s, p)
|
||||
return m, s, p
|
||||
}
|
||||
|
||||
|
||||
@@ -102,33 +102,33 @@ func (mr *MockManagerMockRecorder) RequestReadySessionGroup(ctx, group, duration
|
||||
}
|
||||
|
||||
// RequestSession mocks base method.
|
||||
func (m *MockManager) RequestSession(names []string, duration time.Duration) (*sessions.SessionState, error) {
|
||||
func (m *MockManager) RequestSession(ctx context.Context, names []string, duration time.Duration) (*sessions.SessionState, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "RequestSession", names, duration)
|
||||
ret := m.ctrl.Call(m, "RequestSession", ctx, names, duration)
|
||||
ret0, _ := ret[0].(*sessions.SessionState)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// RequestSession indicates an expected call of RequestSession.
|
||||
func (mr *MockManagerMockRecorder) RequestSession(names, duration any) *gomock.Call {
|
||||
func (mr *MockManagerMockRecorder) RequestSession(ctx, names, duration any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestSession", reflect.TypeOf((*MockManager)(nil).RequestSession), names, duration)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestSession", reflect.TypeOf((*MockManager)(nil).RequestSession), ctx, names, duration)
|
||||
}
|
||||
|
||||
// RequestSessionGroup mocks base method.
|
||||
func (m *MockManager) RequestSessionGroup(group string, duration time.Duration) (*sessions.SessionState, error) {
|
||||
func (m *MockManager) RequestSessionGroup(ctx context.Context, group string, duration time.Duration) (*sessions.SessionState, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "RequestSessionGroup", group, duration)
|
||||
ret := m.ctrl.Call(m, "RequestSessionGroup", ctx, group, duration)
|
||||
ret0, _ := ret[0].(*sessions.SessionState)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// RequestSessionGroup indicates an expected call of RequestSessionGroup.
|
||||
func (mr *MockManagerMockRecorder) RequestSessionGroup(group, duration any) *gomock.Call {
|
||||
func (mr *MockManagerMockRecorder) RequestSessionGroup(ctx, group, duration any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestSessionGroup", reflect.TypeOf((*MockManager)(nil).RequestSessionGroup), group, duration)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestSessionGroup", reflect.TypeOf((*MockManager)(nil).RequestSessionGroup), ctx, group, duration)
|
||||
}
|
||||
|
||||
// SaveSessions mocks base method.
|
||||
@@ -156,15 +156,3 @@ func (mr *MockManagerMockRecorder) SetGroups(groups any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetGroups", reflect.TypeOf((*MockManager)(nil).SetGroups), groups)
|
||||
}
|
||||
|
||||
// Stop mocks base method.
|
||||
func (m *MockManager) Stop() {
|
||||
m.ctrl.T.Helper()
|
||||
m.ctrl.Call(m, "Stop")
|
||||
}
|
||||
|
||||
// Stop indicates an expected call of Stop.
|
||||
func (mr *MockManagerMockRecorder) Stop() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockManager)(nil).Stop))
|
||||
}
|
||||
|
||||
@@ -3,67 +3,56 @@ package storage
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
|
||||
"github.com/sablierapp/sablier/config"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Storage interface {
|
||||
Reader() (io.ReadCloser, error)
|
||||
Writer() (io.WriteCloser, error)
|
||||
|
||||
Enabled() bool
|
||||
}
|
||||
|
||||
type FileStorage struct {
|
||||
file string
|
||||
l *slog.Logger
|
||||
}
|
||||
|
||||
func NewFileStorage(config config.Storage) (Storage, error) {
|
||||
func NewFileStorage(config config.Storage, logger *slog.Logger) (Storage, error) {
|
||||
logger = logger.With(slog.String("file", config.File))
|
||||
storage := &FileStorage{
|
||||
file: config.File,
|
||||
}
|
||||
|
||||
if storage.Enabled() {
|
||||
file, err := os.OpenFile(config.File, os.O_RDWR|os.O_CREATE, 0755)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("unable to open file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stats, err := file.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("unable to read file info: %w", err)
|
||||
}
|
||||
|
||||
// Initialize file to an empty JSON3
|
||||
if stats.Size() == 0 {
|
||||
file.WriteString("{}")
|
||||
_, err := file.WriteString("{}")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to initialize file to valid json: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("initialized storage to %s", config.File)
|
||||
} else {
|
||||
log.Warn("no storage configuration provided. all states will be lost upon exit")
|
||||
}
|
||||
logger.Info("storage successfully initialized")
|
||||
|
||||
return storage, nil
|
||||
}
|
||||
|
||||
func (fs *FileStorage) Reader() (io.ReadCloser, error) {
|
||||
if !fs.Enabled() {
|
||||
return nil, fmt.Errorf("file storage is not enabled")
|
||||
}
|
||||
return os.OpenFile(fs.file, os.O_RDWR|os.O_CREATE, 0755)
|
||||
}
|
||||
|
||||
func (fs *FileStorage) Writer() (io.WriteCloser, error) {
|
||||
if !fs.Enabled() {
|
||||
return nil, fmt.Errorf("file storage is not enabled")
|
||||
}
|
||||
return os.OpenFile(fs.file, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
|
||||
}
|
||||
|
||||
func (fs *FileStorage) Enabled() bool {
|
||||
return len(fs.file) > 0
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package theme_test
|
||||
|
||||
import (
|
||||
"github.com/neilotoole/slogt"
|
||||
"testing"
|
||||
"testing/fstest"
|
||||
|
||||
@@ -13,7 +14,7 @@ func TestList(t *testing.T) {
|
||||
fstest.MapFS{
|
||||
"theme1.html": &fstest.MapFile{},
|
||||
"inner/theme2.html": &fstest.MapFile{},
|
||||
})
|
||||
}, slogt.New(t))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
||||
@@ -1,22 +1,22 @@
|
||||
package theme
|
||||
|
||||
import (
|
||||
"html/template"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func ParseTemplatesFS(f fs.FS, t *template.Template) error {
|
||||
func (t *Themes) ParseTemplatesFS(f fs.FS) error {
|
||||
err := fs.WalkDir(f, ".", func(path string, d fs.DirEntry, err error) error {
|
||||
if strings.Contains(path, ".html") {
|
||||
log.Tracef("found template %s", path)
|
||||
_, err = t.ParseFS(f, path)
|
||||
t.l.Info("theme found", slog.String("path", path))
|
||||
_, err = t.themes.ParseFS(f, path)
|
||||
if err != nil {
|
||||
t.l.Info("cannot add theme", slog.String("path", path), slog.Any("reason", err))
|
||||
return err
|
||||
}
|
||||
log.Tracef("successfully added template %s", path)
|
||||
|
||||
t.l.Info("successfully added theme", slog.String("path", path))
|
||||
}
|
||||
return err
|
||||
})
|
||||
|
||||
@@ -3,6 +3,8 @@ package theme_test
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/neilotoole/slogt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"testing"
|
||||
"testing/fstest"
|
||||
@@ -65,7 +67,7 @@ func TestThemes_Render(t *testing.T) {
|
||||
version.Version = "1.0.0"
|
||||
themes, err := theme.NewWithCustomThemes(fstest.MapFS{
|
||||
"inner/custom-theme.html": &fstest.MapFile{Data: []byte(customTheme)},
|
||||
})
|
||||
}, slogt.New(t))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@@ -179,7 +181,7 @@ func ExampleThemes_Render() {
|
||||
version.Version = "1.0.0"
|
||||
themes, err := theme.NewWithCustomThemes(fstest.MapFS{
|
||||
"inner/custom-theme.html": &fstest.MapFile{Data: []byte(customTheme)},
|
||||
})
|
||||
}, slog.Default())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@@ -4,8 +4,7 @@ import (
|
||||
"embed"
|
||||
"html/template"
|
||||
"io/fs"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
// List of built-it themes
|
||||
@@ -15,38 +14,41 @@ var embeddedThemesFS embed.FS
|
||||
|
||||
type Themes struct {
|
||||
themes *template.Template
|
||||
l *slog.Logger
|
||||
}
|
||||
|
||||
func New() (*Themes, error) {
|
||||
func New(logger *slog.Logger) (*Themes, error) {
|
||||
themes := &Themes{
|
||||
themes: template.New("root"),
|
||||
l: logger,
|
||||
}
|
||||
|
||||
err := ParseTemplatesFS(embeddedThemesFS, themes.themes)
|
||||
err := themes.ParseTemplatesFS(embeddedThemesFS)
|
||||
if err != nil {
|
||||
// Should never happen
|
||||
log.Errorf("could not parse embedded templates: %v", err)
|
||||
logger.Error("could not parse embedded templates", slog.Any("reason", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return themes, nil
|
||||
}
|
||||
|
||||
func NewWithCustomThemes(custom fs.FS) (*Themes, error) {
|
||||
func NewWithCustomThemes(custom fs.FS, logger *slog.Logger) (*Themes, error) {
|
||||
themes := &Themes{
|
||||
themes: template.New("root"),
|
||||
l: logger,
|
||||
}
|
||||
|
||||
err := ParseTemplatesFS(embeddedThemesFS, themes.themes)
|
||||
err := themes.ParseTemplatesFS(embeddedThemesFS)
|
||||
if err != nil {
|
||||
// Should never happen
|
||||
log.Errorf("could not parse embedded templates: %v", err)
|
||||
logger.Error("could not parse embedded templates", slog.Any("reason", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = ParseTemplatesFS(custom, themes.themes)
|
||||
err = themes.ParseTemplatesFS(custom)
|
||||
if err != nil {
|
||||
log.Errorf("could not parse custom templates: %v", err)
|
||||
logger.Error("could not parse custom templates", slog.Any("reason", err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -2,12 +2,12 @@ package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/sablierapp/sablier/config"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/spf13/viper"
|
||||
@@ -69,7 +69,7 @@ It provides an integrations with multiple reverse proxies and different loading
|
||||
viper.BindPFlag("sessions.expiration-interval", startCmd.Flags().Lookup("sessions.expiration-interval"))
|
||||
|
||||
// logging level
|
||||
rootCmd.PersistentFlags().StringVar(&conf.Logging.Level, "logging.level", log.InfoLevel.String(), "The logging level. Can be one of [panic, fatal, error, warn, info, debug, trace]")
|
||||
rootCmd.PersistentFlags().StringVar(&conf.Logging.Level, "logging.level", strings.ToLower(slog.LevelInfo.String()), "The logging level. Can be one of [panic, fatal, error, warn, info, debug, trace]")
|
||||
viper.BindPFlag("logging.level", rootCmd.PersistentFlags().Lookup("logging.level"))
|
||||
|
||||
// strategy
|
||||
|
||||
@@ -2,6 +2,7 @@ package config
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Logging struct {
|
||||
@@ -10,6 +11,6 @@ type Logging struct {
|
||||
|
||||
func NewLoggingConfig() Logging {
|
||||
return Logging{
|
||||
Level: slog.LevelInfo.String(),
|
||||
Level: strings.ToLower(slog.LevelInfo.String()),
|
||||
}
|
||||
}
|
||||
|
||||
13
go.mod
13
go.mod
@@ -10,13 +10,16 @@ require (
|
||||
github.com/docker/docker v27.4.1+incompatible
|
||||
github.com/gavv/httpexpect/v2 v2.15.0
|
||||
github.com/gin-gonic/gin v1.10.0
|
||||
github.com/google/go-cmp v0.6.0
|
||||
github.com/lmittmann/tint v1.0.7
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/samber/slog-gin v1.14.1
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/spf13/cobra v1.8.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/spf13/viper v1.19.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/testcontainers/testcontainers-go v0.35.0
|
||||
github.com/testcontainers/testcontainers-go/modules/valkey v0.35.0
|
||||
github.com/tniswong/go.rfcx v0.0.0-20181019234604-07783c52761f
|
||||
github.com/valkey-io/valkey-go v1.0.53
|
||||
go.uber.org/mock v0.5.0
|
||||
@@ -29,6 +32,7 @@ require (
|
||||
|
||||
require (
|
||||
dario.cat/mergo v1.0.0 // indirect
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 // indirect
|
||||
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
|
||||
github.com/Microsoft/go-winio v0.6.2 // indirect
|
||||
github.com/ajg/form v1.5.1 // indirect
|
||||
@@ -67,7 +71,6 @@ require (
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/google/gnostic-models v0.6.8 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/google/go-querystring v1.1.0 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
@@ -98,6 +101,7 @@ require (
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/neilotoole/slogt v1.1.0 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/opencontainers/image-spec v1.1.0 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
|
||||
@@ -109,13 +113,12 @@ require (
|
||||
github.com/sergi/go-diff v1.0.0 // indirect
|
||||
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
|
||||
github.com/shoenig/go-m1cpu v0.1.6 // indirect
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
github.com/sourcegraph/conc v0.3.0 // indirect
|
||||
github.com/spf13/afero v1.11.0 // indirect
|
||||
github.com/spf13/cast v1.6.0 // indirect
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
github.com/subosito/gotenv v1.6.0 // indirect
|
||||
github.com/testcontainers/testcontainers-go v0.35.0 // indirect
|
||||
github.com/testcontainers/testcontainers-go/modules/valkey v0.35.0 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.14 // indirect
|
||||
github.com/tklauser/numcpus v0.8.0 // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
@@ -141,14 +144,12 @@ require (
|
||||
golang.org/x/arch v0.8.0 // indirect
|
||||
golang.org/x/crypto v0.31.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
|
||||
golang.org/x/mod v0.19.0 // indirect
|
||||
golang.org/x/net v0.33.0 // indirect
|
||||
golang.org/x/oauth2 v0.21.0 // indirect
|
||||
golang.org/x/sys v0.28.0 // indirect
|
||||
golang.org/x/term v0.27.0 // indirect
|
||||
golang.org/x/text v0.21.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
golang.org/x/tools v0.23.0 // indirect
|
||||
google.golang.org/protobuf v1.34.2 // indirect
|
||||
gopkg.in/fsnotify.v1 v1.4.7 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
|
||||
30
go.sum
30
go.sum
@@ -1,11 +1,9 @@
|
||||
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
|
||||
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk=
|
||||
github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
|
||||
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
|
||||
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
|
||||
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
|
||||
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
|
||||
github.com/acouvreur/httpexpect/v2 v2.16.0 h1:FGXaR9jt6IQMXxpqbM8YpX7EEvyERU0Lps3ooEc/gk8=
|
||||
@@ -32,19 +30,17 @@ github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GK
|
||||
github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
|
||||
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
|
||||
github.com/davecgh/go-spew v0.0.0-20161028175848-04cdfd42973b/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
|
||||
github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
|
||||
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||
github.com/docker/docker v27.4.1+incompatible h1:ZJvcY7gfwHn1JF48PfbyXg7Jyt9ZCWDW+GGXOIxEwp4=
|
||||
github.com/docker/docker v27.4.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
|
||||
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
|
||||
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
|
||||
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
@@ -136,8 +132,6 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
|
||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
|
||||
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
@@ -153,6 +147,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||
github.com/lmittmann/tint v1.0.7 h1:D/0OqWZ0YOGZ6AyC+5Y2kD8PBEzBk6rFHVSfOqCkF9Y=
|
||||
github.com/lmittmann/tint v1.0.7/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
|
||||
github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae h1:dIZY4ULFcto4tAFlj1FYZl8ztUZ13bdq+PLY+NOfbyI=
|
||||
github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k=
|
||||
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
|
||||
@@ -189,6 +185,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/neilotoole/slogt v1.1.0 h1:c7qE92sq+V0yvCuaxph+RQ2jOKL61c4hqS1Bv9W7FZE=
|
||||
github.com/neilotoole/slogt v1.1.0/go.mod h1:RCrGXkPc/hYybNulqQrMHRtvlQ7F6NktNVLuLwk6V+w=
|
||||
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
|
||||
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
|
||||
@@ -197,8 +195,6 @@ github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k=
|
||||
github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
|
||||
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
|
||||
github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
|
||||
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
|
||||
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
@@ -229,6 +225,8 @@ github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/i
|
||||
github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
|
||||
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
|
||||
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
|
||||
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
|
||||
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
|
||||
@@ -304,8 +302,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
|
||||
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg=
|
||||
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
|
||||
@@ -342,8 +338,6 @@ golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbR
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8=
|
||||
golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@@ -405,8 +399,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5 h1:
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5/go.mod h1:RGnPtTG7r4i8sPlNyDeikXF99hMM+hN6QMm4ooG9g2g=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 h1:AgADTJarZTBqgjiUzRgfaBchgYB3/WFTC80GPwsMcRI=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
|
||||
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
|
||||
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
|
||||
google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA=
|
||||
google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
|
||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
@@ -2,6 +2,7 @@ package api
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/neilotoole/slogt"
|
||||
"github.com/sablierapp/sablier/app/http/routes"
|
||||
"github.com/sablierapp/sablier/app/sessions/sessionstest"
|
||||
"github.com/sablierapp/sablier/app/theme"
|
||||
@@ -17,7 +18,7 @@ func NewApiTest(t *testing.T) (app *gin.Engine, router *gin.RouterGroup, strateg
|
||||
t.Helper()
|
||||
gin.SetMode(gin.TestMode)
|
||||
ctrl := gomock.NewController(t)
|
||||
th, err := theme.New()
|
||||
th, err := theme.New(slogt.New(t))
|
||||
assert.NilError(t, err)
|
||||
|
||||
app = gin.New()
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"github.com/sablierapp/sablier/app/instance"
|
||||
"github.com/sablierapp/sablier/app/sessions"
|
||||
"github.com/sablierapp/sablier/app/theme"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -44,9 +43,9 @@ func StartDynamic(router *gin.RouterGroup, s *routes.ServeStrategy) {
|
||||
var sessionState *sessions.SessionState
|
||||
var err error
|
||||
if len(request.Names) > 0 {
|
||||
sessionState, err = s.SessionsManager.RequestSession(request.Names, request.SessionDuration)
|
||||
sessionState, err = s.SessionsManager.RequestSession(c, request.Names, request.SessionDuration)
|
||||
} else {
|
||||
sessionState, err = s.SessionsManager.RequestSessionGroup(request.Group, request.SessionDuration)
|
||||
sessionState, err = s.SessionsManager.RequestSessionGroup(c, request.Group, request.SessionDuration)
|
||||
var groupNotFoundError sessions.ErrGroupNotFound
|
||||
if errors.As(err, &groupNotFoundError) {
|
||||
AbortWithProblemDetail(c, ProblemGroupNotFound(groupNotFoundError))
|
||||
@@ -93,7 +92,6 @@ func StartDynamic(router *gin.RouterGroup, s *routes.ServeStrategy) {
|
||||
|
||||
func sessionStateToRenderOptionsInstanceState(sessionState *sessions.SessionState) (instances []theme.Instance) {
|
||||
if sessionState == nil {
|
||||
log.Warnf("sessionStateToRenderOptionsInstanceState: sessionState is nil")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -108,7 +106,7 @@ func sessionStateToRenderOptionsInstanceState(sessionState *sessions.SessionStat
|
||||
return
|
||||
}
|
||||
|
||||
func instanceStateToRenderOptionsRequestState(instanceState *instance.State) theme.Instance {
|
||||
func instanceStateToRenderOptionsRequestState(instanceState instance.State) theme.Instance {
|
||||
|
||||
var err error
|
||||
if instanceState.Message == "" {
|
||||
|
||||
@@ -17,11 +17,11 @@ func session() *sessions.SessionState {
|
||||
return &sessions.SessionState{
|
||||
Instances: map[string]sessions.InstanceState{
|
||||
"test": {
|
||||
Instance: &state,
|
||||
Instance: state,
|
||||
Error: nil,
|
||||
},
|
||||
"test2": {
|
||||
Instance: &state2,
|
||||
Instance: state2,
|
||||
Error: nil,
|
||||
},
|
||||
},
|
||||
@@ -53,7 +53,7 @@ func TestStartDynamic(t *testing.T) {
|
||||
t.Run("StartDynamicThemeNotFound", func(t *testing.T) {
|
||||
app, router, strategy, m := NewApiTest(t)
|
||||
StartDynamic(router, strategy)
|
||||
m.EXPECT().RequestSessionGroup("test", gomock.Any()).Return(session(), nil)
|
||||
m.EXPECT().RequestSessionGroup(gomock.Any(), "test", gomock.Any()).Return(session(), nil)
|
||||
r := PerformRequest(app, "GET", "/api/strategies/dynamic?group=test&theme=invalid")
|
||||
assert.Equal(t, http.StatusNotFound, r.Code)
|
||||
assert.Equal(t, rfc7807.JSONMediaType, r.Header().Get("Content-Type"))
|
||||
@@ -61,7 +61,7 @@ func TestStartDynamic(t *testing.T) {
|
||||
t.Run("StartDynamicByNames", func(t *testing.T) {
|
||||
app, router, strategy, m := NewApiTest(t)
|
||||
StartDynamic(router, strategy)
|
||||
m.EXPECT().RequestSession([]string{"test"}, gomock.Any()).Return(session(), nil)
|
||||
m.EXPECT().RequestSession(gomock.Any(), []string{"test"}, gomock.Any()).Return(session(), nil)
|
||||
r := PerformRequest(app, "GET", "/api/strategies/dynamic?names=test")
|
||||
assert.Equal(t, http.StatusOK, r.Code)
|
||||
assert.Equal(t, SablierStatusReady, r.Header().Get(SablierStatusHeader))
|
||||
@@ -69,7 +69,7 @@ func TestStartDynamic(t *testing.T) {
|
||||
t.Run("StartDynamicByGroup", func(t *testing.T) {
|
||||
app, router, strategy, m := NewApiTest(t)
|
||||
StartDynamic(router, strategy)
|
||||
m.EXPECT().RequestSessionGroup("test", gomock.Any()).Return(session(), nil)
|
||||
m.EXPECT().RequestSessionGroup(gomock.Any(), "test", gomock.Any()).Return(session(), nil)
|
||||
r := PerformRequest(app, "GET", "/api/strategies/dynamic?group=test")
|
||||
assert.Equal(t, http.StatusOK, r.Code)
|
||||
assert.Equal(t, SablierStatusReady, r.Header().Get(SablierStatusHeader))
|
||||
@@ -77,7 +77,7 @@ func TestStartDynamic(t *testing.T) {
|
||||
t.Run("StartDynamicErrGroupNotFound", func(t *testing.T) {
|
||||
app, router, strategy, m := NewApiTest(t)
|
||||
StartDynamic(router, strategy)
|
||||
m.EXPECT().RequestSessionGroup("test", gomock.Any()).Return(nil, sessions.ErrGroupNotFound{
|
||||
m.EXPECT().RequestSessionGroup(gomock.Any(), "test", gomock.Any()).Return(nil, sessions.ErrGroupNotFound{
|
||||
Group: "test",
|
||||
AvailableGroups: []string{"test1", "test2"},
|
||||
})
|
||||
@@ -88,7 +88,7 @@ func TestStartDynamic(t *testing.T) {
|
||||
t.Run("StartDynamicError", func(t *testing.T) {
|
||||
app, router, strategy, m := NewApiTest(t)
|
||||
StartDynamic(router, strategy)
|
||||
m.EXPECT().RequestSessionGroup("test", gomock.Any()).Return(nil, errors.New("unknown error"))
|
||||
m.EXPECT().RequestSessionGroup(gomock.Any(), "test", gomock.Any()).Return(nil, errors.New("unknown error"))
|
||||
r := PerformRequest(app, "GET", "/api/strategies/dynamic?group=test")
|
||||
assert.Equal(t, http.StatusInternalServerError, r.Code)
|
||||
assert.Equal(t, rfc7807.JSONMediaType, r.Header().Get("Content-Type"))
|
||||
@@ -96,7 +96,7 @@ func TestStartDynamic(t *testing.T) {
|
||||
t.Run("StartDynamicSessionNil", func(t *testing.T) {
|
||||
app, router, strategy, m := NewApiTest(t)
|
||||
StartDynamic(router, strategy)
|
||||
m.EXPECT().RequestSessionGroup("test", gomock.Any()).Return(nil, nil)
|
||||
m.EXPECT().RequestSessionGroup(gomock.Any(), "test", gomock.Any()).Return(nil, nil)
|
||||
r := PerformRequest(app, "GET", "/api/strategies/dynamic?group=test")
|
||||
assert.Equal(t, http.StatusInternalServerError, r.Code)
|
||||
assert.Equal(t, rfc7807.JSONMediaType, r.Header().Get("Content-Type"))
|
||||
|
||||
@@ -42,7 +42,9 @@ func Start(ctx context.Context, logger *slog.Logger, serverConf config.Server, s
|
||||
|
||||
logger.Info("starting ",
|
||||
slog.String("listen", server.Addr),
|
||||
slog.Duration("startup", time.Since(start)))
|
||||
slog.Duration("startup", time.Since(start)),
|
||||
slog.String("mode", gin.Mode()),
|
||||
)
|
||||
|
||||
go StartHttp(server, logger)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user