From f7c5daf4483449848cce7ec833559bf3105aca2b Mon Sep 17 00:00:00 2001
From: Christopher LaPointe
Date: Mon, 29 May 2023 21:29:55 -0400
Subject: [PATCH] Simple dependency support

---
 assets.go                |   3 +-
 assets/splash.html       |   2 +-
 assets/status.html       |  23 +++++
 main.go                  |   3 +-
 pkg/service/container.go |  13 ++++
 pkg/service/service.go   | 147 ++++++++++++++++++++++++++++++++-------
 6 files changed, 164 insertions(+), 27 deletions(-)

diff --git a/assets.go b/assets.go
index 6ee5713..d271b29 100644
--- a/assets.go
+++ b/assets.go
@@ -15,7 +15,7 @@ const httpAssetPrefix = "/__llassets/"
 
 type SplashModel struct {
 	*service.ContainerState
-	Name string
+	Hostname string
 }
 
 var splashTemplate = template.Must(template.ParseFS(httpAssets, path.Join("assets", config.Model.Splash)))
@@ -23,6 +23,7 @@ var splashTemplate = template.Must(template.ParseFS(httpAssets, path.Join("assets", config.Model.Splash)))
 type StatusPageModel struct {
 	Active         []*service.ContainerState
 	Qualifying     []service.ContainerWrapper
+	Providers      []service.ContainerWrapper
 	RuntimeMetrics string
 }
 
diff --git a/assets/splash.html b/assets/splash.html
index 69d6d5d..a725e18 100644
--- a/assets/splash.html
+++ b/assets/splash.html
@@ -22,7 +22,7 @@
-            Starting {{.Name}}
+            Starting {{.Hostname}}
             {{.Name}}
diff --git a/assets/status.html b/assets/status.html
index ea66458..649a261 100644
--- a/assets/status.html
+++ b/assets/status.html
@@ -55,6 +55,29 @@
             {{end}}
         </table>
 
+        <h2>Provider Containers</h2>
+        <p>These are all containers that act as dependencies for other containers</p>
+        <table>
+            <tr>
+                <th>Name</th>
+                <th>State</th>
+                <th>Status</th>
+                <th>Config</th>
+            </tr>
+            {{range $val := .Providers}}
+            <tr>
+                <td>{{$val.NameID}}</td>
+                <td>{{$val.State}}</td>
+                <td>{{$val.Status}}</td>
+                <td>
+                    {{range $label, $lval := $val.ConfigLabels}}
+                    {{$label}}={{$lval}}
+                    {{end}}
+                </td>
+            </tr>
+            {{end}}
+        </table>
+
         <h2>Runtime</h2>
         <pre>{{.RuntimeMetrics}}</pre>
 
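
For orientation (this example is not part of the patch): the new Provider Containers table is driven entirely by container labels. The label prefix comes from the runtime configuration via config.SubLabel and never appears in this diff, so the fully-qualified keys below are assumptions. The provides and provides.delay sub-label names come from the service.go changes further down; the key that feeds a container's needs list is not visible in this diff and is likewise a guess. A minimal sketch of the label sets involved, written as the label maps the Docker API reports:

package main

import "fmt"

func main() {
	// Hypothetical provider container: advertises that it satisfies the "postgres"
	// dependency. The "lazyload." prefix is assumed purely for illustration.
	provider := map[string]string{
		"lazyload.provides":       "postgres",
		"lazyload.provides.delay": "5s", // optional pause after the provider starts; the code defaults to 2s
	}

	// Hypothetical dependent container: its needs entries are matched against
	// providers by findContainersByDepProvider in pkg/service/service.go below.
	app := map[string]string{
		"lazyload.needs": "postgres",
	}

	fmt.Println(provider, app)
}

Containers that carry the provides label are exactly what the new ProviderContainers call returns, and therefore what this table renders.
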
diff --git a/main.go b/main.go
index 9805808..1f71a74 100644
--- a/main.go
+++ b/main.go
@@ -97,7 +97,7 @@ func ContainerHandler(w http.ResponseWriter, r *http.Request) {
 	} else {
 		w.WriteHeader(http.StatusAccepted)
 		renderErr := splashTemplate.Execute(w, SplashModel{
-			Name:           host,
+			Hostname:       host,
 			ContainerState: sOpts,
 		})
 		if renderErr != nil {
@@ -114,6 +114,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
 		statusPageTemplate.Execute(w, StatusPageModel{
 			Active:         core.ActiveContainers(),
 			Qualifying:     core.QualifyingContainers(r.Context()),
+			Providers:      core.ProviderContainers(r.Context()),
 			RuntimeMetrics: fmt.Sprintf("Heap=%d, InUse=%d, Total=%d, Sys=%d, NumGC=%d", stats.HeapAlloc, stats.HeapInuse, stats.TotalAlloc, stats.Sys, stats.NumGC),
 		})
 	default:
diff --git a/pkg/service/container.go b/pkg/service/container.go
index b77263b..990db37 100644
--- a/pkg/service/container.go
+++ b/pkg/service/container.go
@@ -1,6 +1,7 @@
 package service
 
 import (
+	"sort"
 	"strings"
 	"time"
 	"traefik-lazyload/pkg/config"
@@ -22,6 +23,7 @@ type ContainerState struct {
 	lastRecv, lastSend int64 // Last network traffic, used to see if idle
 	lastActivity       time.Time
 	started            time.Time
+	pinned             bool // Don't remove, even if not started
 }
 
 func newStateFromContainer(ct *types.Container) *ContainerState {
@@ -102,3 +104,14 @@ func (s *ContainerWrapper) ConfigLabels() map[string]string {
 	}
 	return ret
 }
+
+func wrapContainers(cts ...types.Container) []ContainerWrapper {
+	ret := make([]ContainerWrapper, len(cts))
+	for i, c := range cts {
+		ret[i] = ContainerWrapper{c}
+	}
+	sort.Slice(ret, func(i, j int) bool {
+		return ret[i].NameID() < ret[j].NameID()
+	})
+	return ret
+}
diff --git a/pkg/service/service.go b/pkg/service/service.go
index 7d6f0b9..3b7bf47 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -72,15 +72,27 @@ func (s *Core) StartHost(hostname string) (*ContainerState, error) {
 		return ets, nil
 	}
 
-	go s.startContainer(ctx, ct)
-
 	// add to active pool
+	logrus.Infof("Starting container for %s...", hostname)
 	ets := newStateFromContainer(ct)
 	s.active[ct.ID] = ets
+	ets.pinned = true // pin while starting
+
+	go func() {
+		defer func() {
+			s.mux.Lock()
+			ets.pinned = false
+			ets.lastActivity = time.Now()
+			s.mux.Unlock()
+		}()
+		s.startDependencyFor(ctx, ets.needs, containerShort(ct))
+		s.startContainerSync(ctx, ct)
+	}()
 
 	return ets, nil
 }
 
+// Stop all running containers marked with the configured label
 func (s *Core) StopAll() {
 	s.mux.Lock()
 	defer s.mux.Unlock()
@@ -95,17 +107,85 @@
 	}
 }
 
-func (s *Core) startContainer(ctx context.Context, ct *types.Container) {
+func (s *Core) startContainerSync(ctx context.Context, ct *types.Container) error {
+	if isRunning(ct) {
+		return nil
+	}
+
 	s.mux.Lock()
 	defer s.mux.Unlock()
 
 	if err := s.client.ContainerStart(ctx, ct.ID, types.ContainerStartOptions{}); err != nil {
 		logrus.Warnf("Error starting container %s: %s", containerShort(ct), err)
+		return err
 	} else {
-		logrus.Infof("Starting container %s", containerShort(ct))
+		logrus.Infof("Started container %s", containerShort(ct))
+	}
+	return nil
+}
+
+func (s *Core) startDependencyFor(ctx context.Context, needs []string, forContainer string) {
+	for _, dep := range needs {
+		providers, err := s.findContainersByDepProvider(ctx, dep)
+
+		if err != nil {
+			logrus.Errorf("Error finding dependency provider for %s: %v", dep, err)
+		} else if len(providers) == 0 {
+			logrus.Warnf("Unable to find any container that provides %s for %s", dep, forContainer)
+		} else {
+			for _, provider := range providers {
+				if !isRunning(&provider) {
+					logrus.Infof("Starting dependency for %s: %s", forContainer, containerShort(&provider))
+					s.startContainerSync(ctx, &provider)
+					delay, _ := labelOrDefaultDuration(&provider, "provides.delay", 2*time.Second)
+					logrus.Debugf("Delaying %s to start %s", delay.String(), dep)
+					time.Sleep(delay)
+				}
+			}
+		}
+	}
+}
+
+func (s *Core) stopDependenciesFor(ctx context.Context, cid string, cts *ContainerState) {
+	// Look at our needs, and see if anything else needs them; if not, shut down
+
+	deps := make(map[string]bool) // dep -> needed
+	for _, dep := range cts.needs {
+		deps[dep] = false
+	}
+
+	for activeId, active := range s.active {
+		if activeId != cid { // ignore self
+			for _, need := range active.needs {
+				deps[need] = true
+			}
+		}
+	}
+
+	for dep, needed := range deps {
+		if !needed {
+			logrus.Infof("Stopping dependency %s...", dep)
+			containers, err := s.findContainersByDepProvider(ctx, dep)
+			if err != nil {
+				logrus.Errorf("Unable to find dependency provider containers for %s: %v", dep, err)
+			} else if len(containers) == 0 {
+				logrus.Warnf("Unable to find any containers for dependency %s", dep)
+			} else {
+				for _, ct := range containers {
+					if isRunning(&ct) {
+						logrus.Infof("Stopping %s...", containerShort(&ct))
+						go s.client.ContainerStop(ctx, ct.ID, container.StopOptions{})
+					}
+				}
+			}
+		}
+	}
+}
+
+// Ticker loop that periodically checks internal state against docker state (calls Poll)
 func (s *Core) pollThread(rate time.Duration) {
 	ticker := time.NewTicker(rate)
 	defer ticker.Stop()
@@ -130,11 +210,11 @@ func (s *Core) Poll() {
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
 
-	s.checkForNewContainers(ctx)
-	s.watchForInactivity(ctx)
+	s.checkForNewContainersSync(ctx)
+	s.watchForInactivitySync(ctx)
 }
 
-func (s *Core) checkForNewContainers(ctx context.Context) {
+func (s *Core) checkForNewContainersSync(ctx context.Context) {
 	containers, err := s.findAllLazyloadContainers(ctx, false)
 	if err != nil {
 		logrus.Warnf("Error checking for new containers: %v", err)
@@ -150,9 +230,10 @@
 
 	// check for containers we think are running, but aren't (destroyed, error'd, stop'd via another process, etc)
 	for cid, cts := range s.active {
-		if _, ok := runningContainers[cid]; !ok {
+		if _, ok := runningContainers[cid]; !ok && !cts.pinned {
 			logrus.Infof("Discover container had stopped, removing %s", cts.name)
 			delete(s.active, cid)
+			s.stopDependenciesFor(ctx, cid, cts)
 		}
 	}
 
@@ -165,24 +246,34 @@
-func (s *Core) watchForInactivity(ctx context.Context) {
+func (s *Core) watchForInactivitySync(ctx context.Context) {
 	for cid, cts := range s.active {
 		shouldStop, err := s.checkContainerForInactivity(ctx, cid, cts)
 		if err != nil {
 			logrus.Warnf("error checking container state for %s: %s", cts.name, err)
 		}
 		if shouldStop {
-			if err := s.client.ContainerStop(ctx, cid, container.StopOptions{}); err != nil {
-				logrus.Errorf("Error stopping container %s: %s", cts.name, err)
-			} else {
-				logrus.Infof("Stopped container %s", cts.name)
-				delete(s.active, cid)
-			}
+			s.stopContainerAndDependencies(ctx, cid, cts)
 		}
 	}
 }
 
+func (s *Core) stopContainerAndDependencies(ctx context.Context, cid string, cts *ContainerState) {
+	// First, stop the host container
+	if err := s.client.ContainerStop(ctx, cid, container.StopOptions{}); err != nil {
+		logrus.Errorf("Error stopping container %s: %s", cts.name, err)
+	} else {
+		logrus.Infof("Stopped container %s", cts.name)
+		delete(s.active, cid)
+		s.stopDependenciesFor(ctx, cid, cts)
+	}
+}
+
 func (s *Core) checkContainerForInactivity(ctx context.Context, cid string, ct *ContainerState) (shouldStop bool, retErr error) {
+	if ct.pinned {
+		return false, nil
+	}
+
 	statsStream, err := s.client.ContainerStatsOneShot(ctx, cid)
 	if err != nil {
 		return false, err
 	}
@@ -218,7 +309,7 @@
 func (s *Core) findContainersByDepProvider(ctx context.Context, name string) ([]types.Container, error) {
 	filters := filters.NewArgs()
-	filters.Add("label", config.SubLabel("providers")+"="+name)
+	filters.Add("label", config.SubLabel("provides")+"="+name)
 	return s.client.ContainerList(ctx, types.ContainerListOptions{
 		Filters: filters,
 		All:     true,
	})
@@ -283,12 +374,20 @@ func (s *Core) QualifyingContainers(ctx context.Context) []ContainerWrapper {
 		return nil
 	}
 
-	ret := make([]ContainerWrapper, len(ct))
-	for i, c := range ct {
-		ret[i] = ContainerWrapper{c}
-	}
-	sort.Slice(ret, func(i, j int) bool {
-		return ret[i].NameID() < ret[j].NameID()
-	})
-	return ret
+	return wrapContainers(ct...)
+}
+
+func (s *Core) ProviderContainers(ctx context.Context) []ContainerWrapper {
+	filters := filters.NewArgs()
+	filters.Add("label", config.SubLabel("provides"))
+
+	ct, err := s.client.ContainerList(ctx, types.ContainerListOptions{
+		Filters: filters,
+		All:     true,
+	})
+	if err != nil {
+		return nil
+	}
+
+	return wrapContainers(ct...)
 }
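
One helper the new dependency logic leans on, labelOrDefaultDuration, is referenced but not defined in this patch, so it presumably already exists elsewhere in pkg/service. Judging only from its call site above (delay, _ := labelOrDefaultDuration(&provider, "provides.delay", 2*time.Second)), its behaviour is assumed to be: read a sub-label from the container, parse it as a Go duration, and fall back to the supplied default otherwise. A minimal sketch under those assumptions, not the project's actual implementation:

package service

import (
	"time"

	"github.com/docker/docker/api/types"
)

// labelOrDefaultDuration (assumed shape): return the duration parsed from the given
// sub-label on the container, or def when the label is absent or unparsable. The
// second return value reports whether the label supplied the value.
func labelOrDefaultDuration(ct *types.Container, subLabel string, def time.Duration) (time.Duration, bool) {
	// The real helper presumably runs subLabel through config.SubLabel to apply the
	// configured label prefix; that step is skipped here as an assumption.
	if raw, ok := ct.Labels[subLabel]; ok {
		if d, err := time.ParseDuration(raw); err == nil {
			return d, true
		}
	}
	return def, false
}

Under that reading, a provider labelled with provides.delay=5s holds dependents off for five seconds after it is started, while an unlabelled provider falls back to the two-second default passed in startDependencyFor.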