mirror of
https://github.com/sablierapp/sablier.git
synced 2025-12-27 23:46:36 +01:00
add problem detail response for errors
This commit is contained in:
3
Makefile
3
Makefile
@@ -17,6 +17,9 @@ GO_LDFLAGS := -s -w -X $(VPREFIX).Branch=$(GIT_BRANCH) -X $(VPREFIX).Version=$(V
|
||||
$(PLATFORMS):
|
||||
CGO_ENABLED=0 GOOS=$(os) GOARCH=$(arch) go build -trimpath -tags=nomsgpack -v -ldflags="${GO_LDFLAGS}" -o 'sablier_$(VERSION)_$(os)-$(arch)' .
|
||||
|
||||
run:
|
||||
go run cmd/sablier/main.go
|
||||
|
||||
build:
|
||||
mockery
|
||||
go build -v .
|
||||
|
||||
@@ -2,8 +2,10 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/sablierapp/sablier/internal/server"
|
||||
"github.com/sablierapp/sablier/pkg/provider/docker"
|
||||
"github.com/sablierapp/sablier/pkg/sablier"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
@@ -15,7 +17,17 @@ func main() {
|
||||
|
||||
zerolog.SetGlobalLevel(zerolog.TraceLevel)
|
||||
|
||||
s := sablier.NewSablier(ctx, nil)
|
||||
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
p, err := docker.NewDockerProvider(cli)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s := sablier.NewSablier(ctx, p)
|
||||
|
||||
server.Start(ctx, s)
|
||||
}
|
||||
|
||||
33
internal/api/problemdetail.go
Normal file
33
internal/api/problemdetail.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type ProblemDetail struct {
|
||||
// Type is a unique error code
|
||||
Type string `json:"type,omitempty"`
|
||||
// Title is a human-readable error message
|
||||
Title string `json:"title,omitempty"`
|
||||
// Status is the HTTP Status code
|
||||
Status int `json:"status,omitempty"`
|
||||
// Detail is a human-readable error description
|
||||
Detail string `json:"detail,omitempty"`
|
||||
error error
|
||||
}
|
||||
|
||||
func ValidationError(err error) ProblemDetail {
|
||||
return ProblemDetail{
|
||||
Type: "validation-error",
|
||||
Title: "Bad Request",
|
||||
Status: http.StatusBadRequest,
|
||||
Detail: err.Error(),
|
||||
error: err,
|
||||
}
|
||||
}
|
||||
|
||||
func AbortWithProblemDetail(c *gin.Context, p ProblemDetail) {
|
||||
_ = c.Error(p.error)
|
||||
c.IndentedJSON(p.Status, p)
|
||||
}
|
||||
@@ -1,51 +1,94 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/sablierapp/sablier/pkg/sablier"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BlockingRequest struct {
|
||||
Names []string `form:"names"`
|
||||
Group string `form:"group"`
|
||||
SessionDuration time.Duration `form:"session_duration"`
|
||||
Timeout time.Duration `form:"timeout"`
|
||||
// Names are the instances names on your provider.
|
||||
// - Container name for docker
|
||||
// - Service name for docker swarm
|
||||
// - Deployment or StatefulSet name for Kubernetes
|
||||
// - etc.
|
||||
//
|
||||
// Deprecated: Please use Group instead.
|
||||
Names []string `form:"names"`
|
||||
|
||||
// Group is
|
||||
Group string `form:"group"`
|
||||
SessionDuration time.Duration `form:"session_duration"`
|
||||
Timeout time.Duration `form:"timeout"`
|
||||
ConsiderReadyAfter time.Duration `form:"consider_ready_after"`
|
||||
DesiredReplicas uint32 `form:"desired_replicas"`
|
||||
}
|
||||
|
||||
func StartBlocking(router *gin.RouterGroup, s *sablier.Sablier) {
|
||||
router.GET("/blocking", func(c *gin.Context) {
|
||||
request := BlockingRequest{
|
||||
// Timeout: s.StrategyConfig.Blocking.DefaultTimeout,
|
||||
SessionDuration: 10 * time.Second,
|
||||
Timeout: 30 * time.Second,
|
||||
ConsiderReadyAfter: 0,
|
||||
}
|
||||
|
||||
// Validation
|
||||
if err := c.ShouldBind(&request); err != nil {
|
||||
log.Err(err).Msg("error binding request")
|
||||
_ = c.AbortWithError(http.StatusBadRequest, err)
|
||||
AbortWithProblemDetail(c, ValidationError(fmt.Errorf("could not bind request: %w", err)))
|
||||
return
|
||||
}
|
||||
|
||||
if len(request.Names) == 0 && request.Group == "" {
|
||||
AbortWithProblemDetail(c, ValidationError(errors.New("'names' or 'group' query parameter must be set")))
|
||||
return
|
||||
}
|
||||
|
||||
if len(request.Names) > 0 && request.Group != "" {
|
||||
AbortWithProblemDetail(c, ValidationError(errors.New("'names' and 'group' query parameters are both set, only one must be set")))
|
||||
return
|
||||
}
|
||||
|
||||
// Build instance config
|
||||
var instances []sablier.InstanceConfig
|
||||
if request.Group != "" {
|
||||
i, ok := s.GetGroup(request.Group)
|
||||
if !ok {
|
||||
AbortWithProblemDetail(c, ValidationError(fmt.Errorf("group name [%s] does not exist", request.Group)))
|
||||
return
|
||||
}
|
||||
instances = i
|
||||
} else {
|
||||
instances = make([]sablier.InstanceConfig, 0, len(request.Names))
|
||||
for i := 0; i < len(instances); i++ {
|
||||
instances[i] = sablier.InstanceConfig{
|
||||
Name: request.Names[i],
|
||||
DesiredReplicas: request.DesiredReplicas,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
opts := sablier.StartSessionOptions{
|
||||
Wait: true,
|
||||
StartOptions: sablier.StartOptions{
|
||||
DesiredReplicas: 1,
|
||||
ExpiresAfter: request.SessionDuration,
|
||||
ConsiderReadyAfter: 0,
|
||||
ConsiderReadyAfter: request.ConsiderReadyAfter,
|
||||
Timeout: request.Timeout,
|
||||
},
|
||||
}
|
||||
|
||||
session, err := s.StartSessionByGroup(c, request.Group, opts)
|
||||
// Call
|
||||
session, err := s.StartSession(c, instances, opts)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error starting session")
|
||||
_ = c.AbortWithError(http.StatusBadRequest, err)
|
||||
AbortWithProblemDetail(c, ValidationError(err))
|
||||
return
|
||||
}
|
||||
|
||||
AddSablierHeader(c, session)
|
||||
|
||||
c.JSON(http.StatusOK, map[string]interface{}{"session": session})
|
||||
c.IndentedJSON(http.StatusOK, map[string]interface{}{"session": session})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -7,17 +7,22 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// TODO: Add missing theme customization
|
||||
type DynamicRequest struct {
|
||||
Names []string `form:"names"`
|
||||
Group string `form:"group"`
|
||||
SessionDuration time.Duration `form:"session_duration"`
|
||||
Timeout time.Duration `form:"timeout"`
|
||||
Theme string `form:"theme"`
|
||||
}
|
||||
|
||||
func StartDynamic(router *gin.RouterGroup, s *sablier.Sablier) {
|
||||
router.GET("/dynamic", func(c *gin.Context) {
|
||||
request := BlockingRequest{
|
||||
request := DynamicRequest{
|
||||
// Timeout: s.StrategyConfig.Blocking.DefaultTimeout,
|
||||
Group: "",
|
||||
SessionDuration: 10 * time.Second,
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
if err := c.ShouldBind(&request); err != nil {
|
||||
@@ -35,7 +40,7 @@ func StartDynamic(router *gin.RouterGroup, s *sablier.Sablier) {
|
||||
},
|
||||
}
|
||||
|
||||
session, err := s.StartSessionByGroup(c, request.Group, opts)
|
||||
session, err := s.StartSessionByNames(c, request.Names, opts)
|
||||
if err != nil {
|
||||
_ = c.AbortWithError(http.StatusBadRequest, err)
|
||||
return
|
||||
|
||||
@@ -1,19 +1,58 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/sablierapp/sablier/pkg/sablier"
|
||||
"github.com/sablierapp/sablier/pkg/theme"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
func GetThemes(router *gin.RouterGroup, s *sablier.Sablier) {
|
||||
router.GET("/themes", func(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, s.Groups())
|
||||
c.JSON(http.StatusOK, map[string]interface{}{"themes": s.Theme.List()})
|
||||
})
|
||||
}
|
||||
|
||||
func PreviewTheme(router *gin.RouterGroup, s *sablier.Sablier) {
|
||||
router.GET("/themes/", func(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, s.Groups())
|
||||
router.GET("/themes/:theme", func(c *gin.Context) {
|
||||
t := c.Param("theme")
|
||||
|
||||
opts := theme.Options{
|
||||
DisplayName: "Preview Theme",
|
||||
ShowDetails: true,
|
||||
InstanceStates: []theme.Instance{
|
||||
{
|
||||
Name: "preview-ready",
|
||||
Status: "ready",
|
||||
Error: nil,
|
||||
CurrentReplicas: 1,
|
||||
DesiredReplicas: 1,
|
||||
},
|
||||
{
|
||||
Name: "preview-starting",
|
||||
Status: "not-ready",
|
||||
Error: nil,
|
||||
CurrentReplicas: 0,
|
||||
DesiredReplicas: 1,
|
||||
},
|
||||
{
|
||||
Name: "preview-error",
|
||||
Status: "error",
|
||||
Error: errors.New("container does not exist"),
|
||||
CurrentReplicas: 0,
|
||||
DesiredReplicas: 0,
|
||||
},
|
||||
},
|
||||
SessionDuration: 10 * time.Minute,
|
||||
RefreshFrequency: 10 * time.Second,
|
||||
}
|
||||
|
||||
err := s.Theme.Render(t, opts, c.Writer)
|
||||
if err != nil {
|
||||
c.AbortWithError(500, err)
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -16,4 +16,7 @@ func registerRoutes(router *gin.Engine, s *sablier.Sablier) {
|
||||
|
||||
api.StartBlocking(APIv1, s)
|
||||
api.StartDynamic(APIv1, s)
|
||||
api.GetGroups(APIv1, s)
|
||||
api.GetThemes(APIv1, s)
|
||||
api.PreviewTheme(APIv1, s)
|
||||
}
|
||||
|
||||
@@ -42,8 +42,3 @@ func NewDockerProvider(cli *client.Client) (*DockerProvider, error) {
|
||||
log: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *DockerProvider) Events(ctx context.Context) (<-chan sablier.Message, <-chan error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
@@ -9,6 +9,70 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func (d *DockerProvider) Events(ctx context.Context) (<-chan sablier.Message, <-chan error) {
|
||||
ch := make(chan sablier.Message)
|
||||
errCh := make(chan error)
|
||||
started := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer close(ch)
|
||||
msgs, errs := d.Client.Events(ctx, events.ListOptions{
|
||||
Filters: filters.NewArgs(
|
||||
filters.Arg("scope", "local"),
|
||||
filters.Arg("type", string(events.ContainerEventType)),
|
||||
),
|
||||
})
|
||||
|
||||
close(started)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
errCh <- ctx.Err()
|
||||
return
|
||||
case msg, ok := <-msgs:
|
||||
if !ok {
|
||||
errCh <- fmt.Errorf("events channel closed")
|
||||
return
|
||||
}
|
||||
d.log.Trace().Any("event", msg).Msg("event received")
|
||||
e, ignore := d.parseEvent(msg)
|
||||
if !ignore {
|
||||
ch <- e
|
||||
}
|
||||
case err, ok := <-errs:
|
||||
if !ok {
|
||||
errCh <- fmt.Errorf("events channel closed")
|
||||
return
|
||||
}
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
<-started
|
||||
|
||||
return ch, errCh
|
||||
}
|
||||
|
||||
func (d *DockerProvider) parseEvent(message events.Message) (sablier.Message, bool) {
|
||||
switch message.Action {
|
||||
case events.ActionStart:
|
||||
return sablier.Message{
|
||||
Instance: sablier.InstanceConfig{},
|
||||
Action: "",
|
||||
}, false
|
||||
case events.ActionHealthStatusHealthy:
|
||||
case events.ActionCreate:
|
||||
case events.ActionDestroy:
|
||||
case events.ActionDie:
|
||||
case events.ActionDelete:
|
||||
case events.ActionKill:
|
||||
|
||||
}
|
||||
|
||||
return sablier.Message{}, true
|
||||
}
|
||||
|
||||
func (d *DockerProvider) AfterReady(ctx context.Context, name string) <-chan error {
|
||||
ch := make(chan error, 1)
|
||||
started := make(chan struct{})
|
||||
@@ -24,10 +88,10 @@ func (d *DockerProvider) AfterReady(ctx context.Context, name string) <-chan err
|
||||
|
||||
action := events.ActionStart
|
||||
if c.Config.Healthcheck != nil {
|
||||
d.log.Trace().Str("name", name).Msg("container has healthcheck, will be waiting for \"health_status: healthy\"")
|
||||
d.log.Trace().Str("name", c.Name).Msg("container has healthcheck, will be waiting for \"health_status: healthy\"")
|
||||
action = events.ActionHealthStatusHealthy
|
||||
} else {
|
||||
d.log.Trace().Str("name", name).Msg("container has no healthcheck, will be waiting for \"start\"")
|
||||
d.log.Trace().Str("name", c.Name).Msg("container has no healthcheck, will be waiting for \"start\"")
|
||||
}
|
||||
|
||||
ready := d.afterAction(ctx, name, action)
|
||||
|
||||
@@ -2,7 +2,6 @@ package docker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/sablierapp/sablier/pkg/provider"
|
||||
@@ -12,7 +11,6 @@ import (
|
||||
func (d *DockerProvider) List(ctx context.Context, opts provider.ListOptions) ([]sablier.InstanceConfig, error) {
|
||||
args := filters.NewArgs()
|
||||
args.Add("label", "sablier.enable")
|
||||
args.Add("label", "sablier.enable=true")
|
||||
|
||||
found, err := d.Client.ContainerList(ctx, container.ListOptions{
|
||||
Filters: args,
|
||||
@@ -22,10 +20,18 @@ func (d *DockerProvider) List(ctx context.Context, opts provider.ListOptions) ([
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fmt.Printf("found %d containers\n", len(found))
|
||||
// d.log.Trace().Msgf("found [%d] containers", len(found))
|
||||
infos := make([]sablier.InstanceConfig, 0, len(found))
|
||||
for _, c := range found {
|
||||
fmt.Printf("container: %v", c)
|
||||
// d.log.Trace().Any("container", c).Msg("container details")
|
||||
registered, ok := c.Labels["sablier.enable"]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if !(registered == "" || registered == "true" || registered == "yes") {
|
||||
continue
|
||||
}
|
||||
|
||||
group, ok := c.Labels["sablier.group"]
|
||||
if !ok || group == "" {
|
||||
group = FormatName(c.Names[0]) // Group defaults to the container name
|
||||
|
||||
@@ -16,11 +16,12 @@ const (
|
||||
type InstanceInfo struct {
|
||||
// The Name of the targeted container, service, deployment
|
||||
// of which the state is being represented
|
||||
Name string
|
||||
CurrentReplicas uint32
|
||||
DesiredReplicas uint32
|
||||
Status InstanceStatus
|
||||
StartedAt time.Time
|
||||
Name string `json:"name"`
|
||||
CurrentReplicas uint32 `json:"currentReplicas"`
|
||||
DesiredReplicas uint32 `json:"desiredReplicas"`
|
||||
Status InstanceStatus `json:"status"`
|
||||
StartedAt time.Time `json:"startedAt"`
|
||||
ExpiresAt time.Time `json:"expiresAt"`
|
||||
}
|
||||
|
||||
type InstanceConfig struct {
|
||||
|
||||
@@ -2,18 +2,21 @@ package sablier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/sablierapp/sablier/pkg/theme"
|
||||
"maps"
|
||||
"os"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sablierapp/sablier/pkg/promise"
|
||||
"github.com/sablierapp/sablier/pkg/tinykv"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Sablier struct {
|
||||
Provider Provider
|
||||
Theme *theme.Themes
|
||||
promises map[string]*promise.Promise[InstanceInfo]
|
||||
pmu *sync.RWMutex
|
||||
|
||||
@@ -21,9 +24,15 @@ type Sablier struct {
|
||||
gmu *sync.RWMutex
|
||||
|
||||
expirations tinykv.KV[string]
|
||||
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
func NewSablier(ctx context.Context, provider Provider) *Sablier {
|
||||
logger := zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}).
|
||||
With().Timestamp().
|
||||
Logger()
|
||||
|
||||
pmu := &sync.RWMutex{}
|
||||
promises := make(map[string]*promise.Promise[InstanceInfo])
|
||||
|
||||
@@ -33,23 +42,33 @@ func NewSablier(ctx context.Context, provider Provider) *Sablier {
|
||||
expirations := tinykv.New(time.Second, func(k string, _ string) {
|
||||
pmu.Lock()
|
||||
defer pmu.Unlock()
|
||||
log.Printf("instance [%s] expired - removing from promises", k)
|
||||
logger.Trace().Str("instance", k).Msg("instance expired")
|
||||
err := provider.Stop(ctx, k)
|
||||
if err != nil {
|
||||
log.Printf("error stopping instance [%s]: %v", k, err)
|
||||
logger.Error().Str("instance", k).Err(err).Msg("error stopping instance")
|
||||
}
|
||||
delete(promises, k)
|
||||
})
|
||||
|
||||
// TODO: This should be through the constructor
|
||||
t, err := theme.New()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s := &Sablier{
|
||||
Provider: provider,
|
||||
Theme: t,
|
||||
promises: promises,
|
||||
pmu: pmu,
|
||||
groups: groups,
|
||||
gmu: gmu,
|
||||
expirations: expirations,
|
||||
log: logger,
|
||||
}
|
||||
|
||||
go s.updateGroups(ctx)
|
||||
go s.WatchGroups(ctx, time.Second*5)
|
||||
go s.stop(ctx)
|
||||
|
||||
return s
|
||||
@@ -67,20 +86,26 @@ func (s *Sablier) RegisteredInstances() []string {
|
||||
}
|
||||
|
||||
func (s *Sablier) SetGroups(groups map[string][]InstanceConfig) {
|
||||
if groups == nil {
|
||||
return
|
||||
}
|
||||
s.gmu.Lock()
|
||||
defer s.gmu.Unlock()
|
||||
s.groups = groups
|
||||
}
|
||||
|
||||
func (s *Sablier) GetGroup(group string) ([]InstanceConfig, bool) {
|
||||
s.gmu.Lock()
|
||||
defer s.gmu.Unlock()
|
||||
s.gmu.RLock()
|
||||
defer s.gmu.RUnlock()
|
||||
instances, ok := s.groups[group]
|
||||
return instances, ok
|
||||
}
|
||||
|
||||
func (s *Sablier) Groups() []string {
|
||||
s.gmu.Lock()
|
||||
defer s.gmu.Unlock()
|
||||
return slices.Collect(maps.Keys(s.groups))
|
||||
s.gmu.RLock()
|
||||
defer s.gmu.RUnlock()
|
||||
m := s.groups
|
||||
k := maps.Keys(m)
|
||||
sl := slices.Collect(k)
|
||||
return sl
|
||||
}
|
||||
|
||||
@@ -20,14 +20,14 @@ const (
|
||||
)
|
||||
|
||||
type InstanceInfoWithError struct {
|
||||
Error error
|
||||
Error error `json:"error,omitempty"`
|
||||
|
||||
InstanceInfo
|
||||
}
|
||||
|
||||
type SessionInfo struct {
|
||||
Instances []InstanceInfoWithError
|
||||
Status SessionStatus
|
||||
Instances []InstanceInfoWithError `json:"instances"`
|
||||
Status SessionStatus `json:"status"`
|
||||
}
|
||||
|
||||
func (s *Sablier) NewSessionInfo(ctx context.Context, promises map[string]*promise.Promise[InstanceInfo]) SessionInfo {
|
||||
@@ -82,7 +82,7 @@ func (s *Sablier) NewSessionInfo(ctx context.Context, promises map[string]*promi
|
||||
|
||||
func (s *Sablier) StartSessionByNames(ctx context.Context, names []string, opts StartSessionOptions) (SessionInfo, error) {
|
||||
if len(names) == 0 {
|
||||
return SessionInfo{}, errors.New("no names")
|
||||
return SessionInfo{}, errors.New("at least one name is required")
|
||||
}
|
||||
|
||||
promises := make(map[string]*promise.Promise[InstanceInfo], len(names))
|
||||
@@ -100,20 +100,19 @@ func (s *Sablier) StartSessionByNames(ctx context.Context, names []string, opts
|
||||
return s.NewSessionInfo(ctx, promises), nil
|
||||
}
|
||||
|
||||
func (s *Sablier) StartSessionByGroup(ctx context.Context, name string, opts StartSessionOptions) (SessionInfo, error) {
|
||||
if len(name) == 0 {
|
||||
return SessionInfo{}, errors.New("group name is mandatory")
|
||||
}
|
||||
|
||||
instances, ok := s.GetGroup(name)
|
||||
if !ok {
|
||||
return SessionInfo{}, errors.New("group not found")
|
||||
func (s *Sablier) StartSession(ctx context.Context, instances []InstanceConfig, opts StartSessionOptions) (SessionInfo, error) {
|
||||
if len(instances) == 0 {
|
||||
return SessionInfo{}, errors.New("at least one name is required")
|
||||
}
|
||||
|
||||
promises := make(map[string]*promise.Promise[InstanceInfo], len(instances))
|
||||
for _, instance := range instances {
|
||||
// TODO: Merge start options with the one defined in the InstanceConfig
|
||||
promises[instance.Name] = s.StartInstance(instance.Name, opts.StartOptions)
|
||||
for _, conf := range instances {
|
||||
promises[conf.Name] = s.StartInstance(conf.Name, StartOptions{
|
||||
DesiredReplicas: conf.DesiredReplicas,
|
||||
ExpiresAfter: opts.ExpiresAfter,
|
||||
ConsiderReadyAfter: opts.ConsiderReadyAfter,
|
||||
Timeout: opts.Timeout,
|
||||
})
|
||||
}
|
||||
|
||||
if opts.Wait {
|
||||
|
||||
@@ -2,7 +2,6 @@ package sablier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/sablierapp/sablier/pkg/promise"
|
||||
@@ -20,28 +19,28 @@ type StartOptions struct {
|
||||
func (s *Sablier) StartInstance(name string, opts StartOptions) *promise.Promise[InstanceInfo] {
|
||||
s.pmu.Lock()
|
||||
defer s.pmu.Unlock()
|
||||
log.Printf("request to start instance [%v] received", name)
|
||||
s.log.Trace().Str("instance", name).Msg("request to start instance received")
|
||||
|
||||
// If there is an ongoing request, return it
|
||||
// If the last request was rejected, recreate one
|
||||
pr, ok := s.promises[name]
|
||||
if ok && pr.Pending() {
|
||||
log.Printf("request to start instance [%v] is already in progress", name)
|
||||
s.log.Trace().Str("instance", name).Msg("request to start instance is already in progress")
|
||||
return pr
|
||||
}
|
||||
|
||||
if ok && pr.Fulfilled() {
|
||||
log.Printf("instance [%s] will expire after [%v]", name, opts.ExpiresAfter)
|
||||
s.log.Trace().Str("instance", name).Dur("expiration", opts.ExpiresAfter).Msgf("instance will expire after [%v]", opts.ExpiresAfter)
|
||||
err := s.expirations.Put(name, name, opts.ExpiresAfter)
|
||||
if err != nil {
|
||||
log.Printf("failed to refresh instance [%v]: %v", name, err)
|
||||
s.log.Warn().Err(err).Str("instance", name).Msg("failed to refresh instance")
|
||||
}
|
||||
return pr
|
||||
}
|
||||
|
||||
// Otherwise, create a new request
|
||||
pr = s.startInstancePromise(name, opts)
|
||||
log.Printf("request to start instance [%v] created", name)
|
||||
s.log.Trace().Str("instance", name).Msg("request to start instance created")
|
||||
s.promises[name] = pr
|
||||
|
||||
return pr
|
||||
@@ -60,6 +59,8 @@ func (s *Sablier) startInstancePromise(name string, opts StartOptions) *promise.
|
||||
CurrentReplicas: opts.DesiredReplicas, // Current replicas are assumed
|
||||
DesiredReplicas: opts.DesiredReplicas,
|
||||
Status: InstanceReady,
|
||||
StartedAt: time.Now(),
|
||||
ExpiresAt: time.Now().Add(opts.ExpiresAfter),
|
||||
}
|
||||
resolve(started)
|
||||
})
|
||||
@@ -69,16 +70,16 @@ func (s *Sablier) startInstance(name string, opts StartOptions) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), opts.Timeout)
|
||||
defer cancel()
|
||||
|
||||
log.Printf("starting instance [%s]", name)
|
||||
s.log.Trace().Str("instance", name).Msg("starting instance")
|
||||
err := s.Provider.Start(ctx, name, provider.StartOptions{
|
||||
DesiredReplicas: opts.DesiredReplicas,
|
||||
ConsiderReadyAfter: opts.ConsiderReadyAfter,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("instance [%s] could not be started: %v", name, err)
|
||||
s.log.Trace().Str("instance", name).Err(err).Msg("instance could not be started")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("instance [%s] will expire after [%v]", name, opts.ExpiresAfter)
|
||||
s.log.Trace().Str("instance", name).Dur("expiration", opts.ExpiresAfter).Msgf("instance will expire after [%v]", opts.ExpiresAfter)
|
||||
return s.expirations.Put(name, name, opts.ExpiresAfter)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user