mirror of
https://github.com/sablierapp/sablier.git
synced 2025-12-25 14:59:16 +01:00
feat: add persistent storage (#30)
* refactor: remove unused default timeout * feat: add persistent storage Allows you to save the state to a file and load it upon restarting the app to restore the previous state. * chore: upgrade to go 1.18 * use tinykv with generics * build: add "-buildvcs=false" flag Git is not available in golang:1.18-alpine image
This commit is contained in:
4
.github/workflows/build.yml
vendored
4
.github/workflows/build.yml
vendored
@@ -11,10 +11,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
|
||||
- name: Set up Go 1.17
|
||||
- name: Set up Go 1.18
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ^1.17
|
||||
go-version: ^1.18
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
4
.github/workflows/release.yml
vendored
4
.github/workflows/release.yml
vendored
@@ -16,10 +16,10 @@ jobs:
|
||||
fetch-depth: 0
|
||||
persist-credentials: false
|
||||
|
||||
- name: Set up Go 1.17
|
||||
- name: Set up Go 1.18
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ^1.17
|
||||
go-version: ^1.18
|
||||
|
||||
- name: Setup Node.js
|
||||
uses: actions/setup-node@v1
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM golang:1.17-alpine AS build
|
||||
FROM golang:1.18-alpine AS build
|
||||
|
||||
ENV CGO_ENABLED=0
|
||||
ENV PORT 10000
|
||||
@@ -8,7 +8,7 @@ WORKDIR /go/src/ondemand-service
|
||||
|
||||
ARG TARGETOS
|
||||
ARG TARGETARCH
|
||||
RUN GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -o /go/bin/ondemand-service
|
||||
RUN GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -buildvcs=false -o /go/bin/ondemand-service
|
||||
|
||||
FROM alpine
|
||||
EXPOSE 10000
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
- Dynamic loading page (cloudflare or grafana cloud style)
|
||||
- Automatic scale to zero after configured timeout upon last request the service received
|
||||
- Support container/service healthcheck and will not redirect until service is healthy
|
||||
|
||||
## Usage
|
||||
|
||||
### CLI
|
||||
@@ -37,17 +38,17 @@
|
||||
`./traefik-ondemand-service --swarmMode=true --kubernetesMode=false`
|
||||
|
||||
| Argument | Value | Description |
|
||||
| ----------- | ----------------- | ----------------------------------------------------------------------- |
|
||||
| ---------------- | ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
|
||||
| `swarmMode` | true,false (default true) | Enable/Disable swarm mode. Used to determine the scaler implementation. |
|
||||
| `kubernetesMode` | true,false (default false) | Enable/Disable Kubernetes mode. Used to determine the scaler implementation. |
|
||||
| `storagePath` | path/to/storage/file (default nil) | Enables persistent storage, file will be used to load previous state upon starting and will sync the current content to memory into the file every 5 seconds |
|
||||
|
||||
### Docker
|
||||
|
||||
- Docker Hub `acouvreur/traefik-ondemand-service`
|
||||
- Ghcr `ghcr.io/acouvreur/traefik-ondemand-service`
|
||||
|
||||
`docker run -v /var/run/docker.sock:/var/run/docker.sock -p 10000:10000
|
||||
ghcr.io/acouvreur/traefik-ondemand-service:latest --swarmode=true`
|
||||
`docker run -v /var/run/docker.sock:/var/run/docker.sock -p 10000:10000 ghcr.io/acouvreur/traefik-ondemand-service:latest --swarmode=true`
|
||||
|
||||
### Kubernetes
|
||||
|
||||
|
||||
74
go.mod
74
go.mod
@@ -1,57 +1,65 @@
|
||||
module github.com/acouvreur/traefik-ondemand-service
|
||||
|
||||
go 1.17
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/Microsoft/go-winio v0.5.0 // indirect
|
||||
github.com/containerd/containerd v1.5.5 // indirect
|
||||
github.com/docker/docker v20.10.8+incompatible
|
||||
github.com/Microsoft/go-winio v0.5.2 // indirect
|
||||
github.com/docker/docker v20.10.15+incompatible
|
||||
github.com/docker/go-connections v0.4.0 // indirect
|
||||
github.com/gorilla/mux v1.8.0 // indirect
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/stretchr/testify v1.7.0
|
||||
golang.org/x/net v0.0.0-20210924151903-3ad01bbaa167 // indirect
|
||||
golang.org/x/sys v0.0.0-20210927052749-1cf2251ac284 // indirect
|
||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
|
||||
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0 // indirect
|
||||
gopkg.in/dc0d/tinykv.v4 v4.0.1
|
||||
github.com/stretchr/testify v1.7.1
|
||||
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
|
||||
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
|
||||
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||
k8s.io/apimachinery v0.22.3
|
||||
k8s.io/client-go v0.22.3
|
||||
k8s.io/apimachinery v0.24.0
|
||||
k8s.io/client-go v0.24.0
|
||||
)
|
||||
|
||||
require k8s.io/api v0.22.3
|
||||
require (
|
||||
github.com/acouvreur/tinykv v0.0.0-20220508151744-f4c55f3d44f6
|
||||
k8s.io/api v0.24.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/docker/distribution v2.7.1+incompatible // indirect
|
||||
github.com/docker/distribution v2.8.1+incompatible // indirect
|
||||
github.com/docker/go-units v0.4.0 // indirect
|
||||
github.com/go-logr/logr v0.4.0 // indirect
|
||||
github.com/emicklei/go-restful v2.15.0+incompatible // indirect
|
||||
github.com/go-logr/logr v1.2.3 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.5 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.0 // indirect
|
||||
github.com/go-openapi/swag v0.21.1 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/go-cmp v0.5.5 // indirect
|
||||
github.com/google/gofuzz v1.1.0 // indirect
|
||||
github.com/googleapis/gnostic v0.5.5 // indirect
|
||||
github.com/json-iterator/go v1.1.11 // indirect
|
||||
github.com/google/gnostic v0.6.9 // indirect
|
||||
github.com/google/go-cmp v0.5.8 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/opencontainers/image-spec v1.0.1 // indirect
|
||||
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/stretchr/objx v0.3.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
|
||||
golang.org/x/text v0.3.6 // indirect
|
||||
google.golang.org/appengine v1.6.5 // indirect
|
||||
google.golang.org/grpc v1.41.0 // indirect
|
||||
google.golang.org/protobuf v1.27.1 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
|
||||
golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/protobuf v1.28.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
k8s.io/klog/v2 v2.9.0 // indirect
|
||||
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect
|
||||
sigs.k8s.io/yaml v1.2.0 // indirect
|
||||
gotest.tools/v3 v3.0.3 // indirect
|
||||
k8s.io/klog/v2 v2.60.1 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20220413171646-5e7f5fdc6da6 // indirect
|
||||
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
|
||||
sigs.k8s.io/yaml v1.3.0 // indirect
|
||||
)
|
||||
|
||||
53
main.go
53
main.go
@@ -1,22 +1,23 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/acouvreur/tinykv"
|
||||
"github.com/acouvreur/traefik-ondemand-service/pkg/scaler"
|
||||
"github.com/acouvreur/traefik-ondemand-service/pkg/storage"
|
||||
"github.com/docker/docker/client"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"gopkg.in/dc0d/tinykv.v4"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
var defaultTimeout = time.Second * 5
|
||||
|
||||
type OnDemandRequestState struct {
|
||||
State string `json:"state"`
|
||||
Name string `json:"name"`
|
||||
@@ -26,13 +27,34 @@ func main() {
|
||||
|
||||
swarmMode := flag.Bool("swarmMode", true, "Enable swarm mode")
|
||||
kubernetesMode := flag.Bool("kubernetesMode", false, "Enable Kubernetes mode")
|
||||
storagePath := flag.String("storagePath", "", "Enable persistent storage")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
dockerScaler := getDockerScaler(*swarmMode, *kubernetesMode)
|
||||
|
||||
store := tinykv.New[OnDemandRequestState](time.Second*20, func(key string, _ OnDemandRequestState) {
|
||||
// Auto scale down after timeout
|
||||
err := dockerScaler.ScaleDown(key)
|
||||
|
||||
if err != nil {
|
||||
log.Warnf("error scaling down %s: %s", key, err.Error())
|
||||
}
|
||||
})
|
||||
|
||||
if storagePath != nil && len(*storagePath) > 0 {
|
||||
file, err := os.OpenFile(*storagePath, os.O_RDWR|os.O_CREATE, 0755)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
json.NewDecoder(file).Decode(store)
|
||||
storage.New(file, time.Second*5, store)
|
||||
}
|
||||
|
||||
fmt.Printf("Server listening on port 10000, swarmMode: %t, kubernetesMode: %t\n", *swarmMode, *kubernetesMode)
|
||||
http.HandleFunc("/", onDemand(dockerScaler))
|
||||
http.HandleFunc("/", onDemand(dockerScaler, store))
|
||||
log.Fatal(http.ListenAndServe(":10000", nil))
|
||||
}
|
||||
|
||||
@@ -71,18 +93,7 @@ func getDockerScaler(swarmMode, kubernetesMode bool) scaler.Scaler {
|
||||
panic("invalid mode")
|
||||
}
|
||||
|
||||
func onDemand(scaler scaler.Scaler) func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
store := tinykv.New(time.Second*20, func(key string, _ interface{}) {
|
||||
// Auto scale down after timeout
|
||||
err := scaler.ScaleDown(key)
|
||||
|
||||
if err != nil {
|
||||
log.Warnf("error scaling down %s: %s", key, err.Error())
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
func onDemand(scaler scaler.Scaler, store tinykv.KV[OnDemandRequestState]) func(w http.ResponseWriter, r *http.Request) {
|
||||
return func(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
name, err := getParam(r.URL.Query(), "name")
|
||||
@@ -109,7 +120,7 @@ func onDemand(scaler scaler.Scaler) func(w http.ResponseWriter, r *http.Request)
|
||||
requestState, exists := store.Get(name)
|
||||
|
||||
// 1. Check against the current state
|
||||
if !exists || requestState.(OnDemandRequestState).State != "started" {
|
||||
if !exists || requestState.State != "started" {
|
||||
if scaler.IsUp(name) {
|
||||
requestState = OnDemandRequestState{
|
||||
State: "started",
|
||||
@@ -133,13 +144,13 @@ func onDemand(scaler scaler.Scaler) func(w http.ResponseWriter, r *http.Request)
|
||||
store.Put(name, requestState, tinykv.ExpiresAfter(timeout))
|
||||
|
||||
// 3. Serve depending on the current state
|
||||
switch requestState.(OnDemandRequestState).State {
|
||||
switch requestState.State {
|
||||
case "starting":
|
||||
ServeHTTPRequestState(rw, requestState.(OnDemandRequestState))
|
||||
ServeHTTPRequestState(rw, requestState)
|
||||
case "started":
|
||||
ServeHTTPRequestState(rw, requestState.(OnDemandRequestState))
|
||||
ServeHTTPRequestState(rw, requestState)
|
||||
default:
|
||||
ServeHTTPInternalError(rw, fmt.Errorf("unknown state %s", requestState.(OnDemandRequestState).State))
|
||||
ServeHTTPInternalError(rw, fmt.Errorf("unknown state %s", requestState.State))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,9 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/acouvreur/tinykv"
|
||||
)
|
||||
|
||||
type ScalerMock struct {
|
||||
@@ -50,7 +53,9 @@ func TestOndemand_ServeHTTP(t *testing.T) {
|
||||
request := httptest.NewRequest("GET", "/?name=whoami&timeout=5m", nil)
|
||||
responseRecorder := httptest.NewRecorder()
|
||||
|
||||
onDemandHandler := onDemand(test.scaler)
|
||||
store := tinykv.New[OnDemandRequestState](time.Second * 20)
|
||||
|
||||
onDemandHandler := onDemand(test.scaler, store)
|
||||
onDemandHandler(responseRecorder, request)
|
||||
|
||||
body := responseRecorder.Body.String()
|
||||
|
||||
62
pkg/storage/periodic_sync.go
Normal file
62
pkg/storage/periodic_sync.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PS interface {
|
||||
Stop()
|
||||
}
|
||||
|
||||
type PeriodicSync struct {
|
||||
File *os.File
|
||||
Interval time.Duration
|
||||
Content interface{}
|
||||
|
||||
stop chan struct{}
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
||||
func New(file *os.File, interval time.Duration, content interface{}) PS {
|
||||
sync := &PeriodicSync{
|
||||
File: file,
|
||||
Interval: interval,
|
||||
Content: content,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
go sync.syncLoop()
|
||||
|
||||
return sync
|
||||
}
|
||||
|
||||
func (sync *PeriodicSync) Stop() {
|
||||
sync.stopOnce.Do(func() { close(sync.stop) })
|
||||
}
|
||||
|
||||
func (sync *PeriodicSync) save() error {
|
||||
sync.File.Truncate(0)
|
||||
sync.File.Seek(0, 0)
|
||||
return json.NewEncoder(sync.File).Encode(sync.Content)
|
||||
}
|
||||
|
||||
func (sync *PeriodicSync) syncLoop() {
|
||||
interval := sync.Interval
|
||||
expireTime := time.NewTimer(interval)
|
||||
for {
|
||||
select {
|
||||
case <-sync.stop:
|
||||
return
|
||||
case <-expireTime.C:
|
||||
err := sync.save()
|
||||
if err != nil {
|
||||
log.Println("Error saving", err)
|
||||
}
|
||||
expireTime.Reset(interval)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user