mirror of
https://github.com/sablierapp/sablier.git
synced 2025-12-21 21:33:06 +01:00
refactor(events): close the channel from the sender
This commit is contained in:
@@ -41,6 +41,8 @@ func (s *SessionsManagerMock) SaveSessions(io.WriteCloser) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SessionsManagerMock) Stop() {}
|
||||||
|
|
||||||
func TestServeStrategy_ServeDynamic(t *testing.T) {
|
func TestServeStrategy_ServeDynamic(t *testing.T) {
|
||||||
type arg struct {
|
type arg struct {
|
||||||
body models.DynamicRequest
|
body models.DynamicRequest
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ func (provider *DockerClassicProvider) GetState(name string) (instance.State, er
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (provider *DockerClassicProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
|
func (provider *DockerClassicProvider) NotifyInsanceStopped(ctx context.Context, instance chan<- string) {
|
||||||
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
|
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
|
||||||
Filters: filters.NewArgs(
|
Filters: filters.NewArgs(
|
||||||
filters.Arg("scope", "local"),
|
filters.Arg("scope", "local"),
|
||||||
@@ -125,11 +125,9 @@ func (provider *DockerClassicProvider) NotifyInsanceStopped(ctx context.Context,
|
|||||||
case err := <-errs:
|
case err := <-errs:
|
||||||
if errors.Is(err, io.EOF) {
|
if errors.Is(err, io.EOF) {
|
||||||
log.Debug("provider event stream closed")
|
log.Debug("provider event stream closed")
|
||||||
close(instance)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
close(instance)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -427,15 +427,16 @@ func TestDockerClassicProvider_NotifyInsanceStopped(t *testing.T) {
|
|||||||
desiredReplicas: 1,
|
desiredReplicas: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
instanceC := make(chan string)
|
instanceC := make(chan string, 1)
|
||||||
|
|
||||||
provider.NotifyInsanceStopped(context.Background(), instanceC)
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
provider.NotifyInsanceStopped(ctx, instanceC)
|
||||||
|
|
||||||
var got []string
|
var got []string
|
||||||
|
|
||||||
for i := range instanceC {
|
got = append(got, <-instanceC)
|
||||||
got = append(got, i)
|
cancel()
|
||||||
}
|
close(instanceC)
|
||||||
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
t.Errorf("NotifyInsanceStopped() = %v, want %v", got, tt.want)
|
t.Errorf("NotifyInsanceStopped() = %v, want %v", got, tt.want)
|
||||||
|
|||||||
@@ -124,7 +124,7 @@ func (provider *DockerSwarmProvider) getInstanceName(name string, service swarm.
|
|||||||
return fmt.Sprintf("%s (%s)", name, service.Spec.Name)
|
return fmt.Sprintf("%s (%s)", name, service.Spec.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
|
func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, instance chan<- string) {
|
||||||
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
|
msgs, errs := provider.Client.Events(ctx, types.EventsOptions{
|
||||||
Filters: filters.NewArgs(
|
Filters: filters.NewArgs(
|
||||||
filters.Arg("scope", "swarm"),
|
filters.Arg("scope", "swarm"),
|
||||||
@@ -143,11 +143,9 @@ func (provider *DockerSwarmProvider) NotifyInsanceStopped(ctx context.Context, i
|
|||||||
case err := <-errs:
|
case err := <-errs:
|
||||||
if errors.Is(err, io.EOF) {
|
if errors.Is(err, io.EOF) {
|
||||||
log.Debug("provider event stream closed")
|
log.Debug("provider event stream closed")
|
||||||
close(instance)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
close(instance)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -166,5 +166,5 @@ func (provider *KubernetesProvider) getStatefulsetState(config *Config) (instanc
|
|||||||
return instance.NotReadyInstanceState(config.OriginalName, int(ss.Status.ReadyReplicas), int(config.Replicas))
|
return instance.NotReadyInstanceState(config.OriginalName, int(ss.Status.ReadyReplicas), int(config.Replicas))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (provider *KubernetesProvider) NotifyInsanceStopped(ctx context.Context, instance chan string) {
|
func (provider *KubernetesProvider) NotifyInsanceStopped(ctx context.Context, instance chan<- string) {
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ type Provider interface {
|
|||||||
Stop(name string) (instance.State, error)
|
Stop(name string) (instance.State, error)
|
||||||
GetState(name string) (instance.State, error)
|
GetState(name string) (instance.State, error)
|
||||||
|
|
||||||
NotifyInsanceStopped(ctx context.Context, instance chan string)
|
NotifyInsanceStopped(ctx context.Context, instance chan<- string)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProvider(config config.Provider) (Provider, error) {
|
func NewProvider(config config.Provider) (Provider, error) {
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ func Start(conf config.Config) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
sessionsManager := sessions.NewSessionsManager(store, provider)
|
sessionsManager := sessions.NewSessionsManager(store, provider)
|
||||||
|
defer sessionsManager.Stop()
|
||||||
|
|
||||||
if storage.Enabled() {
|
if storage.Enabled() {
|
||||||
defer saveSessions(storage, sessionsManager)
|
defer saveSessions(storage, sessionsManager)
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ func NewProviderMock(stoppedInstances []string) *ProviderMock {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (provider *ProviderMock) NotifyInsanceStopped(ctx context.Context, instance chan string) {
|
func (provider *ProviderMock) NotifyInsanceStopped(ctx context.Context, instance chan<- string) {
|
||||||
go func() {
|
go func() {
|
||||||
defer close(instance)
|
defer close(instance)
|
||||||
for i := 0; i < len(provider.stoppedInstances); i++ {
|
for i := 0; i < len(provider.stoppedInstances); i++ {
|
||||||
|
|||||||
@@ -20,12 +20,17 @@ type Manager interface {
|
|||||||
|
|
||||||
LoadSessions(io.ReadCloser) error
|
LoadSessions(io.ReadCloser) error
|
||||||
SaveSessions(io.WriteCloser) error
|
SaveSessions(io.WriteCloser) error
|
||||||
|
|
||||||
|
Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
type SessionsManager struct {
|
type SessionsManager struct {
|
||||||
|
events context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
|
||||||
store tinykv.KV[instance.State]
|
store tinykv.KV[instance.State]
|
||||||
provider providers.Provider
|
provider providers.Provider
|
||||||
insanceStopped chan string
|
instanceStopped chan string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSessionsManager(store tinykv.KV[instance.State], provider providers.Provider) Manager {
|
func NewSessionsManager(store tinykv.KV[instance.State], provider providers.Provider) Manager {
|
||||||
@@ -41,12 +46,15 @@ func NewSessionsManager(store tinykv.KV[instance.State], provider providers.Prov
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
provider.NotifyInsanceStopped(context.Background(), instanceStopped)
|
events, cancel := context.WithCancel(context.Background())
|
||||||
|
provider.NotifyInsanceStopped(events, instanceStopped)
|
||||||
|
|
||||||
return &SessionsManager{
|
return &SessionsManager{
|
||||||
|
events: events,
|
||||||
|
cancel: cancel,
|
||||||
store: store,
|
store: store,
|
||||||
provider: provider,
|
provider: provider,
|
||||||
insanceStopped: instanceStopped,
|
instanceStopped: instanceStopped,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,9 +140,6 @@ func (s *SessionsManager) requestSessionInstance(name string, duration time.Dura
|
|||||||
|
|
||||||
requestState, exists := s.store.Get(name)
|
requestState, exists := s.store.Get(name)
|
||||||
|
|
||||||
// Trust the stored value
|
|
||||||
// TODO: Provider background check on the store
|
|
||||||
// Via polling or whatever
|
|
||||||
if !exists {
|
if !exists {
|
||||||
log.Debugf("starting %s...", name)
|
log.Debugf("starting %s...", name)
|
||||||
|
|
||||||
@@ -212,6 +217,17 @@ func (s *SessionsManager) ExpiresAfter(instance *instance.State, duration time.D
|
|||||||
s.store.Put(instance.Name, *instance, duration)
|
s.store.Put(instance.Name, *instance, duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SessionsManager) Stop() {
|
||||||
|
// Stop event listeners
|
||||||
|
s.cancel()
|
||||||
|
|
||||||
|
// Stop receiving stopped instance
|
||||||
|
close(s.instanceStopped)
|
||||||
|
|
||||||
|
// Stop the store
|
||||||
|
s.store.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SessionState) MarshalJSON() ([]byte, error) {
|
func (s *SessionState) MarshalJSON() ([]byte, error) {
|
||||||
instances := []InstanceState{}
|
instances := []InstanceState{}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user