From 5992f79eb9dc2e8139aefd10c5b0aed74b6a76fb Mon Sep 17 00:00:00 2001 From: Alexis Couvreur Date: Sat, 4 Jan 2025 10:41:35 -0500 Subject: [PATCH] add problem detail response for errors --- Makefile | 3 ++ cmd/sablier/main.go | 14 ++++++- internal/api/problemdetail.go | 33 ++++++++++++++++ internal/api/start_blocking.go | 69 +++++++++++++++++++++++++++------- internal/api/start_dynamic.go | 9 ++++- internal/api/themes.go | 45 ++++++++++++++++++++-- internal/server/routes.go | 3 ++ pkg/provider/docker/docker.go | 5 --- pkg/provider/docker/events.go | 68 ++++++++++++++++++++++++++++++++- pkg/provider/docker/list.go | 14 +++++-- pkg/sablier/instance.go | 11 +++--- pkg/sablier/sablier.go | 41 ++++++++++++++++---- pkg/sablier/sessions.go | 29 +++++++------- pkg/sablier/start_instance.go | 19 +++++----- 14 files changed, 296 insertions(+), 67 deletions(-) create mode 100644 internal/api/problemdetail.go diff --git a/Makefile b/Makefile index 202fc61..370a534 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ GO_LDFLAGS := -s -w -X $(VPREFIX).Branch=$(GIT_BRANCH) -X $(VPREFIX).Version=$(V $(PLATFORMS): CGO_ENABLED=0 GOOS=$(os) GOARCH=$(arch) go build -trimpath -tags=nomsgpack -v -ldflags="${GO_LDFLAGS}" -o 'sablier_$(VERSION)_$(os)-$(arch)' . +run: + go run cmd/sablier/main.go + build: mockery go build -v . diff --git a/cmd/sablier/main.go b/cmd/sablier/main.go index 4e8392c..d243c31 100644 --- a/cmd/sablier/main.go +++ b/cmd/sablier/main.go @@ -2,8 +2,10 @@ package main import ( "context" + "github.com/docker/docker/client" "github.com/rs/zerolog" "github.com/sablierapp/sablier/internal/server" + "github.com/sablierapp/sablier/pkg/provider/docker" "github.com/sablierapp/sablier/pkg/sablier" "os/signal" "syscall" @@ -15,7 +17,17 @@ func main() { zerolog.SetGlobalLevel(zerolog.TraceLevel) - s := sablier.NewSablier(ctx, nil) + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + panic(err) + } + + p, err := docker.NewDockerProvider(cli) + if err != nil { + panic(err) + } + + s := sablier.NewSablier(ctx, p) server.Start(ctx, s) } diff --git a/internal/api/problemdetail.go b/internal/api/problemdetail.go new file mode 100644 index 0000000..18c7bb7 --- /dev/null +++ b/internal/api/problemdetail.go @@ -0,0 +1,33 @@ +package api + +import ( + "github.com/gin-gonic/gin" + "net/http" +) + +type ProblemDetail struct { + // Type is a unique error code + Type string `json:"type,omitempty"` + // Title is a human-readable error message + Title string `json:"title,omitempty"` + // Status is the HTTP Status code + Status int `json:"status,omitempty"` + // Detail is a human-readable error description + Detail string `json:"detail,omitempty"` + error error +} + +func ValidationError(err error) ProblemDetail { + return ProblemDetail{ + Type: "validation-error", + Title: "Bad Request", + Status: http.StatusBadRequest, + Detail: err.Error(), + error: err, + } +} + +func AbortWithProblemDetail(c *gin.Context, p ProblemDetail) { + _ = c.Error(p.error) + c.IndentedJSON(p.Status, p) +} diff --git a/internal/api/start_blocking.go b/internal/api/start_blocking.go index c81c25e..0e42e0a 100644 --- a/internal/api/start_blocking.go +++ b/internal/api/start_blocking.go @@ -1,51 +1,94 @@ package api import ( + "errors" + "fmt" "github.com/gin-gonic/gin" - "github.com/rs/zerolog/log" "github.com/sablierapp/sablier/pkg/sablier" "net/http" "time" ) type BlockingRequest struct { - Names []string `form:"names"` - Group string `form:"group"` - SessionDuration time.Duration `form:"session_duration"` - Timeout time.Duration `form:"timeout"` + // Names are the instances names on your provider. + // - Container name for docker + // - Service name for docker swarm + // - Deployment or StatefulSet name for Kubernetes + // - etc. + // + // Deprecated: Please use Group instead. + Names []string `form:"names"` + + // Group is + Group string `form:"group"` + SessionDuration time.Duration `form:"session_duration"` + Timeout time.Duration `form:"timeout"` + ConsiderReadyAfter time.Duration `form:"consider_ready_after"` + DesiredReplicas uint32 `form:"desired_replicas"` } func StartBlocking(router *gin.RouterGroup, s *sablier.Sablier) { router.GET("/blocking", func(c *gin.Context) { request := BlockingRequest{ - // Timeout: s.StrategyConfig.Blocking.DefaultTimeout, + SessionDuration: 10 * time.Second, + Timeout: 30 * time.Second, + ConsiderReadyAfter: 0, } + // Validation if err := c.ShouldBind(&request); err != nil { - log.Err(err).Msg("error binding request") - _ = c.AbortWithError(http.StatusBadRequest, err) + AbortWithProblemDetail(c, ValidationError(fmt.Errorf("could not bind request: %w", err))) return } + if len(request.Names) == 0 && request.Group == "" { + AbortWithProblemDetail(c, ValidationError(errors.New("'names' or 'group' query parameter must be set"))) + return + } + + if len(request.Names) > 0 && request.Group != "" { + AbortWithProblemDetail(c, ValidationError(errors.New("'names' and 'group' query parameters are both set, only one must be set"))) + return + } + + // Build instance config + var instances []sablier.InstanceConfig + if request.Group != "" { + i, ok := s.GetGroup(request.Group) + if !ok { + AbortWithProblemDetail(c, ValidationError(fmt.Errorf("group name [%s] does not exist", request.Group))) + return + } + instances = i + } else { + instances = make([]sablier.InstanceConfig, 0, len(request.Names)) + for i := 0; i < len(instances); i++ { + instances[i] = sablier.InstanceConfig{ + Name: request.Names[i], + DesiredReplicas: request.DesiredReplicas, + } + } + } + opts := sablier.StartSessionOptions{ Wait: true, StartOptions: sablier.StartOptions{ DesiredReplicas: 1, ExpiresAfter: request.SessionDuration, - ConsiderReadyAfter: 0, + ConsiderReadyAfter: request.ConsiderReadyAfter, Timeout: request.Timeout, }, } - session, err := s.StartSessionByGroup(c, request.Group, opts) + // Call + session, err := s.StartSession(c, instances, opts) if err != nil { - log.Err(err).Msg("error starting session") - _ = c.AbortWithError(http.StatusBadRequest, err) + AbortWithProblemDetail(c, ValidationError(err)) return } AddSablierHeader(c, session) - c.JSON(http.StatusOK, map[string]interface{}{"session": session}) + c.IndentedJSON(http.StatusOK, map[string]interface{}{"session": session}) }) } diff --git a/internal/api/start_dynamic.go b/internal/api/start_dynamic.go index b2b49d0..ae1be61 100644 --- a/internal/api/start_dynamic.go +++ b/internal/api/start_dynamic.go @@ -7,17 +7,22 @@ import ( "time" ) +// TODO: Add missing theme customization type DynamicRequest struct { Names []string `form:"names"` Group string `form:"group"` SessionDuration time.Duration `form:"session_duration"` Timeout time.Duration `form:"timeout"` + Theme string `form:"theme"` } func StartDynamic(router *gin.RouterGroup, s *sablier.Sablier) { router.GET("/dynamic", func(c *gin.Context) { - request := BlockingRequest{ + request := DynamicRequest{ // Timeout: s.StrategyConfig.Blocking.DefaultTimeout, + Group: "", + SessionDuration: 10 * time.Second, + Timeout: 30 * time.Second, } if err := c.ShouldBind(&request); err != nil { @@ -35,7 +40,7 @@ func StartDynamic(router *gin.RouterGroup, s *sablier.Sablier) { }, } - session, err := s.StartSessionByGroup(c, request.Group, opts) + session, err := s.StartSessionByNames(c, request.Names, opts) if err != nil { _ = c.AbortWithError(http.StatusBadRequest, err) return diff --git a/internal/api/themes.go b/internal/api/themes.go index 0152084..c6a8a9d 100644 --- a/internal/api/themes.go +++ b/internal/api/themes.go @@ -1,19 +1,58 @@ package api import ( + "errors" "github.com/gin-gonic/gin" "github.com/sablierapp/sablier/pkg/sablier" + "github.com/sablierapp/sablier/pkg/theme" "net/http" + "time" ) func GetThemes(router *gin.RouterGroup, s *sablier.Sablier) { router.GET("/themes", func(c *gin.Context) { - c.JSON(http.StatusOK, s.Groups()) + c.JSON(http.StatusOK, map[string]interface{}{"themes": s.Theme.List()}) }) } func PreviewTheme(router *gin.RouterGroup, s *sablier.Sablier) { - router.GET("/themes/", func(c *gin.Context) { - c.JSON(http.StatusOK, s.Groups()) + router.GET("/themes/:theme", func(c *gin.Context) { + t := c.Param("theme") + + opts := theme.Options{ + DisplayName: "Preview Theme", + ShowDetails: true, + InstanceStates: []theme.Instance{ + { + Name: "preview-ready", + Status: "ready", + Error: nil, + CurrentReplicas: 1, + DesiredReplicas: 1, + }, + { + Name: "preview-starting", + Status: "not-ready", + Error: nil, + CurrentReplicas: 0, + DesiredReplicas: 1, + }, + { + Name: "preview-error", + Status: "error", + Error: errors.New("container does not exist"), + CurrentReplicas: 0, + DesiredReplicas: 0, + }, + }, + SessionDuration: 10 * time.Minute, + RefreshFrequency: 10 * time.Second, + } + + err := s.Theme.Render(t, opts, c.Writer) + if err != nil { + c.AbortWithError(500, err) + return + } }) } diff --git a/internal/server/routes.go b/internal/server/routes.go index bec6f54..39d3ffb 100644 --- a/internal/server/routes.go +++ b/internal/server/routes.go @@ -16,4 +16,7 @@ func registerRoutes(router *gin.Engine, s *sablier.Sablier) { api.StartBlocking(APIv1, s) api.StartDynamic(APIv1, s) + api.GetGroups(APIv1, s) + api.GetThemes(APIv1, s) + api.PreviewTheme(APIv1, s) } diff --git a/pkg/provider/docker/docker.go b/pkg/provider/docker/docker.go index 6661be2..3735058 100644 --- a/pkg/provider/docker/docker.go +++ b/pkg/provider/docker/docker.go @@ -42,8 +42,3 @@ func NewDockerProvider(cli *client.Client) (*DockerProvider, error) { log: logger, }, nil } - -func (d *DockerProvider) Events(ctx context.Context) (<-chan sablier.Message, <-chan error) { - //TODO implement me - panic("implement me") -} diff --git a/pkg/provider/docker/events.go b/pkg/provider/docker/events.go index 4a97f96..55c402e 100644 --- a/pkg/provider/docker/events.go +++ b/pkg/provider/docker/events.go @@ -9,6 +9,70 @@ import ( "time" ) +func (d *DockerProvider) Events(ctx context.Context) (<-chan sablier.Message, <-chan error) { + ch := make(chan sablier.Message) + errCh := make(chan error) + started := make(chan struct{}) + + go func() { + defer close(ch) + msgs, errs := d.Client.Events(ctx, events.ListOptions{ + Filters: filters.NewArgs( + filters.Arg("scope", "local"), + filters.Arg("type", string(events.ContainerEventType)), + ), + }) + + close(started) + for { + select { + case <-ctx.Done(): + errCh <- ctx.Err() + return + case msg, ok := <-msgs: + if !ok { + errCh <- fmt.Errorf("events channel closed") + return + } + d.log.Trace().Any("event", msg).Msg("event received") + e, ignore := d.parseEvent(msg) + if !ignore { + ch <- e + } + case err, ok := <-errs: + if !ok { + errCh <- fmt.Errorf("events channel closed") + return + } + errCh <- err + return + } + } + }() + <-started + + return ch, errCh +} + +func (d *DockerProvider) parseEvent(message events.Message) (sablier.Message, bool) { + switch message.Action { + case events.ActionStart: + return sablier.Message{ + Instance: sablier.InstanceConfig{}, + Action: "", + }, false + case events.ActionHealthStatusHealthy: + case events.ActionCreate: + case events.ActionDestroy: + case events.ActionDie: + case events.ActionDelete: + case events.ActionKill: + + } + + return sablier.Message{}, true +} + func (d *DockerProvider) AfterReady(ctx context.Context, name string) <-chan error { ch := make(chan error, 1) started := make(chan struct{}) @@ -24,10 +88,10 @@ func (d *DockerProvider) AfterReady(ctx context.Context, name string) <-chan err action := events.ActionStart if c.Config.Healthcheck != nil { - d.log.Trace().Str("name", name).Msg("container has healthcheck, will be waiting for \"health_status: healthy\"") + d.log.Trace().Str("name", c.Name).Msg("container has healthcheck, will be waiting for \"health_status: healthy\"") action = events.ActionHealthStatusHealthy } else { - d.log.Trace().Str("name", name).Msg("container has no healthcheck, will be waiting for \"start\"") + d.log.Trace().Str("name", c.Name).Msg("container has no healthcheck, will be waiting for \"start\"") } ready := d.afterAction(ctx, name, action) diff --git a/pkg/provider/docker/list.go b/pkg/provider/docker/list.go index 70bea4d..4b8214c 100644 --- a/pkg/provider/docker/list.go +++ b/pkg/provider/docker/list.go @@ -2,7 +2,6 @@ package docker import ( "context" - "fmt" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" "github.com/sablierapp/sablier/pkg/provider" @@ -12,7 +11,6 @@ import ( func (d *DockerProvider) List(ctx context.Context, opts provider.ListOptions) ([]sablier.InstanceConfig, error) { args := filters.NewArgs() args.Add("label", "sablier.enable") - args.Add("label", "sablier.enable=true") found, err := d.Client.ContainerList(ctx, container.ListOptions{ Filters: args, @@ -22,10 +20,18 @@ func (d *DockerProvider) List(ctx context.Context, opts provider.ListOptions) ([ return nil, err } - fmt.Printf("found %d containers\n", len(found)) + // d.log.Trace().Msgf("found [%d] containers", len(found)) infos := make([]sablier.InstanceConfig, 0, len(found)) for _, c := range found { - fmt.Printf("container: %v", c) + // d.log.Trace().Any("container", c).Msg("container details") + registered, ok := c.Labels["sablier.enable"] + if !ok { + continue + } + if !(registered == "" || registered == "true" || registered == "yes") { + continue + } + group, ok := c.Labels["sablier.group"] if !ok || group == "" { group = FormatName(c.Names[0]) // Group defaults to the container name diff --git a/pkg/sablier/instance.go b/pkg/sablier/instance.go index f8c2faa..3adc5cf 100644 --- a/pkg/sablier/instance.go +++ b/pkg/sablier/instance.go @@ -16,11 +16,12 @@ const ( type InstanceInfo struct { // The Name of the targeted container, service, deployment // of which the state is being represented - Name string - CurrentReplicas uint32 - DesiredReplicas uint32 - Status InstanceStatus - StartedAt time.Time + Name string `json:"name"` + CurrentReplicas uint32 `json:"currentReplicas"` + DesiredReplicas uint32 `json:"desiredReplicas"` + Status InstanceStatus `json:"status"` + StartedAt time.Time `json:"startedAt"` + ExpiresAt time.Time `json:"expiresAt"` } type InstanceConfig struct { diff --git a/pkg/sablier/sablier.go b/pkg/sablier/sablier.go index c55da89..b620d7a 100644 --- a/pkg/sablier/sablier.go +++ b/pkg/sablier/sablier.go @@ -2,18 +2,21 @@ package sablier import ( "context" + "github.com/rs/zerolog" + "github.com/sablierapp/sablier/pkg/theme" "maps" + "os" "slices" "sync" "time" "github.com/sablierapp/sablier/pkg/promise" "github.com/sablierapp/sablier/pkg/tinykv" - log "github.com/sirupsen/logrus" ) type Sablier struct { Provider Provider + Theme *theme.Themes promises map[string]*promise.Promise[InstanceInfo] pmu *sync.RWMutex @@ -21,9 +24,15 @@ type Sablier struct { gmu *sync.RWMutex expirations tinykv.KV[string] + + log zerolog.Logger } func NewSablier(ctx context.Context, provider Provider) *Sablier { + logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}). + With().Timestamp(). + Logger() + pmu := &sync.RWMutex{} promises := make(map[string]*promise.Promise[InstanceInfo]) @@ -33,23 +42,33 @@ func NewSablier(ctx context.Context, provider Provider) *Sablier { expirations := tinykv.New(time.Second, func(k string, _ string) { pmu.Lock() defer pmu.Unlock() - log.Printf("instance [%s] expired - removing from promises", k) + logger.Trace().Str("instance", k).Msg("instance expired") err := provider.Stop(ctx, k) if err != nil { - log.Printf("error stopping instance [%s]: %v", k, err) + logger.Error().Str("instance", k).Err(err).Msg("error stopping instance") } delete(promises, k) }) + // TODO: This should be through the constructor + t, err := theme.New() + if err != nil { + panic(err) + } + s := &Sablier{ Provider: provider, + Theme: t, promises: promises, pmu: pmu, groups: groups, gmu: gmu, expirations: expirations, + log: logger, } + go s.updateGroups(ctx) + go s.WatchGroups(ctx, time.Second*5) go s.stop(ctx) return s @@ -67,20 +86,26 @@ func (s *Sablier) RegisteredInstances() []string { } func (s *Sablier) SetGroups(groups map[string][]InstanceConfig) { + if groups == nil { + return + } s.gmu.Lock() defer s.gmu.Unlock() s.groups = groups } func (s *Sablier) GetGroup(group string) ([]InstanceConfig, bool) { - s.gmu.Lock() - defer s.gmu.Unlock() + s.gmu.RLock() + defer s.gmu.RUnlock() instances, ok := s.groups[group] return instances, ok } func (s *Sablier) Groups() []string { - s.gmu.Lock() - defer s.gmu.Unlock() - return slices.Collect(maps.Keys(s.groups)) + s.gmu.RLock() + defer s.gmu.RUnlock() + m := s.groups + k := maps.Keys(m) + sl := slices.Collect(k) + return sl } diff --git a/pkg/sablier/sessions.go b/pkg/sablier/sessions.go index 7da41bf..71757dd 100644 --- a/pkg/sablier/sessions.go +++ b/pkg/sablier/sessions.go @@ -20,14 +20,14 @@ const ( ) type InstanceInfoWithError struct { - Error error + Error error `json:"error,omitempty"` InstanceInfo } type SessionInfo struct { - Instances []InstanceInfoWithError - Status SessionStatus + Instances []InstanceInfoWithError `json:"instances"` + Status SessionStatus `json:"status"` } func (s *Sablier) NewSessionInfo(ctx context.Context, promises map[string]*promise.Promise[InstanceInfo]) SessionInfo { @@ -82,7 +82,7 @@ func (s *Sablier) NewSessionInfo(ctx context.Context, promises map[string]*promi func (s *Sablier) StartSessionByNames(ctx context.Context, names []string, opts StartSessionOptions) (SessionInfo, error) { if len(names) == 0 { - return SessionInfo{}, errors.New("no names") + return SessionInfo{}, errors.New("at least one name is required") } promises := make(map[string]*promise.Promise[InstanceInfo], len(names)) @@ -100,20 +100,19 @@ func (s *Sablier) StartSessionByNames(ctx context.Context, names []string, opts return s.NewSessionInfo(ctx, promises), nil } -func (s *Sablier) StartSessionByGroup(ctx context.Context, name string, opts StartSessionOptions) (SessionInfo, error) { - if len(name) == 0 { - return SessionInfo{}, errors.New("group name is mandatory") - } - - instances, ok := s.GetGroup(name) - if !ok { - return SessionInfo{}, errors.New("group not found") +func (s *Sablier) StartSession(ctx context.Context, instances []InstanceConfig, opts StartSessionOptions) (SessionInfo, error) { + if len(instances) == 0 { + return SessionInfo{}, errors.New("at least one name is required") } promises := make(map[string]*promise.Promise[InstanceInfo], len(instances)) - for _, instance := range instances { - // TODO: Merge start options with the one defined in the InstanceConfig - promises[instance.Name] = s.StartInstance(instance.Name, opts.StartOptions) + for _, conf := range instances { + promises[conf.Name] = s.StartInstance(conf.Name, StartOptions{ + DesiredReplicas: conf.DesiredReplicas, + ExpiresAfter: opts.ExpiresAfter, + ConsiderReadyAfter: opts.ConsiderReadyAfter, + Timeout: opts.Timeout, + }) } if opts.Wait { diff --git a/pkg/sablier/start_instance.go b/pkg/sablier/start_instance.go index 04dd1bb..66e62b3 100644 --- a/pkg/sablier/start_instance.go +++ b/pkg/sablier/start_instance.go @@ -2,7 +2,6 @@ package sablier import ( "context" - "log" "time" "github.com/sablierapp/sablier/pkg/promise" @@ -20,28 +19,28 @@ type StartOptions struct { func (s *Sablier) StartInstance(name string, opts StartOptions) *promise.Promise[InstanceInfo] { s.pmu.Lock() defer s.pmu.Unlock() - log.Printf("request to start instance [%v] received", name) + s.log.Trace().Str("instance", name).Msg("request to start instance received") // If there is an ongoing request, return it // If the last request was rejected, recreate one pr, ok := s.promises[name] if ok && pr.Pending() { - log.Printf("request to start instance [%v] is already in progress", name) + s.log.Trace().Str("instance", name).Msg("request to start instance is already in progress") return pr } if ok && pr.Fulfilled() { - log.Printf("instance [%s] will expire after [%v]", name, opts.ExpiresAfter) + s.log.Trace().Str("instance", name).Dur("expiration", opts.ExpiresAfter).Msgf("instance will expire after [%v]", opts.ExpiresAfter) err := s.expirations.Put(name, name, opts.ExpiresAfter) if err != nil { - log.Printf("failed to refresh instance [%v]: %v", name, err) + s.log.Warn().Err(err).Str("instance", name).Msg("failed to refresh instance") } return pr } // Otherwise, create a new request pr = s.startInstancePromise(name, opts) - log.Printf("request to start instance [%v] created", name) + s.log.Trace().Str("instance", name).Msg("request to start instance created") s.promises[name] = pr return pr @@ -60,6 +59,8 @@ func (s *Sablier) startInstancePromise(name string, opts StartOptions) *promise. CurrentReplicas: opts.DesiredReplicas, // Current replicas are assumed DesiredReplicas: opts.DesiredReplicas, Status: InstanceReady, + StartedAt: time.Now(), + ExpiresAt: time.Now().Add(opts.ExpiresAfter), } resolve(started) }) @@ -69,16 +70,16 @@ func (s *Sablier) startInstance(name string, opts StartOptions) error { ctx, cancel := context.WithTimeout(context.Background(), opts.Timeout) defer cancel() - log.Printf("starting instance [%s]", name) + s.log.Trace().Str("instance", name).Msg("starting instance") err := s.Provider.Start(ctx, name, provider.StartOptions{ DesiredReplicas: opts.DesiredReplicas, ConsiderReadyAfter: opts.ConsiderReadyAfter, }) if err != nil { - log.Printf("instance [%s] could not be started: %v", name, err) + s.log.Trace().Str("instance", name).Err(err).Msg("instance could not be started") return err } - log.Printf("instance [%s] will expire after [%v]", name, opts.ExpiresAfter) + s.log.Trace().Str("instance", name).Dur("expiration", opts.ExpiresAfter).Msgf("instance will expire after [%v]", opts.ExpiresAfter) return s.expirations.Put(name, name, opts.ExpiresAfter) }