feat: add docker classic support (#9)

Defaults with docker swarm support for retro compatibiliy.
You can add --swarmMode=false to deactivate it.

Closes #4
This commit is contained in:
Alexis Couvreur
2021-09-26 00:36:45 +02:00
committed by GitHub
parent f78af94572
commit c96482ef67
9 changed files with 1103 additions and 250 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
vendor
traefik-ondemand-plugin

View File

@@ -1,6 +1,5 @@
FROM golang:1.15.3-alpine AS build
FROM golang:1.17-alpine AS build
ENV APP_NAME ondemand-service
ENV PORT 10000
COPY . /go/src/ondemand-service

30
go.mod
View File

@@ -1,14 +1,32 @@
module githuc.com/acouvreur/traefik-ondemand-plugin
module github.com/acouvreur/traefik-ondemand-plugin
go 1.15
go 1.17
require (
github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/containerd/containerd v1.5.5 // indirect
github.com/docker/docker v20.10.8+incompatible
github.com/docker/go-connections v0.4.0 // indirect
github.com/gorilla/mux v1.7.3 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/net v0.0.0-20210924151903-3ad01bbaa167 // indirect
golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
gopkg.in/dc0d/tinykv.v4 v4.0.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
require (
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v1.13.1
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/spf13/cobra v1.1.1 // indirect
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
)

767
go.sum

File diff suppressed because it is too large Load Diff

285
main.go
View File

@@ -1,71 +1,112 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"net/http"
"net/url"
"strconv"
"time"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types"
"github.com/acouvreur/traefik-ondemand-plugin/pkg/scaler"
"github.com/docker/docker/client"
"github.com/docker/docker/opts"
"gopkg.in/dc0d/tinykv.v4"
)
const oneReplica = uint64(1)
const zeroReplica = uint64(0)
var defaultTimeout = time.Second * 5
// Status is the service status
type Status string
const (
// UP represents a service that is running (with at least a container running)
UP Status = "up"
// DOWN represents a service that is not running (with 0 container running)
DOWN Status = "down"
// STARTING represents a service that is starting (with at least a container starting)
STARTING Status = "starting"
// UNKNOWN represents a service for which the docker status is not know
UNKNOWN Status = "unknown"
)
// Service holds all information related to a service
type Service struct {
name string
timeout uint64
time chan uint64
isHandled bool
type OnDemandRequestState struct {
State string `json:"state"`
Name string `json:"name"`
}
var services = map[string]*Service{}
func main() {
fmt.Println("Server listening on port 10000.")
http.HandleFunc("/", handleRequests())
log.Fatal(http.ListenAndServe(":10000", nil))
}
func handleRequests() func(w http.ResponseWriter, r *http.Request) {
cli, err := client.NewEnvClient()
swarmMode := flag.Bool("swarmMode", true, "Enable swarm mode")
flag.Parse()
cli, err := client.NewClientWithOpts()
if err != nil {
log.Fatal(fmt.Errorf("%+v", "Could not connect to docker API"))
}
return func(w http.ResponseWriter, r *http.Request) {
serviceName, serviceTimeout, err := parseParams(r)
dockerScaler := getDockerScaler(*swarmMode)
fmt.Printf("Server listening on port 10000, swarmMode: %t\n", *swarmMode)
http.HandleFunc("/", onDemand(cli, dockerScaler))
log.Fatal(http.ListenAndServe(":10000", nil))
}
func getDockerScaler(swarmMode bool) scaler.Scaler {
if swarmMode {
return scaler.DockerSwarmScaler{}
}
return scaler.DockerClassicScaler{}
}
func onDemand(client *client.Client, scaler scaler.Scaler) func(w http.ResponseWriter, r *http.Request) {
store := tinykv.New(time.Second*20, func(key string, _ interface{}) {
// Auto scale down after timeout
scaler.ScaleDown(client, key)
})
return func(rw http.ResponseWriter, r *http.Request) {
name, err := getParam(r.URL.Query(), "name")
if err != nil {
fmt.Fprintf(w, "%+v", err)
ServeHTTPInternalError(rw, err)
return
}
service := GetOrCreateService(serviceName, serviceTimeout)
status, err := service.HandleServiceState(cli)
to, err := getParam(r.URL.Query(), "timeout")
if err != nil {
fmt.Printf("Error: %+v\n ", err)
fmt.Fprintf(w, "%+v", err)
ServeHTTPInternalError(rw, err)
return
}
timeout, err := time.ParseDuration(to)
if err != nil {
ServeHTTPInternalError(rw, err)
return
}
replicas := uint64(1)
requestState, exists := store.Get(name)
// 1. Check against the current state
if !exists || requestState.(OnDemandRequestState).State != "started" {
if scaler.IsUp(client, name) {
requestState = OnDemandRequestState{
State: "started",
Name: name,
}
} else {
requestState = OnDemandRequestState{
State: "starting",
Name: name,
}
scaler.ScaleUp(client, name, &replicas)
}
}
// 2. Store the updated state
store.Put(name, requestState, tinykv.ExpiresAfter(timeout))
// 3. Serve depending on the current state
switch requestState.(OnDemandRequestState).State {
case "starting":
ServeHTTPRequestState(rw, requestState.(OnDemandRequestState))
case "started":
ServeHTTPRequestState(rw, requestState.(OnDemandRequestState))
default:
ServeHTTPInternalError(rw, fmt.Errorf("unknown state %s", requestState.(OnDemandRequestState).State))
}
fmt.Fprintf(w, "%+s", status)
}
}
@@ -76,155 +117,17 @@ func getParam(queryParams url.Values, paramName string) (string, error) {
return queryParams[paramName][0], nil
}
func parseParams(r *http.Request) (string, uint64, error) {
queryParams := r.URL.Query()
serviceName, err := getParam(queryParams, "name")
if err != nil {
return "", 0, nil
}
timeoutString, err := getParam(queryParams, "timeout")
if err != nil {
return "", 0, nil
}
serviceTimeout, err := strconv.Atoi(timeoutString)
if err != nil {
return "", 0, fmt.Errorf("timeout should be an integer")
}
return serviceName, uint64(serviceTimeout), nil
func ServeHTTPInternalError(rw http.ResponseWriter, err error) {
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte(err.Error()))
}
// GetOrCreateService return an existing service or create one
func GetOrCreateService(name string, timeout uint64) *Service {
if services[name] != nil {
return services[name]
}
service := &Service{name, timeout, make(chan uint64), false}
services[name] = service
return service
}
// HandleServiceState up the service if down or set timeout for downing the service
func (service *Service) HandleServiceState(cli *client.Client) (string, error) {
status, err := service.getStatus(cli)
if err != nil {
return "", err
}
if status == UP {
fmt.Printf("- Service %v is up\n", service.name)
if !service.isHandled {
go service.stopAfterTimeout(cli)
}
select {
case service.time <- service.timeout:
default:
}
return "started", nil
} else if status == STARTING {
fmt.Printf("- Service %v is starting\n", service.name)
if err != nil {
return "", err
}
go service.stopAfterTimeout(cli)
return "starting", nil
} else if status == DOWN {
fmt.Printf("- Service %v is down\n", service.name)
service.start(cli)
return "starting", nil
func ServeHTTPRequestState(rw http.ResponseWriter, requestState OnDemandRequestState) {
rw.Header().Set("Content-Type", "application/json")
if requestState.State == "started" {
rw.WriteHeader(http.StatusCreated)
} else {
fmt.Printf("- Service %v status is unknown\n", service.name)
if err != nil {
return "", err
}
return service.HandleServiceState(cli)
rw.WriteHeader(http.StatusAccepted)
}
}
func (service *Service) getStatus(client *client.Client) (Status, error) {
ctx := context.Background()
dockerService, err := service.getDockerService(ctx, client)
if err != nil {
return "", err
}
if *dockerService.Spec.Mode.Replicated.Replicas == zeroReplica {
return DOWN, nil
}
return UP, nil
}
func (service *Service) start(client *client.Client) {
fmt.Printf("Starting service %s\n", service.name)
service.isHandled = true
service.setServiceReplicas(client, 1)
go service.stopAfterTimeout(client)
service.time <- service.timeout
}
func (service *Service) stopAfterTimeout(client *client.Client) {
service.isHandled = true
for {
select {
case timeout, ok := <-service.time:
if ok {
time.Sleep(time.Duration(timeout) * time.Second)
} else {
fmt.Println("That should not happen, but we never know ;)")
}
default:
fmt.Printf("Stopping service %s\n", service.name)
service.setServiceReplicas(client, 0)
return
}
}
}
func (service *Service) setServiceReplicas(client *client.Client, replicas uint64) error {
ctx := context.Background()
dockerService, err := service.getDockerService(ctx, client)
if err != nil {
return err
}
dockerService.Spec.Mode.Replicated = &swarm.ReplicatedService{
Replicas: getPointer(replicas),
}
client.ServiceUpdate(ctx, dockerService.ID, dockerService.Meta.Version, dockerService.Spec, types.ServiceUpdateOptions{})
return nil
}
func (service *Service) getDockerService(ctx context.Context, client *client.Client) (*swarm.Service, error) {
filterOPt := opts.NewFilterOpt()
listOpts := types.ServiceListOptions{
Filters: filterOPt.Value(),
}
services, err := client.ServiceList(ctx, listOpts)
if err != nil {
return nil, err
}
dockerService, err := findService(services, service.name)
if err != nil {
return nil, err
}
return dockerService, nil
}
func findService(services []swarm.Service, name string) (*swarm.Service, error) {
for _, service := range services {
if name == service.Spec.Name {
return &service, nil
}
}
return &swarm.Service{}, fmt.Errorf("Could not find service %s", name)
}
func getPointer(x uint64) *uint64 {
return &x
rw.Write([]byte(requestState.State))
}

73
main_test.go Normal file
View File

@@ -0,0 +1,73 @@
package main
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/docker/docker/client"
)
type ScalerMock struct {
isUp bool
}
func (s ScalerMock) IsUp(client *client.Client, name string) bool {
return s.isUp
}
func (ScalerMock) ScaleUp(client *client.Client, name string, replicas *uint64) {}
func (ScalerMock) ScaleDown(client *client.Client, name string) {}
func TestOndemand_ServeHTTP(t *testing.T) {
testCases := []struct {
desc string
scaler ScalerMock
status string
statusCode int
contentType string
}{
{
desc: "service is starting",
status: "starting",
scaler: ScalerMock{isUp: false},
statusCode: http.StatusAccepted,
contentType: "application/json",
},
{
desc: "service is started",
status: "started",
scaler: ScalerMock{isUp: true},
statusCode: http.StatusCreated,
contentType: "application/json",
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Logf("IsUp: %t", test.scaler.isUp)
request := httptest.NewRequest("GET", "/?name=whoami&timeout=5m", nil)
responseRecorder := httptest.NewRecorder()
onDemandHandler := onDemand(nil, test.scaler)
onDemandHandler(responseRecorder, request)
body := responseRecorder.Body.String()
if responseRecorder.Code != test.statusCode {
t.Errorf("Want status '%d', got '%d'", test.statusCode, responseRecorder.Code)
}
if responseRecorder.Body.String() != test.status {
t.Errorf("Want body '%s', got '%s'", test.status, body)
}
if responseRecorder.Header().Get("Content-Type") != test.contentType {
t.Errorf("Want content type '%s', got '%s'", test.contentType, responseRecorder.Header().Get("Content-Type"))
}
})
}
}

View File

@@ -0,0 +1,92 @@
package scaler
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
log "github.com/sirupsen/logrus"
)
type DockerClassicScaler struct{}
func (DockerClassicScaler) ScaleUp(client *client.Client, name string, replicas *uint64) {
log.Infof("Scaling up %s to %d", name, *replicas)
ctx := context.Background()
container, err := GetContainerByName(client, name, ctx)
if err != nil {
println(err)
return
}
err = client.ContainerStart(ctx, container.ID, types.ContainerStartOptions{})
if err != nil {
println(err)
return
}
}
func (DockerClassicScaler) ScaleDown(client *client.Client, name string) {
log.Infof("Scaling down %s to 0", name)
ctx := context.Background()
container, err := GetContainerByName(client, name, ctx)
if err != nil {
println(err)
return
}
err = client.ContainerStop(ctx, container.ID, nil)
if err != nil {
println(err)
return
}
}
func (DockerClassicScaler) IsUp(client *client.Client, name string) bool {
ctx := context.Background()
container, err := GetContainerByName(client, name, ctx)
if err != nil {
println(err)
return false
}
spec, err := client.ContainerInspect(ctx, container.ID)
if err != nil {
println(err)
return false
}
return spec.State.Running
}
func GetContainerByName(client *client.Client, name string, ctx context.Context) (*types.Container, error) {
opts := types.ContainerListOptions{
All: true,
Filters: filters.NewArgs(),
}
opts.Filters.Add("name", name)
containers, err := client.ContainerList(ctx, opts)
if err != nil {
return nil, err
}
if len(containers) == 0 {
return nil, fmt.Errorf(fmt.Sprintf("container with name %s was not found", name))
}
if len(containers) > 1 {
return nil, fmt.Errorf("multiple containers (%d) with name %s were found: %v", len(containers), name, containers)
}
return &containers[0], nil
}

View File

@@ -0,0 +1,92 @@
package scaler
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
log "github.com/sirupsen/logrus"
)
type DockerSwarmScaler struct{}
func (DockerSwarmScaler) ScaleUp(client *client.Client, name string, replicas *uint64) {
log.Infof("Scaling up %s to %d", name, *replicas)
ctx := context.Background()
service, err := GetServiceByName(client, name, ctx)
if err != nil {
println(err)
return
}
service.Spec.Mode.Replicated = &swarm.ReplicatedService{
Replicas: replicas,
}
response, err := client.ServiceUpdate(ctx, service.ID, service.Meta.Version, service.Spec, types.ServiceUpdateOptions{})
if err != nil {
println(err)
return
}
if len(response.Warnings) > 0 {
fmt.Printf("Warnings received scaling up service %s: %v", name, response.Warnings)
}
}
func (DockerSwarmScaler) ScaleDown(client *client.Client, name string) {
log.Infof("Scaling down %s to 0", name)
ctx := context.Background()
container, err := GetContainerByName(client, name, ctx)
if err != nil {
println(err)
return
}
err = client.ContainerStop(ctx, container.ID, nil)
if err != nil {
println(err)
return
}
}
func (DockerSwarmScaler) IsUp(client *client.Client, name string) bool {
ctx := context.Background()
service, err := GetServiceByName(client, name, ctx)
if err != nil {
println(err)
return false
}
return *service.Spec.Mode.Replicated.Replicas > 0
}
func GetServiceByName(client *client.Client, name string, ctx context.Context) (*swarm.Service, error) {
opts := types.ServiceListOptions{
Filters: filters.NewArgs(),
}
opts.Filters.Add("name", name)
services, err := client.ServiceList(ctx, opts)
if err != nil {
return nil, err
}
if len(services) == 0 {
return nil, fmt.Errorf(fmt.Sprintf("service with name %s was not found", name))
}
if len(services) > 1 {
return nil, fmt.Errorf("multiple services (%d) with name %s were found: %v", len(services), name, services)
}
return &services[0], nil
}

9
pkg/scaler/scaler.go Normal file
View File

@@ -0,0 +1,9 @@
package scaler
import "github.com/docker/docker/client"
type Scaler interface {
ScaleUp(client *client.Client, name string, replicas *uint64)
ScaleDown(client *client.Client, name string)
IsUp(client *client.Client, name string) bool
}