package service

import (
	"context"
	"encoding/json"
	"errors"
	"sort"
	"sync"
	"time"
	"traefik-lazyload/pkg/containers"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/sirupsen/logrus"
)
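
// Core tracks the set of lazily-loaded containers: it starts them on demand,
// reconciles its view against Docker, and stops them once they go idle.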
type Core struct {
	mux  sync.Mutex
	term chan bool

	client    *client.Client
	discovery *containers.Discovery

	active map[string]*ContainerState // cid -> state
}
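
// New verifies the Docker connection, constructs the Core, runs an initial
// Poll, and launches the background poll loop. A minimal usage sketch
// (assumes `disc` is an already-configured *containers.Discovery):
//
//	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
//	if err != nil {
//		logrus.Fatal(err)
//	}
//	core, err := service.New(cli, disc, 30*time.Second)
//	if err != nil {
//		logrus.Fatal(err)
//	}
//	defer core.Close()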
func New(client *client.Client, discovery *containers.Discovery, pollRate time.Duration) (*Core, error) {
	// Test client and report
	if info, err := client.Info(context.Background()); err != nil {
		return nil, err
	} else {
		logrus.Infof("Connected docker to %s (v%s)", info.Name, info.ServerVersion)
	}

	// Make core
	ret := &Core{
		client:    client,
		discovery: discovery,
		active:    make(map[string]*ContainerState),
		term:      make(chan bool, 1), // buffered so Close can signal shutdown without blocking while holding the mutex
	}

	ret.Poll() // initial force-poll to update
	go ret.pollThread(pollRate)

	return ret, nil
}
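
// Close stops the poll loop and releases the underlying Docker client.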
func (s *Core) Close() error {
	s.mux.Lock()
	defer s.mux.Unlock()

	s.term <- true
	return s.client.Close()
}
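
// StartHost resolves a hostname to its container, marks it active, and starts
// it (and its declared dependencies) in the background. The returned state is
// pinned until startup completes so the poll loop won't stop it mid-start.
// A hypothetical caller (e.g. the HTTP layer fronting Traefik) might use it as:
//
//	state, err := core.StartHost(r.Host)
//	if err != nil {
//		http.Error(w, "unknown host", http.StatusNotFound)
//		return
//	}
//	_ = state // render a "starting..." page while the container boots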
func (s *Core) StartHost(hostname string) (*ContainerState, error) {
	s.mux.Lock()
	defer s.mux.Unlock()

	ctx := context.Background()

	ct, err := s.discovery.FindContainerByHostname(ctx, hostname)
	if err != nil {
		logrus.Warnf("Unable to find container for host %s: %s", hostname, err)
		return nil, err
	}

	if ets, exists := s.active[ct.ID]; exists {
		logrus.Debugf("Asked to start host, but we already think it's started: %s", ets.name)
		return ets, nil
	}

	// add to active pool
	logrus.Infof("Starting container for %s...", hostname)
	ets := newStateFromContainer(ct)
	s.active[ct.ID] = ets
	ets.pinned = true // pin while starting

	go func() {
		defer func() {
			s.mux.Lock()
			ets.pinned = false
			ets.lastActivity = time.Now()
			s.mux.Unlock()
		}()
		s.startDependencyFor(ctx, ets.needs, ct.NameID())
		s.startContainerSync(ctx, ct)
	}()

	return ets, nil
}

// StopAll stops all running containers managed under the configured label
func (s *Core) StopAll() {
	s.mux.Lock()
	defer s.mux.Unlock()

	ctx := context.Background()

	logrus.Info("Stopping all containers...")
	for cid, ct := range s.active {
		logrus.Infof("Stopping %s...", ct.name)
		s.client.ContainerStop(ctx, cid, container.StopOptions{})
		delete(s.active, cid)
	}
}

// ActiveContainers returns all actively managed containers, sorted by name
func (s *Core) ActiveContainers() []*ContainerState {
	s.mux.Lock()
	defer s.mux.Unlock()

	ret := make([]*ContainerState, 0, len(s.active))
	for _, item := range s.active {
		ret = append(ret, item)
	}
	sort.Slice(ret, func(i, j int) bool {
		return ret[i].name < ret[j].name
	})
	return ret
}
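
// startContainerSync starts the given container if it isn't already running,
// blocking until the start request returns.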
func (s *Core) startContainerSync(ctx context.Context, ct *containers.Wrapper) error {
	if ct.IsRunning() {
		return nil
	}

	if err := s.client.ContainerStart(ctx, ct.ID, types.ContainerStartOptions{}); err != nil {
		logrus.Warnf("Error starting container %s: %s", ct.NameID(), err)
		return err
	} else {
		logrus.Infof("Started container %s", ct.NameID())
	}
	return nil
}
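
// startDependencyFor starts every provider of the named dependencies, pausing
// between starts so each provider has a moment to come up before its dependent.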
func (s *Core) startDependencyFor(ctx context.Context, needs []string, forContainer string) {
	for _, dep := range needs {
		providers, err := s.discovery.FindDepProvider(ctx, dep)

		if err != nil {
			logrus.Errorf("Error finding dependency provider for %s: %v", dep, err)
		} else if len(providers) == 0 {
			logrus.Warnf("Unable to find any container that provides %s for %s", dep, forContainer)
		} else {
			for _, provider := range providers {
				if !provider.IsRunning() {
					logrus.Infof("Starting dependency for %s: %s", forContainer, provider.NameID())

					s.startContainerSync(ctx, &provider)

					delay, _ := provider.ConfigDuration("provides.delay", 2*time.Second)
					logrus.Debugf("Delaying %s to start %s", delay.String(), dep)
					time.Sleep(delay)
				}
			}
		}
	}
}

func (s *Core) stopDependenciesFor(ctx context.Context, cid string, cts *ContainerState) {
	// Look at our needs, and see if anything else needs them; if not, shut down

	deps := make(map[string]bool) // dep -> needed
	for _, dep := range cts.needs {
		deps[dep] = false
	}

	for activeId, active := range s.active {
		if activeId != cid { // ignore self
			for _, need := range active.needs {
				deps[need] = true
			}
		}
	}

	for dep, needed := range deps {
		if !needed {
			logrus.Infof("Stopping dependency %s...", dep)
			providers, err := s.discovery.FindDepProvider(ctx, dep)
			if err != nil {
				logrus.Errorf("Unable to find dependency provider containers for %s: %v", dep, err)
			} else if len(providers) == 0 {
				logrus.Warnf("Unable to find any containers for dependency %s", dep)
			} else {
				for _, ct := range providers {
					if ct.IsRunning() {
						logrus.Infof("Stopping %s...", ct.NameID())
						go s.client.ContainerStop(ctx, ct.ID, container.StopOptions{})
					}
				}
			}
		}
	}
}

// pollThread is a ticker loop that checks internal state against docker state
// (calls Poll) until terminated
func (s *Core) pollThread(rate time.Duration) {
	ticker := time.NewTicker(rate)
	defer ticker.Stop()

	for {
		select {
		case <-s.term:
			return
		case <-ticker.C:
			s.Poll()
		}
	}
}

// Poll initiates a thread-safe state update, adding newly discovered
// containers to the system and stopping idle ones. It normally runs in the
// background via pollThread.
func (s *Core) Poll() {
	s.mux.Lock()
	defer s.mux.Unlock()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	s.checkForNewContainersSync(ctx)
	s.watchForInactivitySync(ctx)
}
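
// checkForNewContainersSync reconciles the active set against Docker:
// containers that stopped outside our control are dropped, and labeled
// containers already running are adopted into the inventory.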
func (s *Core) checkForNewContainersSync(ctx context.Context) {
	cts, err := s.discovery.FindAllLazyload(ctx, false)
	if err != nil {
		logrus.Warnf("Error checking for new containers: %v", err)
		return
	}

	runningContainers := make(map[string]*containers.Wrapper)
	for i, ct := range cts {
		if ct.IsRunning() {
			runningContainers[ct.ID] = &cts[i]
		}
	}

	// check for containers we think are running, but aren't (destroyed, errored, stopped via another process, etc.)
	for cid, state := range s.active {
		if _, ok := runningContainers[cid]; !ok && !state.pinned {
			logrus.Infof("Discovered container had stopped, removing %s", state.name)
			delete(s.active, cid)
			s.stopDependenciesFor(ctx, cid, state)
		}
	}

	// now, look for containers that are running, but aren't in our active inventory
	for _, ct := range runningContainers {
		if _, ok := s.active[ct.ID]; !ok {
			logrus.Infof("Discovered running container %s", ct.NameID())
			s.active[ct.ID] = newStateFromContainer(ct)
		}
	}
}
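
// watchForInactivitySync checks each active container for recent activity and
// stops the ones that have been idle past their configured stop delay.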
func (s *Core) watchForInactivitySync(ctx context.Context) {
	for cid, cts := range s.active {
		shouldStop, err := s.checkContainerForInactivity(ctx, cid, cts)
		if err != nil {
			logrus.Warnf("error checking container state for %s: %s", cts.name, err)
		}
		if shouldStop {
			s.stopContainerAndDependencies(ctx, cid, cts)
		}
	}
}
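
// stopContainerAndDependencies stops the container, removes it from the
// active set, and stops any dependencies nothing else still needs.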
func (s *Core) stopContainerAndDependencies(ctx context.Context, cid string, cts *ContainerState) {
	// First, stop the host container
	if err := s.client.ContainerStop(ctx, cid, container.StopOptions{}); err != nil {
		logrus.Errorf("Error stopping container %s: %s", cts.name, err)
	} else {
		logrus.Infof("Stopped container %s", cts.name)
		delete(s.active, cid)
		s.stopDependenciesFor(ctx, cid, cts)
	}
}
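
// checkContainerForInactivity samples one-shot stats for the container and
// decides whether it should be stopped: zero pids means it already exited,
// and no growth in the network counters past the stop delay means it's idle.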
func (s *Core) checkContainerForInactivity(ctx context.Context, cid string, ct *ContainerState) (shouldStop bool, retErr error) {
	if ct.pinned {
		return false, nil
	}

	statsStream, err := s.client.ContainerStatsOneShot(ctx, cid)
	if err != nil {
		return false, err
	}
	defer statsStream.Body.Close() // close the stats body so the connection isn't leaked

	var stats types.StatsJSON
	if err := json.NewDecoder(statsStream.Body).Decode(&stats); err != nil {
		return false, err
	}

	if stats.PidsStats.Current == 0 {
		// Probably stopped. Will let next poll update container
		return true, errors.New("container not running")
	}

	// check for network activity
	rx, tx := sumNetworkBytes(stats.Networks)
	if rx > ct.lastRecv || tx > ct.lastSend {
		ct.lastRecv = rx
		ct.lastSend = tx
		ct.lastActivity = time.Now()
		return false, nil
	}

	// No activity, stop?
	if time.Now().After(ct.lastActivity.Add(ct.stopDelay)) {
		logrus.Infof("Found idle container %s...", ct.name)
		return true, nil
	}

	return false, nil
}