diff --git a/README.md b/README.md index 2da13f0..29e62b0 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ starting/stopping containers to save resources. * `lazyloader.stopdelay=5m` -- Amount of time to wait for idle network traffick before stopping a container * `lazyloader.waitforcode=200` -- Waits for this HTTP result from downstream before redirecting user * `lazyloader.waitforpath=/` -- Checks this path downstream to check for the process being ready, using the `waitforcode` +* `lazyloader.hosts=a.com,b.net,etc` -- Set specific hostnames that will trigger. By default, will look for traefik router # Features diff --git a/assets.go b/assets.go new file mode 100644 index 0000000..24c2fd9 --- /dev/null +++ b/assets.go @@ -0,0 +1,21 @@ +package main + +import ( + "embed" + "path" + "text/template" + "traefik-lazyload/pkg/config" +) + +//go:embed assets/* +var httpAssets embed.FS + +const httpAssetPrefix = "/__llassets/" + +type SplashModel struct { + Name string + WaitForCode int + WaitForPath string +} + +var splashTemplate = template.Must(template.ParseFS(httpAssets, path.Join("assets", config.Model.Splash))) diff --git a/config.yaml b/config.yaml index 89768f3..b937609 100644 --- a/config.yaml +++ b/config.yaml @@ -4,6 +4,9 @@ stopatboot: false splash: splash.html stopdelay: 5m +pollfreq: 10s +statushost: "" +verbose: false labels: prefix: lazyloader \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 9c8a074..858c72a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,9 +23,10 @@ services: labels: - traefik.enable=true - "traefik.http.routers.lazyload.priority=-100" - - "traefik.http.routers.lazyload.rule=Host(`whoami.d.lan`, `whoami2.d.lan`, `pdf.d.lan`)" + - "traefik.http.routers.lazyload.rule=Host(`whoami.d.lan`, `whoami2.d.lan`, `pdf.d.lan`, `noexist.d.lan`, `lazyloader.d.lan`)" environment: TLL_STOPATBOOT: true + TLL_STATUSHOST: lazyloader.d.lan networks: - traefik-bridge volumes: diff --git a/main.go b/main.go index ead7211..fb5f868 100644 --- a/main.go +++ b/main.go @@ -1,165 +1,53 @@ package main import ( - "context" - "embed" - "encoding/json" "errors" - "html/template" "io" "io/fs" "net/http" - "path" - "strconv" - "strings" - "time" "traefik-lazyload/pkg/config" + "traefik-lazyload/pkg/service" - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/sirupsen/logrus" ) -//go:embed assets/* -var httpAssets embed.FS +var core *service.Core -const httpAssetPrefix = "/__llassets/" - -type SplashModel struct { - Name string - WaitForCode int - WaitForPath string -} - -var splashTemplate = template.Must(template.ParseFS(httpAssets, path.Join("assets", config.Model.Splash))) - -var dockerClient *client.Client - -type containerState struct { - Name, ID string - IsRunning bool - LastWork time.Time - StopDelay time.Duration - WaitForCode int - WaitForPath string - - lastRecv, lastSend int64 // Last network traffic, used to see if idle -} - -// containerID -> State -var managedContainers = make(map[string]*containerState) - -func main() { - - // Connect to docker +func mustCreateDockerClient() *client.Client { cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { - panic(err) + logrus.Fatal("Unable to connect to docker: ", err) } - defer cli.Close() - dockerClient = cli + return cli +} - // Test - if info, err := cli.Info(context.Background()); err != nil { +func main() { + if config.Model.Verbose { + logrus.SetLevel(logrus.DebugLevel) + logrus.Debug("Verbose is on") + } + + var err error + core, err = service.New(mustCreateDockerClient(), config.Model.PollFreq) + if err != nil { logrus.Fatal(err) - } else { - logrus.Infof("Connected docker to %s", info.Name) } + defer core.Close() - if splash, err := httpAssets.ReadFile(path.Join("assets", config.Model.Splash)); err != nil || len(splash) == 0 { - logrus.Fatal("Unable to open splash file %s", config.Model.Splash) - } - - // Initial state - if config.Model.StopAtBoot { - stopAllLazyContainers() - } else { - //TODO: Inventory currently running containers - } - - go watchForInactive() - + // Set up http server subFs, _ := fs.Sub(httpAssets, "assets") http.Handle(httpAssetPrefix, http.StripPrefix(httpAssetPrefix, http.FileServer(http.FS(subFs)))) http.HandleFunc("/", ContainerHandler) logrus.Infof("Listening on %s...", config.Model.Listen) + if config.Model.StatusHost != "" { + logrus.Infof("Status host set to %s", config.Model.StatusHost) + } http.ListenAndServe(config.Model.Listen, nil) } -func stopAllLazyContainers() error { - filter := filters.NewArgs() - filter.Add("label", "lazyloader") - - containers, err := dockerClient.ContainerList(context.Background(), types.ContainerListOptions{Filters: filter, All: true}) - if err != nil { - return err - } - - ctx, _ := context.WithTimeout(context.Background(), 1*time.Minute) - - for _, c := range containers { - logrus.Infof("Stopping %s: %s", c.ID[:8], c.Names[0]) - dockerClient.ContainerStop(ctx, c.ID, container.StopOptions{}) - } - - return nil -} - -func watchForInactive() { - // TODO: Thread safety - for { - for cid, ct := range managedContainers { - if !ct.IsRunning { - continue - } - - statsStream, err := dockerClient.ContainerStatsOneShot(context.Background(), cid) - if err != nil { - logrus.Warn(err) - continue - } - - var stats types.StatsJSON - if err := json.NewDecoder(statsStream.Body).Decode(&stats); err != nil { - logrus.Warn(err) - continue - } - - if stats.PidsStats.Current == 0 { - // Probably stopped - *ct = containerState{} // Reset - continue - } - - // Check for network activity - rx, tx := sumNetworkBytes(stats.Networks) - if rx > ct.lastRecv || tx > ct.lastSend { - ct.lastRecv = rx - ct.lastSend = tx - ct.LastWork = time.Now() - continue - } - - // No network activity for a while, stop? - if time.Now().After(ct.LastWork.Add(ct.StopDelay)) { - logrus.Infof("Stopping idle container %s...", ct.Name) - err := dockerClient.ContainerStop(context.Background(), cid, container.StopOptions{}) - if err != nil { - logrus.Warnf("Error stopping container: %s", err) - } else { - delete(managedContainers, cid) - } - } - } - - time.Sleep(5 * time.Second) // TODO Increase/use-config - } -} - func ContainerHandler(w http.ResponseWriter, r *http.Request) { host := r.Host if host == "" { @@ -167,98 +55,38 @@ func ContainerHandler(w http.ResponseWriter, r *http.Request) { io.WriteString(w, "Not Found") return } + if host == config.Model.StatusHost && config.Model.StatusHost != "" { + StatusHandler(w, r) + return + } - ct, _ := findContainerByHostname(r.Context(), host) - if ct != nil { - // Look to start the container - state := getOrCreateState(ct.ID) - logrus.Infof("Found container %s for host %s, checking state...", containerShort(ct), host) - - if !state.IsRunning { // cache doesn't think it's running - if ct.State != "running" { - logrus.Infof("Container %s not running (is %s), starting...", state.Name, ct.State) - go dockerClient.ContainerStart(context.Background(), ct.ID, types.ContainerStartOptions{}) // TODO: Check error - } - - state.IsRunning = true - state.Name = containerShort(ct) - state.ID = ct.ID - state.LastWork = time.Now() - parseContainerSettings(state, ct) - } // TODO: What if container crahsed but we think it's started? - + if sOpts, err := core.StartHost(host); err != nil { + if errors.Is(err, service.ErrNotFound) { + w.WriteHeader(http.StatusNotFound) + io.WriteString(w, "not found") + } else { + w.WriteHeader(http.StatusInternalServerError) + io.WriteString(w, err.Error()) + } + } else { w.WriteHeader(http.StatusAccepted) renderErr := splashTemplate.Execute(w, SplashModel{ Name: host, - WaitForCode: state.WaitForCode, - WaitForPath: state.WaitForPath, + WaitForCode: sOpts.WaitForCode, + WaitForPath: sOpts.WaitForPath, }) if renderErr != nil { logrus.Error(renderErr) } - } else { - logrus.Warnf("Unable to find container for host %s", host) + } +} + +func StatusHandler(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + io.WriteString(w, "Status page") + default: w.WriteHeader(http.StatusNotFound) - io.WriteString(w, "Not Found") + io.WriteString(w, "Status page not found") } } - -func getOrCreateState(cid string) (ret *containerState) { - var ok bool - if ret, ok = managedContainers[cid]; !ok { - ret = &containerState{} - managedContainers[cid] = ret - } - return -} - -func parseContainerSettings(target *containerState, ct *types.Container) { - { // Parse stop delay - stopDelay, _ := labelOrDefault(ct, "stopdelay", config.Model.StopDelay.String()) - if dur, stopErr := time.ParseDuration(stopDelay); stopErr != nil { - target.StopDelay = config.Model.StopDelay - logrus.Warnf("Unable to parse stopdelay of %s, defaulting to %s", stopDelay, target.StopDelay.String()) - } else { - target.StopDelay = dur - } - } - { // WaitForCode - codeStr, _ := labelOrDefault(ct, "waitforcode", "200") - if code, err := strconv.Atoi(codeStr); err != nil { - target.WaitForCode = 200 - logrus.Warnf("Unable to parse WaitForCode of %s, defaulting to %d", target.Name, target.WaitForCode) - } else { - target.WaitForCode = code - } - } - - target.WaitForPath, _ = labelOrDefault(ct, "waitforpath", "/") -} - -func findContainerByHostname(ctx context.Context, hostname string) (*types.Container, error) { - containers, err := findAllLazyloadContainers(ctx, true) - if err != nil { - return nil, err - } - - for _, c := range containers { - for k, v := range c.Labels { - if strings.Contains(k, "traefik.http.routers.") && strings.Contains(v, hostname) { // TODO: More complex, and self-ignore - return &c, nil - } - } - } - - return nil, errors.New("not found") -} - -// Finds all containers on node that are labeled with lazyloader config -func findAllLazyloadContainers(ctx context.Context, includeStopped bool) ([]types.Container, error) { - filters := filters.NewArgs() - filters.Add("label", config.Model.Labels.Prefix) - - return dockerClient.ContainerList(ctx, types.ContainerListOptions{ - All: includeStopped, - Filters: filters, - }) -} diff --git a/pkg/config/config.go b/pkg/config/config.go index 229dde3..e1f13ce 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -15,8 +15,12 @@ type ConfigModel struct { Listen string // http listen StopAtBoot bool // Stop existing containers at start of app Splash string // Which splash page to serve + StatusHost string // Host that will serve the status page (empty is disabled) StopDelay time.Duration // Amount of time to wait before stopping a container + PollFreq time.Duration // How often to check for changes + + Verbose bool // Debug-level logging Labels struct { Prefix string `mapstructure:"prefix"` diff --git a/pkg/service/errors.go b/pkg/service/errors.go new file mode 100644 index 0000000..dad5aa2 --- /dev/null +++ b/pkg/service/errors.go @@ -0,0 +1,7 @@ +package service + +import "errors" + +var ( + ErrNotFound = errors.New("not found") +) diff --git a/pkg/service/service.go b/pkg/service/service.go new file mode 100644 index 0000000..8ae831f --- /dev/null +++ b/pkg/service/service.go @@ -0,0 +1,295 @@ +package service + +import ( + "context" + "encoding/json" + "errors" + "strconv" + "strings" + "sync" + "time" + "traefik-lazyload/pkg/config" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/client" + "github.com/sirupsen/logrus" +) + +type containerSettings struct { + stopDelay time.Duration + waitForCode int + waitForPath string +} + +type containerState struct { + Name string + containerSettings + lastRecv, lastSend int64 // Last network traffic, used to see if idle + lastActivity time.Time +} + +type Core struct { + mux sync.Mutex + term chan bool + + client *client.Client + + active map[string]*containerState // cid -> state +} + +func New(client *client.Client, pollRate time.Duration) (*Core, error) { + // Test client and report + if info, err := client.Info(context.Background()); err != nil { + return nil, err + } else { + logrus.Infof("Connected docker to %s (v%s)", info.Name, info.ServerVersion) + } + + // Make core + ret := &Core{ + client: client, + active: make(map[string]*containerState), + term: make(chan bool), + } + + ret.Poll() // initial force-poll to update + go ret.pollThread(pollRate) + + return ret, nil +} + +func (s *Core) Close() error { + s.mux.Lock() + defer s.mux.Unlock() + + s.term <- true + return s.client.Close() +} + +type StartResult struct { + WaitForCode int + WaitForPath string +} + +func (s *Core) StartHost(hostname string) (*StartResult, error) { + s.mux.Lock() + defer s.mux.Unlock() + + ctx := context.Background() + + ct, err := s.findContainerByHostname(ctx, hostname) + if err != nil { + logrus.Warnf("Unable to find container for host %s: %s", hostname, err) + return nil, err + } + + if ets, exists := s.active[ct.ID]; exists { + // TODO: Handle case we think it's active, but not? (eg. crash? slow boot?) + logrus.Debugf("Asked to start host, but we already think it's started: %s", ets.Name) + return &StartResult{ + WaitForCode: ets.waitForCode, + WaitForPath: ets.waitForPath, + }, nil + } + + go s.startContainer(ctx, ct) + + // add to active pool + ets := newStateFromContainer(ct) + s.active[ct.ID] = ets + + return &StartResult{ + WaitForCode: ets.waitForCode, + WaitForPath: ets.waitForPath, + }, nil +} + +func (s *Core) startContainer(ctx context.Context, ct *types.Container) { + s.mux.Lock() + defer s.mux.Unlock() + + if err := s.client.ContainerStart(ctx, ct.ID, types.ContainerStartOptions{}); err != nil { + logrus.Warnf("Error starting container %s: %s", containerShort(ct), err) + } else { + logrus.Infof("Starting container %s", containerShort(ct)) + } +} + +func (s *Core) pollThread(rate time.Duration) { + ticker := time.NewTicker(rate) + defer ticker.Stop() + + for { + select { + case <-s.term: + return + case <-ticker.C: + s.Poll() + } + } +} + +// Initiate a thread-safe state-update, adding containers to the system, or +// stopping idle containers +// Will normally happen in the background with the pollThread +func (s *Core) Poll() { + s.mux.Lock() + defer s.mux.Unlock() + + ctx, _ := context.WithTimeout(context.Background(), 30*time.Second) + + s.checkForNewContainers(ctx) + s.watchForInactivity(ctx) +} + +func (s *Core) checkForNewContainers(ctx context.Context) { + containers, err := s.findAllLazyloadContainers(ctx, false) + if err != nil { + logrus.Warnf("Error checking for new containers: %v", err) + return + } + + runningContainers := make(map[string]*types.Container) + for i, ct := range containers { + if isRunning(&ct) { + runningContainers[ct.ID] = &containers[i] + } + } + + // check for containers we think are running, but aren't (destroyed, error'd, stop'd via another process, etc) + for cid, cts := range s.active { + if _, ok := runningContainers[cid]; !ok { + logrus.Infof("Discover container had stopped, removing %s", cts.Name) + delete(s.active, cid) + } + } + + // now, look for containers that are running, but aren't in our active inventory + for _, ct := range runningContainers { + if _, ok := s.active[ct.ID]; !ok { + logrus.Infof("Discovered running container %s", containerShort(ct)) + s.active[ct.ID] = newStateFromContainer(ct) + } + } +} + +func (s *Core) watchForInactivity(ctx context.Context) { + for cid, cts := range s.active { + shouldStop, err := s.checkContainerForInactivity(ctx, cid, cts) + if err != nil { + logrus.Warnf("error checking container state for %s: %s", cts.Name, err) + } + if shouldStop { + if err := s.client.ContainerStop(ctx, cid, container.StopOptions{}); err != nil { + logrus.Errorf("Error stopping container %s: %s", cts.Name, err) + } else { + logrus.Infof("Stopped container %s", cts.Name) + delete(s.active, cid) + } + } + } +} + +func (s *Core) checkContainerForInactivity(ctx context.Context, cid string, ct *containerState) (shouldStop bool, retErr error) { + statsStream, err := s.client.ContainerStatsOneShot(ctx, cid) + if err != nil { + return false, err + } + + var stats types.StatsJSON + if err := json.NewDecoder(statsStream.Body).Decode(&stats); err != nil { + return false, err + } + + if stats.PidsStats.Current == 0 { + // Probably stopped. Will let next poll update container + return true, errors.New("container not running") + } + + // check for network activity + rx, tx := sumNetworkBytes(stats.Networks) + if rx > ct.lastRecv || tx > ct.lastSend { + ct.lastRecv = rx + ct.lastSend = tx + ct.lastActivity = time.Now() + return false, nil + } + + // No activity, stop? + if time.Now().After(ct.lastActivity.Add(ct.stopDelay)) { + logrus.Infof("Found idle container %s...", ct.Name) + return true, nil + } + + return false, nil +} + +func newStateFromContainer(ct *types.Container) *containerState { + return &containerState{ + Name: containerShort(ct), + containerSettings: extractContainerLabels(ct), + lastActivity: time.Now(), + } +} + +func extractContainerLabels(ct *types.Container) (target containerSettings) { + { // Parse stop delay + stopDelay, _ := labelOrDefault(ct, "stopdelay", config.Model.StopDelay.String()) + if dur, stopErr := time.ParseDuration(stopDelay); stopErr != nil { + target.stopDelay = config.Model.StopDelay + logrus.Warnf("Unable to parse stopdelay for %s of %s, defaulting to %s", containerShort(ct), stopDelay, target.stopDelay.String()) + } else { + target.stopDelay = dur + } + } + { // WaitForCode + codeStr, _ := labelOrDefault(ct, "waitforcode", "200") + if code, err := strconv.Atoi(codeStr); err != nil { + target.waitForCode = 200 + logrus.Warnf("Unable to parse WaitForCode of %s, defaulting to %d", containerShort(ct), target.waitForCode) + } else { + target.waitForCode = code + } + } + + target.waitForPath, _ = labelOrDefault(ct, "waitforpath", "/") + return +} + +func (s *Core) findContainerByHostname(ctx context.Context, hostname string) (*types.Container, error) { + containers, err := s.findAllLazyloadContainers(ctx, true) + if err != nil { + return nil, err + } + + for _, c := range containers { + if hostStr, ok := labelOrDefault(&c, "hosts", ""); ok { + hosts := strings.Split(hostStr, ",") + if strSliceContains(hosts, hostname) { + return &c, nil + } + } else { + // If not defined explicitely, infer from traefik route + for k, v := range c.Labels { + if strings.Contains(k, "traefik.http.routers.") && strings.Contains(v, hostname) { // TODO: More complex + return &c, nil + } + } + } + } + + return nil, ErrNotFound +} + +// Finds all containers on node that are labeled with lazyloader config +func (s *Core) findAllLazyloadContainers(ctx context.Context, includeStopped bool) ([]types.Container, error) { + filters := filters.NewArgs() + filters.Add("label", config.Model.Labels.Prefix) + + return s.client.ContainerList(ctx, types.ContainerListOptions{ + All: includeStopped, + Filters: filters, + }) +} diff --git a/util.go b/pkg/service/util.go similarity index 76% rename from util.go rename to pkg/service/util.go index 4fa5eb5..654362a 100644 --- a/util.go +++ b/pkg/service/util.go @@ -1,4 +1,4 @@ -package main +package service import ( "fmt" @@ -39,3 +39,16 @@ func containerShort(c *types.Container) string { } return fmt.Sprintf("%s(%s)", name, short(c.ID)) } + +func isRunning(c *types.Container) bool { + return c.State == "running" +} + +func strSliceContains(slice []string, s string) bool { + for _, item := range slice { + if item == s { + return true + } + } + return false +}