Simple dependency support

This commit is contained in:
Christopher LaPointe
2023-05-29 21:29:55 -04:00
parent ebb45b55c3
commit f7c5daf448
6 changed files with 164 additions and 27 deletions

View File

@@ -15,7 +15,7 @@ const httpAssetPrefix = "/__llassets/"
type SplashModel struct {
*service.ContainerState
Name string
Hostname string
}
var splashTemplate = template.Must(template.ParseFS(httpAssets, path.Join("assets", config.Model.Splash)))
@@ -23,6 +23,7 @@ var splashTemplate = template.Must(template.ParseFS(httpAssets, path.Join("asset
type StatusPageModel struct {
Active []*service.ContainerState
Qualifying []service.ContainerWrapper
Providers []service.ContainerWrapper
RuntimeMetrics string
}

View File

@@ -22,7 +22,7 @@
<div class="square last"></div>
</div>
<div class="message">
<h2>Starting {{.Name}}</h2>
<h2>Starting {{.Hostname}}</h2>
<h3>{{.Name}}</h3>
</div>
</div>

View File

@@ -55,6 +55,29 @@
{{end}}
</table>
<h2>Provider Containers</h2>
<p>These are all containers that act as dependencies for other containers</p>
<table>
<tr>
<th>Name</th>
<th>State</th>
<th>Status</th>
<th>Config</th>
</tr>
{{range $val := .Providers}}
<tr>
<td>{{$val.NameID}}</td>
<td>{{$val.State}}</td>
<td><em>{{$val.Status}}</em></td>
<td>
{{range $label, $lval := $val.ConfigLabels}}
<span><strong>{{$label}}</strong>={{$lval}}</span>
{{end}}
</td>
</tr>
{{end}}
</table>
<h2>Runtime</h2>
<p>{{.RuntimeMetrics}}</p>
</body>

View File

@@ -97,7 +97,7 @@ func ContainerHandler(w http.ResponseWriter, r *http.Request) {
} else {
w.WriteHeader(http.StatusAccepted)
renderErr := splashTemplate.Execute(w, SplashModel{
Name: host,
Hostname: host,
ContainerState: sOpts,
})
if renderErr != nil {
@@ -114,6 +114,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
statusPageTemplate.Execute(w, StatusPageModel{
Active: core.ActiveContainers(),
Qualifying: core.QualifyingContainers(r.Context()),
Providers: core.ProviderContainers(r.Context()),
RuntimeMetrics: fmt.Sprintf("Heap=%d, InUse=%d, Total=%d, Sys=%d, NumGC=%d", stats.HeapAlloc, stats.HeapInuse, stats.TotalAlloc, stats.Sys, stats.NumGC),
})
default:

View File

@@ -1,6 +1,7 @@
package service
import (
"sort"
"strings"
"time"
"traefik-lazyload/pkg/config"
@@ -22,6 +23,7 @@ type ContainerState struct {
lastRecv, lastSend int64 // Last network traffic, used to see if idle
lastActivity time.Time
started time.Time
pinned bool // Don't remove, even if not started
}
func newStateFromContainer(ct *types.Container) *ContainerState {
@@ -102,3 +104,14 @@ func (s *ContainerWrapper) ConfigLabels() map[string]string {
}
return ret
}
func wrapContainers(cts ...types.Container) []ContainerWrapper {
ret := make([]ContainerWrapper, len(cts))
for i, c := range cts {
ret[i] = ContainerWrapper{c}
}
sort.Slice(ret, func(i, j int) bool {
return ret[i].NameID() < ret[j].NameID()
})
return ret
}

View File

@@ -72,15 +72,27 @@ func (s *Core) StartHost(hostname string) (*ContainerState, error) {
return ets, nil
}
go s.startContainer(ctx, ct)
// add to active pool
logrus.Infof("Starting container for %s...", hostname)
ets := newStateFromContainer(ct)
s.active[ct.ID] = ets
ets.pinned = true // pin while starting
go func() {
defer func() {
s.mux.Lock()
ets.pinned = false
ets.lastActivity = time.Now()
s.mux.Unlock()
}()
s.startDependencyFor(ctx, ets.needs, containerShort(ct))
s.startContainerSync(ctx, ct)
}()
return ets, nil
}
// Stop all running containers pined with the configured label
func (s *Core) StopAll() {
s.mux.Lock()
defer s.mux.Unlock()
@@ -95,17 +107,85 @@ func (s *Core) StopAll() {
}
}
func (s *Core) startContainer(ctx context.Context, ct *types.Container) {
func (s *Core) startContainerSync(ctx context.Context, ct *types.Container) error {
if isRunning(ct) {
return nil
}
s.mux.Lock()
defer s.mux.Unlock()
if err := s.client.ContainerStart(ctx, ct.ID, types.ContainerStartOptions{}); err != nil {
logrus.Warnf("Error starting container %s: %s", containerShort(ct), err)
return err
} else {
logrus.Infof("Starting container %s", containerShort(ct))
logrus.Infof("Started container %s", containerShort(ct))
}
return nil
}
func (s *Core) startDependencyFor(ctx context.Context, needs []string, forContainer string) {
for _, dep := range needs {
providers, err := s.findContainersByDepProvider(ctx, dep)
if err != nil {
logrus.Errorf("Error finding dependency provider for %s: %v", dep, err)
} else if len(providers) == 0 {
logrus.Warnf("Unable to find any container that provides %s for %s", dep, forContainer)
} else {
for _, provider := range providers {
if !isRunning(&provider) {
logrus.Infof("Starting dependency for %s: %s", forContainer, containerShort(&provider))
s.startContainerSync(ctx, &provider)
delay, _ := labelOrDefaultDuration(&provider, "provides.delay", 2*time.Second)
logrus.Debugf("Delaying %s to start %s", delay.String(), dep)
time.Sleep(delay)
}
}
}
}
}
func (s *Core) stopDependenciesFor(ctx context.Context, cid string, cts *ContainerState) {
// Look at our needs, and see if anything else needs them; if not, shut down
deps := make(map[string]bool) // dep -> needed
for _, dep := range cts.needs {
deps[dep] = false
}
for activeId, active := range s.active {
if activeId != cid { // ignore self
for _, need := range active.needs {
deps[need] = true
}
}
}
for dep, needed := range deps {
if !needed {
logrus.Infof("Stopping dependency %s...", dep)
containers, err := s.findContainersByDepProvider(ctx, dep)
if err != nil {
logrus.Errorf("Unable to find dependency provider containers for %s: %v", dep, err)
} else if len(containers) == 0 {
logrus.Warnf("Unable to find any containers for dependency %s", dep)
} else {
for _, ct := range containers {
if isRunning(&ct) {
logrus.Infof("Stopping %s...", containerShort(&ct))
go s.client.ContainerStop(ctx, ct.ID, container.StopOptions{})
}
}
}
}
}
}
// Ticker loop that will check internal state against docker state (Call Poll)
func (s *Core) pollThread(rate time.Duration) {
ticker := time.NewTicker(rate)
defer ticker.Stop()
@@ -130,11 +210,11 @@ func (s *Core) Poll() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
s.checkForNewContainers(ctx)
s.watchForInactivity(ctx)
s.checkForNewContainersSync(ctx)
s.watchForInactivitySync(ctx)
}
func (s *Core) checkForNewContainers(ctx context.Context) {
func (s *Core) checkForNewContainersSync(ctx context.Context) {
containers, err := s.findAllLazyloadContainers(ctx, false)
if err != nil {
logrus.Warnf("Error checking for new containers: %v", err)
@@ -150,9 +230,10 @@ func (s *Core) checkForNewContainers(ctx context.Context) {
// check for containers we think are running, but aren't (destroyed, error'd, stop'd via another process, etc)
for cid, cts := range s.active {
if _, ok := runningContainers[cid]; !ok {
if _, ok := runningContainers[cid]; !ok && !cts.pinned {
logrus.Infof("Discover container had stopped, removing %s", cts.name)
delete(s.active, cid)
s.stopDependenciesFor(ctx, cid, cts)
}
}
@@ -165,24 +246,34 @@ func (s *Core) checkForNewContainers(ctx context.Context) {
}
}
func (s *Core) watchForInactivity(ctx context.Context) {
func (s *Core) watchForInactivitySync(ctx context.Context) {
for cid, cts := range s.active {
shouldStop, err := s.checkContainerForInactivity(ctx, cid, cts)
if err != nil {
logrus.Warnf("error checking container state for %s: %s", cts.name, err)
}
if shouldStop {
if err := s.client.ContainerStop(ctx, cid, container.StopOptions{}); err != nil {
logrus.Errorf("Error stopping container %s: %s", cts.name, err)
} else {
logrus.Infof("Stopped container %s", cts.name)
delete(s.active, cid)
}
s.stopContainerAndDependencies(ctx, cid, cts)
}
}
}
func (s *Core) stopContainerAndDependencies(ctx context.Context, cid string, cts *ContainerState) {
// First, stop the host container
if err := s.client.ContainerStop(ctx, cid, container.StopOptions{}); err != nil {
logrus.Errorf("Error stopping container %s: %s", cts.name, err)
} else {
logrus.Infof("Stopped container %s", cts.name)
delete(s.active, cid)
s.stopDependenciesFor(ctx, cid, cts)
}
}
func (s *Core) checkContainerForInactivity(ctx context.Context, cid string, ct *ContainerState) (shouldStop bool, retErr error) {
if ct.pinned {
return false, nil
}
statsStream, err := s.client.ContainerStatsOneShot(ctx, cid)
if err != nil {
return false, err
@@ -218,7 +309,7 @@ func (s *Core) checkContainerForInactivity(ctx context.Context, cid string, ct *
func (s *Core) findContainersByDepProvider(ctx context.Context, name string) ([]types.Container, error) {
filters := filters.NewArgs()
filters.Add("label", config.SubLabel("providers")+"="+name)
filters.Add("label", config.SubLabel("provides")+"="+name)
return s.client.ContainerList(ctx, types.ContainerListOptions{
Filters: filters,
All: true,
@@ -283,12 +374,20 @@ func (s *Core) QualifyingContainers(ctx context.Context) []ContainerWrapper {
return nil
}
ret := make([]ContainerWrapper, len(ct))
for i, c := range ct {
ret[i] = ContainerWrapper{c}
}
sort.Slice(ret, func(i, j int) bool {
return ret[i].NameID() < ret[j].NameID()
})
return ret
return wrapContainers(ct...)
}
func (s *Core) ProviderContainers(ctx context.Context) []ContainerWrapper {
filters := filters.NewArgs()
filters.Add("label", config.SubLabel("provides"))
ct, err := s.client.ContainerList(ctx, types.ContainerListOptions{
Filters: filters,
All: true,
})
if err != nil {
return nil
}
return wrapContainers(ct...)
}