Files
sablier/app/sessions/sessions_manager.go
Alexis Couvreur 00cc153d7a fix: add http error responses (#494)
This features adds rfc7807 Problem detail responses when an error happens processing a request.

This will greatly improve the common issues  with "blank pages" and "404 pages" issues which should now properly tell the user what input was wrong (group that does not exist, container name that does not exist, etc.)
2025-02-02 00:00:49 -05:00

334 lines
7.9 KiB
Go

package sessions
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"maps"
"slices"
"sync"
"time"
"github.com/sablierapp/sablier/app/instance"
"github.com/sablierapp/sablier/app/providers"
"github.com/sablierapp/sablier/pkg/tinykv"
log "github.com/sirupsen/logrus"
)
const defaultRefreshFrequency = 2 * time.Second
//go:generate mockgen -package sessionstest -source=sessions_manager.go -destination=sessionstest/mocks_sessions_manager.go *
type Manager interface {
RequestSession(names []string, duration time.Duration) (*SessionState, error)
RequestSessionGroup(group string, duration time.Duration) (*SessionState, error)
RequestReadySession(ctx context.Context, names []string, duration time.Duration, timeout time.Duration) (*SessionState, error)
RequestReadySessionGroup(ctx context.Context, group string, duration time.Duration, timeout time.Duration) (*SessionState, error)
LoadSessions(io.ReadCloser) error
SaveSessions(io.WriteCloser) error
Stop()
}
type SessionsManager struct {
ctx context.Context
cancel context.CancelFunc
store tinykv.KV[instance.State]
provider providers.Provider
groups map[string][]string
}
func NewSessionsManager(store tinykv.KV[instance.State], provider providers.Provider) Manager {
ctx, cancel := context.WithCancel(context.Background())
groups, err := provider.GetGroups(ctx)
if err != nil {
groups = make(map[string][]string)
log.Warn("could not get groups", err)
}
sm := &SessionsManager{
ctx: ctx,
cancel: cancel,
store: store,
provider: provider,
groups: groups,
}
sm.initWatchers()
return sm
}
func (sm *SessionsManager) initWatchers() {
updateGroups := make(chan map[string][]string)
go watchGroups(sm.ctx, sm.provider, defaultRefreshFrequency, updateGroups)
go sm.consumeGroups(updateGroups)
instanceStopped := make(chan string)
go sm.provider.NotifyInstanceStopped(sm.ctx, instanceStopped)
go sm.consumeInstanceStopped(instanceStopped)
}
func (sm *SessionsManager) consumeGroups(receive chan map[string][]string) {
for groups := range receive {
sm.groups = groups
}
}
func (sm *SessionsManager) consumeInstanceStopped(instanceStopped chan string) {
for instance := range instanceStopped {
// Will delete from the store containers that have been stop either by external sources
// or by the internal expiration loop, if the deleted entry does not exist, it doesn't matter
log.Debugf("received event instance %s is stopped, removing from store", instance)
sm.store.Delete(instance)
}
}
func (sm *SessionsManager) LoadSessions(reader io.ReadCloser) error {
defer reader.Close()
return json.NewDecoder(reader).Decode(sm.store)
}
func (sm *SessionsManager) SaveSessions(writer io.WriteCloser) error {
defer writer.Close()
encoder := json.NewEncoder(writer)
encoder.SetEscapeHTML(false)
encoder.SetIndent("", " ")
return encoder.Encode(sm.store)
}
type InstanceState struct {
Instance *instance.State `json:"instance"`
Error error `json:"error"`
}
type SessionState struct {
Instances *sync.Map
}
func (s *SessionState) IsReady() bool {
ready := true
if s.Instances == nil {
s.Instances = &sync.Map{}
}
s.Instances.Range(func(key, value interface{}) bool {
state := value.(InstanceState)
if state.Error != nil || state.Instance.Status != instance.Ready {
ready = false
return false
}
return true
})
return ready
}
func (s *SessionState) Status() string {
if s.IsReady() {
return "ready"
}
return "not-ready"
}
func (s *SessionsManager) RequestSession(names []string, duration time.Duration) (sessionState *SessionState, err error) {
if len(names) == 0 {
return nil, fmt.Errorf("names cannot be empty")
}
var wg sync.WaitGroup
sessionState = &SessionState{
Instances: &sync.Map{},
}
wg.Add(len(names))
for i := 0; i < len(names); i++ {
go func(name string) {
defer wg.Done()
state, err := s.requestSessionInstance(name, duration)
sessionState.Instances.Store(name, InstanceState{
Instance: state,
Error: err,
})
}(names[i])
}
wg.Wait()
return sessionState, nil
}
func (s *SessionsManager) RequestSessionGroup(group string, duration time.Duration) (sessionState *SessionState, err error) {
if len(group) == 0 {
return nil, fmt.Errorf("group is mandatory")
}
names, ok := s.groups[group]
if !ok {
return nil, ErrGroupNotFound{
Group: group,
AvailableGroups: slices.Collect(maps.Keys(s.groups)),
}
}
if len(names) == 0 {
return nil, fmt.Errorf("group has no member")
}
return s.RequestSession(names, duration)
}
func (s *SessionsManager) requestSessionInstance(name string, duration time.Duration) (*instance.State, error) {
if name == "" {
return nil, errors.New("instance name cannot be empty")
}
requestState, exists := s.store.Get(name)
if !exists {
log.Debugf("starting [%s]...", name)
err := s.provider.Start(s.ctx, name)
if err != nil {
return nil, err
}
state, err := s.provider.GetState(s.ctx, name)
if err != nil {
return nil, err
}
requestState.Name = name
requestState.CurrentReplicas = state.CurrentReplicas
requestState.DesiredReplicas = state.DesiredReplicas
requestState.Status = state.Status
requestState.Message = state.Message
log.Debugf("status for [%s]=[%s]", name, requestState.Status)
} else if requestState.Status != instance.Ready {
log.Debugf("checking [%s]...", name)
state, err := s.provider.GetState(s.ctx, name)
if err != nil {
return nil, err
}
requestState.Name = state.Name
requestState.CurrentReplicas = state.CurrentReplicas
requestState.DesiredReplicas = state.DesiredReplicas
requestState.Status = state.Status
requestState.Message = state.Message
log.Debugf("status for %s=%s", name, requestState.Status)
}
log.Debugf("expiring %+v in %v", requestState, duration)
// Refresh the duration
s.ExpiresAfter(&requestState, duration)
return &requestState, nil
}
func (s *SessionsManager) RequestReadySession(ctx context.Context, names []string, duration time.Duration, timeout time.Duration) (*SessionState, error) {
session, err := s.RequestSession(names, duration)
if err != nil {
return nil, err
}
if session.IsReady() {
return session, nil
}
ticker := time.NewTicker(5 * time.Second)
readiness := make(chan *SessionState)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
session, err := s.RequestSession(names, duration)
if err != nil {
return
}
if session.IsReady() {
readiness <- session
}
case <-quit:
ticker.Stop()
return
}
}
}()
select {
case <-ctx.Done():
log.Debug("request cancelled by user, stopping timeout")
close(quit)
return nil, fmt.Errorf("request cancelled by user")
case status := <-readiness:
close(quit)
return status, nil
case <-time.After(timeout):
close(quit)
return nil, fmt.Errorf("session was not ready after %s", timeout.String())
}
}
func (s *SessionsManager) RequestReadySessionGroup(ctx context.Context, group string, duration time.Duration, timeout time.Duration) (sessionState *SessionState, err error) {
if len(group) == 0 {
return nil, fmt.Errorf("group is mandatory")
}
names, ok := s.groups[group]
if !ok {
return nil, ErrGroupNotFound{
Group: group,
AvailableGroups: slices.Collect(maps.Keys(s.groups)),
}
}
if len(names) == 0 {
return nil, fmt.Errorf("group has no member")
}
return s.RequestReadySession(ctx, names, duration, timeout)
}
func (s *SessionsManager) ExpiresAfter(instance *instance.State, duration time.Duration) {
s.store.Put(instance.Name, *instance, duration)
}
func (s *SessionsManager) Stop() {
// Stop event listeners
s.cancel()
// Stop the store
s.store.Stop()
}
func (s *SessionState) MarshalJSON() ([]byte, error) {
instances := []InstanceState{}
s.Instances.Range(func(key, value interface{}) bool {
state := value.(InstanceState)
instances = append(instances, state)
return true
})
return json.Marshal(map[string]any{
"instances": instances,
"status": s.Status(),
})
}