feat(strategies): add blocking strategy

This commit is contained in:
Alexis Couvreur
2022-10-28 15:14:44 +00:00
parent 56001a0164
commit 6eb0789b7b
3 changed files with 47 additions and 7 deletions

View File

@@ -61,12 +61,18 @@ func (s *ServeStrategy) ServeDynamic(c *gin.Context) {
func (s *ServeStrategy) ServeBlocking(c *gin.Context) {
request := models.BlockingRequest{}
if err := c.BindUri(&request); err != nil {
if err := c.ShouldBind(&request); err != nil {
c.AbortWithError(http.StatusBadRequest, err)
return
}
sessionState := s.SessionsManager.RequestReadySession(request.Names, request.SessionDuration, request.Timeout)
sessionState, err := s.SessionsManager.RequestReadySession(request.Names, request.SessionDuration, request.Timeout)
if err != nil {
c.Header("X-Sablier-Session-Status", "not-ready")
c.JSON(http.StatusGatewayTimeout, map[string]interface{}{"error": err.Error()})
return
}
if sessionState.IsReady() {
c.Header("X-Sablier-Session-Status", "ready")
@@ -74,6 +80,7 @@ func (s *ServeStrategy) ServeBlocking(c *gin.Context) {
c.Header("X-Sablier-Session-Status", "not-ready")
}
c.JSON(http.StatusOK, sessionState)
}
func sessionStateToRenderOptionsInstanceState(sessionState *sessions.SessionState) (instances []pages.RenderOptionsInstanceState) {

View File

@@ -26,8 +26,8 @@ func (s *SessionsManagerMock) RequestSession(names []string, duration time.Durat
return &s.SessionState
}
func (s *SessionsManagerMock) RequestReadySession(names []string, duration time.Duration, timeout time.Duration) *sessions.SessionState {
return &s.SessionState
func (s *SessionsManagerMock) RequestReadySession(names []string, duration time.Duration, timeout time.Duration) (*sessions.SessionState, error) {
return &s.SessionState, nil
}
func (s *SessionsManagerMock) LoadSessions(io.ReadCloser) error {

View File

@@ -2,6 +2,7 @@ package sessions
import (
"encoding/json"
"fmt"
"io"
"sync"
"time"
@@ -14,7 +15,7 @@ import (
type Manager interface {
RequestSession(names []string, duration time.Duration) *SessionState
RequestReadySession(names []string, duration time.Duration, timeout time.Duration) *SessionState
RequestReadySession(names []string, duration time.Duration, timeout time.Duration) (*SessionState, error)
LoadSessions(io.ReadCloser) error
SaveSessions(io.WriteCloser) error
@@ -139,8 +140,40 @@ func (s *SessionsManager) requestSessionInstance(name string, duration time.Dura
return &requestState, nil
}
func (s *SessionsManager) RequestReadySession(names []string, duration time.Duration, timeout time.Duration) *SessionState {
return s.RequestSession(names, duration)
func (s *SessionsManager) RequestReadySession(names []string, duration time.Duration, timeout time.Duration) (*SessionState, error) {
session := s.RequestSession(names, duration)
if session.IsReady() {
return session, nil
}
ticker := time.NewTicker(5 * time.Second)
readiness := make(chan *SessionState)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
session := s.RequestSession(names, duration)
if session.IsReady() {
readiness <- session
}
case <-quit:
ticker.Stop()
return
}
}
}()
select {
case status := <-readiness:
close(quit)
return status, nil
case <-time.After(timeout):
close(quit)
return nil, fmt.Errorf("session was not ready after %s", timeout.String())
}
}
func (s *SessionsManager) ExpiresAfter(instance *instance.State, duration time.Duration) {