From a8981dc751d60df6a193e64a86aff943024249df Mon Sep 17 00:00:00 2001 From: Alexis Couvreur Date: Sun, 16 Nov 2025 18:42:17 -0500 Subject: [PATCH] feat: add nomad provider --- README.md | 17 + docs/providers/nomad.md | 427 ++++++++++++++++++++++ docs/providers/overview.md | 1 + go.mod | 7 + go.sum | 14 + pkg/config/provider.go | 25 +- pkg/provider/nomad/events.go | 161 ++++++++ pkg/provider/nomad/events_test.go | 57 +++ pkg/provider/nomad/job_groups_test.go | 79 ++++ pkg/provider/nomad/job_inspect.go | 93 +++++ pkg/provider/nomad/job_inspect_test.go | 102 ++++++ pkg/provider/nomad/job_list.go | 123 +++++++ pkg/provider/nomad/job_list_test.go | 82 +++++ pkg/provider/nomad/job_start.go | 78 ++++ pkg/provider/nomad/job_start_test.go | 104 ++++++ pkg/provider/nomad/job_stop.go | 76 ++++ pkg/provider/nomad/job_stop_test.go | 104 ++++++ pkg/provider/nomad/nomad.go | 69 ++++ pkg/provider/nomad/testcontainers_test.go | 194 ++++++++++ pkg/provider/nomad/utils.go | 31 ++ pkg/sabliercmd/provider.go | 35 ++ 21 files changed, 1878 insertions(+), 1 deletion(-) create mode 100644 docs/providers/nomad.md create mode 100644 pkg/provider/nomad/events.go create mode 100644 pkg/provider/nomad/events_test.go create mode 100644 pkg/provider/nomad/job_groups_test.go create mode 100644 pkg/provider/nomad/job_inspect.go create mode 100644 pkg/provider/nomad/job_inspect_test.go create mode 100644 pkg/provider/nomad/job_list.go create mode 100644 pkg/provider/nomad/job_list_test.go create mode 100644 pkg/provider/nomad/job_start.go create mode 100644 pkg/provider/nomad/job_start_test.go create mode 100644 pkg/provider/nomad/job_stop.go create mode 100644 pkg/provider/nomad/job_stop_test.go create mode 100644 pkg/provider/nomad/nomad.go create mode 100644 pkg/provider/nomad/testcontainers_test.go create mode 100644 pkg/provider/nomad/utils.go diff --git a/README.md b/README.md index 72c5e76..1f10596 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Whether you don't want to overload your Raspberry Pi, or your QA environment is - [Docker Swarm](#docker-swarm) - [Podman](#podman) - [Kubernetes](#kubernetes) + - [Nomad](#nomad) - [Usage with Reverse Proxies](#usage-with-reverse-proxies) - [Apache APISIX](#apache-apisix) - [Caddy](#caddy) @@ -283,6 +284,22 @@ Sablier provides native Kubernetes support for managing deployments, scaling wor 📚 **[Full Documentation](https://sablierapp.dev/#/providers/kubernetes)** +--- + +### Nomad + +Nomad + +Sablier integrates with HashiCorp Nomad for dynamic scaling of task groups across your cluster. + +**Features:** +- Scale Nomad task groups on demand +- Support for ACL tokens and namespaces +- Event-driven allocation monitoring +- Multi-region support + +📚 **[Full Documentation](https://sablierapp.dev/#/providers/nomad)** + ## Usage with Reverse Proxies ### Apache APISIX diff --git a/docs/providers/nomad.md b/docs/providers/nomad.md new file mode 100644 index 0000000..96e66ca --- /dev/null +++ b/docs/providers/nomad.md @@ -0,0 +1,427 @@ +# Nomad + +The Nomad provider allows Sablier to manage [HashiCorp Nomad](https://www.nomadproject.io/) job task groups, scaling them from 0 to N allocations on demand. 
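Under the hood, the provider scales a task group by updating its `count` and re-registering the job through the Nomad API (see `pkg/provider/nomad/job_start.go` in this patch). A minimal sketch of that mechanism, assuming a reachable Nomad agent and an existing job `whoami` with a task group `web` (both names are placeholders):

```go
// Minimal sketch of the scale-up mechanism: fetch the job, bump the task
// group count, and re-register it. Job and group names are assumptions.
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig()) // honours NOMAD_ADDR, NOMAD_TOKEN, ...
	if err != nil {
		log.Fatal(err)
	}

	jobs := client.Jobs()
	job, _, err := jobs.Info("whoami", nil)
	if err != nil {
		log.Fatal(err)
	}

	for _, tg := range job.TaskGroups {
		if tg.Name != nil && *tg.Name == "web" {
			count := 1
			tg.Count = &count // scale 0 -> 1
		}
	}

	if _, _, err := jobs.Register(job, nil); err != nil {
		log.Fatal(err)
	}
	fmt.Println("task group scaled up")
}
```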
+ +## Overview + +Sablier integrates with Nomad to: +- Scale task groups to zero when idle +- Scale task groups up on first request +- Monitor allocation health and readiness + +## Use the Nomad Provider + +Configure the provider name as `nomad`: + + + +#### **File (YAML)** + +```yaml +provider: + name: nomad + nomad: + address: "http://127.0.0.1:4646" + namespace: "default" + token: "" # Optional ACL token + region: "" # Optional region +``` + +#### **CLI** + +```bash +sablier start --provider.name=nomad \ + --provider.nomad.address=http://127.0.0.1:4646 \ + --provider.nomad.namespace=default +``` + +#### **Environment Variables** + +```bash +PROVIDER_NAME=nomad +PROVIDER_NOMAD_ADDRESS=http://127.0.0.1:4646 +PROVIDER_NOMAD_NAMESPACE=default +PROVIDER_NOMAD_TOKEN=your-acl-token +PROVIDER_NOMAD_REGION=us-east-1 +``` + + + +## Configuration + +### Connection Settings + +| Setting | Description | Default | Environment Variable | +|------------|---------------------------------------------------------|--------------------------|-------------------------| +| `address` | HTTP address of the Nomad server | `http://127.0.0.1:4646` | `NOMAD_ADDR` | +| `token` | Secret ID of an ACL token (if ACLs are enabled) | `""` | `NOMAD_TOKEN` | +| `namespace`| Target namespace for operations | `default` | `NOMAD_NAMESPACE` | +| `region` | Target region for operations | `""` | `NOMAD_REGION` | + +### Example Configuration + +```yaml +provider: + name: nomad + nomad: + address: "https://nomad.example.com:4646" + namespace: "production" + token: "your-secret-acl-token" + region: "us-west-2" + +server: + port: 10000 + +sessions: + default-duration: 5m + +strategy: + dynamic: + default-theme: "hacker-terminal" +``` + +## Labeling Jobs + +Mark task groups for Sablier management using metadata: + +```hcl +job "whoami" { + datacenters = ["dc1"] + + group "web" { + count = 0 # Start at 0 + + meta { + sablier.enable = "true" + sablier.group = "whoami" # Optional group name + } + + task "server" { + driver = "docker" + + config { + image = "containous/whoami" + ports = ["http"] + } + + resources { + cpu = 100 + memory = 128 + } + } + + network { + port "http" { + to = 80 + } + } + } +} +``` + +### Required Labels + +| Label | Value | Description | +|-------|-------|-------------| +| `sablier.enable` | `"true"` | Enables Sablier management for this task group | + +### Optional Labels + +| Label | Value | Description | +|-------|-------|-------------| +| `sablier.group` | `string` | Group name for managing multiple task groups together (default: `"default"`) | + +## Instance Naming + +Nomad instances are identified using the format: `jobID/taskGroupName` + +Examples: +- `whoami/web` - Task group "web" in job "whoami" +- `api/backend` - Task group "backend" in job "api" + +If you only provide the job ID (e.g., `whoami`), Sablier will assume the task group has the same name as the job. 
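The parsing convention is implemented by the `parseJobName` helper in `pkg/provider/nomad/utils.go`; the sketch below reproduces its behaviour for quick reference:

```go
// Sketch of the instance-name convention: "jobID/taskGroupName", with the
// job ID reused as the task group name when no group is given.
package main

import (
	"fmt"
	"strings"
)

func parseJobName(name string) (jobID, group string, err error) {
	parts := strings.Split(name, "/")
	switch len(parts) {
	case 1:
		return parts[0], parts[0], nil // "whoami" -> job "whoami", group "whoami"
	case 2:
		if parts[0] == "" || parts[1] == "" {
			return "", "", fmt.Errorf("invalid instance name %q", name)
		}
		return parts[0], parts[1], nil // "whoami/web" -> job "whoami", group "web"
	default:
		return "", "", fmt.Errorf("invalid instance name %q", name)
	}
}

func main() {
	job, group, _ := parseJobName("whoami/web")
	fmt.Println(job, group) // whoami web
}
```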
+ +## Reverse Proxy Integration + +### Traefik + +```yaml +http: + middlewares: + sablier-whoami: + plugin: + sablier: + sablierUrl: http://sablier:10000 + names: "whoami/web" + sessionDuration: 1m + + routers: + whoami: + rule: "Host(`whoami.localhost`)" + middlewares: + - sablier-whoami + service: whoami + + services: + whoami: + loadBalancer: + servers: + - url: "http://whoami.service.consul:80" +``` + +### Nginx + +```nginx +location / { + set $sablierUrl 'http://sablier:10000'; + set $sablierNames 'whoami/web'; + set $sablierSessionDuration '1m'; + set $sablierNginxInternalRedirect '@whoami'; + + js_content sablier.call; +} + +location @whoami { + proxy_pass http://whoami.service.consul; +} +``` + +## Scaling Behavior + +### Scale Up (0 → N) + +When a request arrives for a scaled-down task group: + +1. Sablier updates the job's task group `count` to the desired value (default: 1) +2. Nomad scheduler places allocations on available nodes +3. Allocations transition through: `pending` → `running` +4. If health checks are configured, Sablier waits for them to pass +5. Once all allocations are healthy, the instance is marked as `ready` + +### Scale Down (N → 0) + +When the session expires: + +1. Sablier updates the job's task group `count` to 0 +2. Nomad gracefully stops all allocations +3. The instance is marked as `not-ready` + +## Health Checks + +Sablier respects Nomad's deployment health checks. If your task group has health checks configured, Sablier will wait for allocations to be marked as healthy before considering the instance ready. + +Example with Consul health checks: + +```hcl +group "web" { + count = 0 + + meta { + sablier.enable = "true" + } + + task "server" { + driver = "docker" + + service { + name = "whoami" + port = "http" + + check { + type = "http" + path = "/" + interval = "10s" + timeout = "2s" + } + } + } +} +``` + +## Permissions (ACL) + +If Nomad ACLs are enabled, the token must have the following permissions: + +```hcl +namespace "default" { + policy = "write" + + capabilities = [ + "read-job", + "submit-job", + "dispatch-job", + "read-logs", + "read-fs", + "alloc-node-exec", + "list-jobs", + "parse-job", + ] +} + +node { + policy = "read" +} +``` + +Create a policy file `sablier-policy.hcl`: + +```hcl +namespace "default" { + policy = "write" +} +``` + +Apply it: + +```bash +nomad acl policy apply sablier ./sablier-policy.hcl +nomad acl token create -name="sablier" -policy=sablier +``` + +Use the generated token's Secret ID in your Sablier configuration. 
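For reference, the token is consumed as the client's Secret ID when Sablier builds its Nomad connection (see the `nomad` case in `pkg/sabliercmd/provider.go` in this patch). A minimal sketch of that wiring, with placeholder address and token values:

```go
// Sketch of how the ACL token feeds the Nomad API client; the address and
// token below are placeholders, and NOMAD_ADDR/NOMAD_TOKEN are also honoured
// by api.DefaultConfig().
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	cfg := api.DefaultConfig()
	cfg.Address = "https://nomad.example.com:4646"
	cfg.SecretID = "your-secret-acl-token" // the token's Secret ID
	cfg.Namespace = "default"

	client, err := api.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Reading agent info is a cheap way to confirm the token and address work.
	self, err := client.Agent().Self()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("connected to Nomad", self.Stats["nomad"]["version"])
}
```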
+ +## Example Deployment + +### Nomad Job for Sablier + +```hcl +job "sablier" { + datacenters = ["dc1"] + type = "service" + + group "sablier" { + count = 1 + + network { + port "http" { + static = 10000 + to = 10000 + } + } + + task "sablier" { + driver = "docker" + + config { + image = "sablierapp/sablier:1.10.1" + ports = ["http"] + + args = [ + "start", + "--provider.name=nomad", + ] + } + + env { + NOMAD_ADDR = "http://nomad.service.consul:4646" + NOMAD_NAMESPACE = "default" + NOMAD_TOKEN = "${NOMAD_TOKEN}" # Pass via template + } + + template { + data = < +``` + +## Further Reading + +- [Nomad Documentation](https://www.nomadproject.io/docs) +- [Nomad API Reference](https://www.nomadproject.io/api-docs) +- [Nomad ACL System](https://www.nomadproject.io/docs/operations/acl) +- [Nomad Job Specification](https://www.nomadproject.io/docs/job-specification) diff --git a/docs/providers/overview.md b/docs/providers/overview.md index c5d7069..55d1181 100644 --- a/docs/providers/overview.md +++ b/docs/providers/overview.md @@ -18,6 +18,7 @@ A Provider typically has the following capabilities: | [Docker Swarm](docker_swarm) | `docker_swarm` or `swarm` | Scale down to zero and up **services** on demand | | [Kubernetes](kubernetes) | `kubernetes` | Scale down and up **deployments** and **statefulsets** on demand | | [Podman](podman) | `podman` | Stop and start **containers** on demand | +| [Nomad](nomad) | `nomad` | Scale down to zero and up **job task groups** on demand | *Your Provider is not on the list? [Open an issue to request the missing provider here!](https://github.com/sablierapp/sablier/issues/new?assignees=&labels=enhancement%2C+provider&projects=&template=instance-provider-request.md&title=Add+%60%5BPROVIDER%5D%60+provider)* diff --git a/go.mod b/go.mod index a7280b9..02bb856 100644 --- a/go.mod +++ b/go.mod @@ -116,8 +116,13 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/schema v1.4.1 // indirect + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect + github.com/hashicorp/cronexpr v1.1.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-rootcerts v1.0.2 // indirect + github.com/hashicorp/nomad/api v0.0.0-20251112174658-75f43fd4b041 // indirect github.com/hpcloud/tail v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jedib0t/go-pretty/v6 v6.6.6 // indirect @@ -145,6 +150,8 @@ require ( github.com/mdelapenya/tlscert v0.2.0 // indirect github.com/miekg/pkcs11 v1.1.1 // indirect github.com/mistifyio/go-zfs/v3 v3.0.1 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/go-archive v0.1.0 // indirect github.com/moby/patternmatcher v0.6.0 // indirect diff --git a/go.sum b/go.sum index 1838cd7..329d3dd 100644 --- a/go.sum +++ b/go.sum @@ -254,13 +254,23 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/schema v1.4.1 h1:jUg5hUjCSDZpNGLuXQOgIWGdlgrIdYvgQ0wZtdK1M3E= github.com/gorilla/schema v1.4.1/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 
h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= +github.com/hashicorp/cronexpr v1.1.3 h1:rl5IkxXN2m681EfivTlccqIryzYJSXRGRNa0xeG7NA4= +github.com/hashicorp/cronexpr v1.1.3/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= +github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc= +github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8= +github.com/hashicorp/nomad/api v0.0.0-20251112174658-75f43fd4b041 h1:bPS6GhvnL8ae2CG3uz1yJPH+ymYmYCZe7JAXge6eskA= +github.com/hashicorp/nomad/api v0.0.0-20251112174658-75f43fd4b041/go.mod h1:sldFTIgs+FsUeKU3LwVjviAIuksxD8TzDOn02MYwslE= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= @@ -328,6 +338,10 @@ github.com/miekg/pkcs11 v1.1.1 h1:Ugu9pdy6vAYku5DEpVWVFPYnzV+bxB+iRdbuFSu7TvU= github.com/miekg/pkcs11 v1.1.1/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= github.com/mistifyio/go-zfs/v3 v3.0.1 h1:YaoXgBePoMA12+S1u/ddkv+QqxcfiZK4prI6HPnkFiU= github.com/mistifyio/go-zfs/v3 v3.0.1/go.mod h1:CzVgeB0RvF2EGzQnytKVvVSDwmKJXxkOTUGbNrTja/k= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ= diff --git a/pkg/config/provider.go b/pkg/config/provider.go index 9833700..86b12f1 100644 --- a/pkg/config/provider.go +++ b/pkg/config/provider.go @@ -12,6 +12,7 @@ type Provider struct { AutoStopOnStartup bool `yaml:"auto-stop-on-startup,omitempty" default:"true"` Kubernetes Kubernetes Podman Podman + Nomad Nomad } type Kubernetes struct { @@ -34,7 +35,23 @@ type Podman struct { Uri string `mapstructure:"URI" yaml:"uri,omitempty" default:"unix:///run/podman/podman.sock"` } -var providers = []string{"docker", "docker_swarm", "swarm", "kubernetes", "podman"} +type Nomad struct { + // Address is the HTTP address of the Nomad server. 
+ // Defaults to http://127.0.0.1:4646 + // Can also be set via the NOMAD_ADDR environment variable. + Address string `mapstructure:"ADDRESS" yaml:"address,omitempty" default:"http://127.0.0.1:4646"` + // Token is the secret ID of an ACL token for authentication. + // Can also be set via the NOMAD_TOKEN environment variable. + Token string `mapstructure:"TOKEN" yaml:"token,omitempty"` + // Namespace is the target namespace for queries. + // Can also be set via the NOMAD_NAMESPACE environment variable. + Namespace string `mapstructure:"NAMESPACE" yaml:"namespace,omitempty" default:"default"` + // Region is the target region for queries. + // Can also be set via the NOMAD_REGION environment variable. + Region string `mapstructure:"REGION" yaml:"region,omitempty"` +} + +var providers = []string{"docker", "docker_swarm", "swarm", "kubernetes", "podman", "nomad"} func NewProviderConfig() Provider { return Provider{ @@ -48,6 +65,12 @@ func NewProviderConfig() Provider { Podman: Podman{ Uri: "unix:///run/podman/podman.sock", }, + Nomad: Nomad{ + Address: "http://127.0.0.1:4646", + Namespace: "default", + Token: "", + Region: "", + }, } } diff --git a/pkg/provider/nomad/events.go b/pkg/provider/nomad/events.go new file mode 100644 index 0000000..e062750 --- /dev/null +++ b/pkg/provider/nomad/events.go @@ -0,0 +1,161 @@ +package nomad + +import ( + "context" + "time" + + "github.com/hashicorp/nomad/api" +) + +// NotifyInstanceStopped watches for job allocations being stopped/completed +// and sends the instance name to the channel when detected +func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) { + p.l.InfoContext(ctx, "starting nomad events watcher") + + // Use Nomad's event stream API to watch for allocation updates + topics := map[api.Topic][]string{ + api.TopicAllocation: {"*"}, + } + + streamCh, err := p.Client.EventStream().Stream(ctx, topics, 0, &api.QueryOptions{ + Namespace: p.namespace, + }) + + if err != nil { + p.l.ErrorContext(ctx, "failed to start event stream", "error", err) + return + } + + p.l.InfoContext(ctx, "nomad event stream started") + + // Track last seen count for each task group to detect scale-downs + lastSeen := make(map[string]int32) + + // Poll job allocations periodically as a fallback + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + p.l.InfoContext(ctx, "stopping nomad events watcher") + return + + case event := <-streamCh: + if event.Err != nil { + p.l.ErrorContext(ctx, "event stream error", "error", event.Err) + continue + } + + // Process allocation events + for _, e := range event.Events { + if e.Type == "AllocationUpdated" { + p.processAllocationEvent(ctx, e, instance, lastSeen) + } + } + + case <-ticker.C: + // Periodically check all jobs for scale-down as a fallback + p.pollJobAllocations(ctx, instance, lastSeen) + } + } +} + +func (p *Provider) processAllocationEvent(ctx context.Context, event api.Event, instance chan<- string, lastSeen map[string]int32) { + alloc, ok := event.Payload["Allocation"] + if !ok { + return + } + + allocMap, ok := alloc.(map[string]interface{}) + if !ok { + return + } + + jobID, _ := allocMap["JobID"].(string) + taskGroup, _ := allocMap["TaskGroup"].(string) + clientStatus, _ := allocMap["ClientStatus"].(string) + + if jobID == "" || taskGroup == "" { + return + } + + // If allocation stopped, check if this was a scale-down + if clientStatus == "complete" || clientStatus == "failed" || clientStatus == "lost" { + instanceName := 
formatJobName(jobID, taskGroup) + + // Check current job state + info, err := p.InstanceInspect(ctx, instanceName) + if err != nil { + p.l.WarnContext(ctx, "cannot inspect instance after allocation event", + "instance", instanceName, + "error", err, + ) + return + } + + // If scaled to zero, notify + if info.DesiredReplicas == 0 { + p.l.InfoContext(ctx, "instance scaled to zero detected", + "instance", instanceName, + ) + select { + case instance <- instanceName: + case <-ctx.Done(): + } + } + } +} + +func (p *Provider) pollJobAllocations(ctx context.Context, instance chan<- string, lastSeen map[string]int32) { + jobs := p.Client.Jobs() + jobList, _, err := jobs.List(&api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + p.l.WarnContext(ctx, "failed to list jobs for polling", "error", err) + return + } + + for _, jobStub := range jobList { + job, _, err := jobs.Info(jobStub.ID, &api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + continue + } + + for _, tg := range job.TaskGroups { + if tg.Name == nil || tg.Meta == nil { + continue + } + + // Only watch enabled instances + enabled, hasEnable := tg.Meta[enableLabel] + if !hasEnable || enabled != "true" { + continue + } + + instanceName := formatJobName(*job.ID, *tg.Name) + currentCount := int32(0) + if tg.Count != nil { + currentCount = int32(*tg.Count) + } + + // Check if scaled down to zero + if prev, exists := lastSeen[instanceName]; exists && prev > 0 && currentCount == 0 { + p.l.InfoContext(ctx, "instance scaled to zero detected via polling", + "instance", instanceName, + "previous_count", prev, + ) + select { + case instance <- instanceName: + case <-ctx.Done(): + return + } + } + + lastSeen[instanceName] = currentCount + } + } +} diff --git a/pkg/provider/nomad/events_test.go b/pkg/provider/nomad/events_test.go new file mode 100644 index 0000000..d0529be --- /dev/null +++ b/pkg/provider/nomad/events_test.go @@ -0,0 +1,57 @@ +package nomad_test + +import ( + "context" + "testing" + "time" + + "github.com/neilotoole/slogt" + "github.com/sablierapp/sablier/pkg/provider/nomad" + "gotest.tools/v3/assert" +) + +func TestNomadProvider_NotifyInstanceStopped(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + ctx := context.Background() + nc := setupNomad(t) + + p, err := nomad.New(ctx, nc.client, "default", slogt.New(t)) + assert.NilError(t, err) + + // Create a job with 1 allocation + job, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + Count: 1, + Meta: map[string]string{ + "sablier.enable": "true", + }, + }) + assert.NilError(t, err) + + instanceName := formatJobName(*job.ID, *job.TaskGroups[0].Name) + + // Wait for allocation to be running + err = WaitForJobAllocations(ctx, nc.client, *job.ID, *job.TaskGroups[0].Name, 1) + assert.NilError(t, err) + + // Start watching for stop events + stoppedChan := make(chan string, 1) + go p.NotifyInstanceStopped(ctx, stoppedChan) + + // Give the watcher time to initialize + time.Sleep(2 * time.Second) + + // Scale the job to 0 + err = p.InstanceStop(ctx, instanceName) + assert.NilError(t, err) + + // Wait for the notification + select { + case name := <-stoppedChan: + assert.Equal(t, instanceName, name) + case <-time.After(30 * time.Second): + t.Fatal("timeout waiting for instance stopped notification") + } +} diff --git a/pkg/provider/nomad/job_groups_test.go b/pkg/provider/nomad/job_groups_test.go new file mode 100644 index 0000000..f9473a3 --- /dev/null +++ b/pkg/provider/nomad/job_groups_test.go @@ -0,0 +1,79 @@ +package 
nomad_test + +import ( + "context" + "testing" + + "github.com/neilotoole/slogt" + "github.com/sablierapp/sablier/pkg/provider/nomad" + "gotest.tools/v3/assert" +) + +func TestNomadProvider_InstanceGroups(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + ctx := context.Background() + nc := setupNomad(t) + + p, err := nomad.New(ctx, nc.client, "default", slogt.New(t)) + assert.NilError(t, err) + + // Create jobs with different groups + job1, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + JobID: "groups-test-1", + Count: 0, + Meta: map[string]string{ + "sablier.enable": "true", + "sablier.group": "group1", + }, + }) + assert.NilError(t, err) + + job2, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + JobID: "groups-test-2", + Count: 0, + Meta: map[string]string{ + "sablier.enable": "true", + "sablier.group": "group1", + }, + }) + assert.NilError(t, err) + + job3, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + JobID: "groups-test-3", + Count: 0, + Meta: map[string]string{ + "sablier.enable": "true", + "sablier.group": "group2", + }, + }) + assert.NilError(t, err) + + // Job without sablier.enable should not be included + _, err = nc.CreateMimicJob(ctx, MimicJobOptions{ + JobID: "groups-test-4", + Count: 0, + }) + assert.NilError(t, err) + + groups, err := p.InstanceGroups(ctx) + assert.NilError(t, err) + + assert.Equal(t, 2, len(groups)) + assert.Equal(t, 2, len(groups["group1"])) + assert.Equal(t, 1, len(groups["group2"])) + + // Check that the instance names are correct + expectedGroup1 := []string{ + formatJobName(*job1.ID, *job1.TaskGroups[0].Name), + formatJobName(*job2.ID, *job2.TaskGroups[0].Name), + } + expectedGroup2 := []string{ + formatJobName(*job3.ID, *job3.TaskGroups[0].Name), + } + + assert.DeepEqual(t, expectedGroup1, groups["group1"]) + assert.DeepEqual(t, expectedGroup2, groups["group2"]) +} diff --git a/pkg/provider/nomad/job_inspect.go b/pkg/provider/nomad/job_inspect.go new file mode 100644 index 0000000..9d07e1d --- /dev/null +++ b/pkg/provider/nomad/job_inspect.go @@ -0,0 +1,93 @@ +package nomad + +import ( + "context" + "fmt" + + "github.com/hashicorp/nomad/api" + "github.com/sablierapp/sablier/pkg/sablier" +) + +// InstanceInspect retrieves the current state of a Nomad job's task group +func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.InstanceInfo, error) { + jobID, groupName, err := parseJobName(name) + if err != nil { + return sablier.InstanceInfo{}, err + } + + jobs := p.Client.Jobs() + job, _, err := jobs.Info(jobID, &api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + return sablier.InstanceInfo{}, fmt.Errorf("cannot get job info: %w", err) + } + + if job == nil { + return sablier.InstanceInfo{}, fmt.Errorf("job %s not found", jobID) + } + + // Find the task group + var targetGroup *api.TaskGroup + for _, tg := range job.TaskGroups { + if tg.Name != nil && *tg.Name == groupName { + targetGroup = tg + break + } + } + + if targetGroup == nil { + return sablier.InstanceInfo{}, fmt.Errorf("task group %s not found in job %s", groupName, jobID) + } + + desiredCount := int32(0) + if targetGroup.Count != nil { + desiredCount = int32(*targetGroup.Count) + } + + // Get allocations for this job to determine actual running count + allocations, _, err := jobs.Allocations(jobID, false, &api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + return sablier.InstanceInfo{}, fmt.Errorf("cannot get job allocations: %w", err) + } + + // Count running allocations for this specific 
task group + runningCount := int32(0) + for _, alloc := range allocations { + if alloc.TaskGroup == groupName && (alloc.ClientStatus == "running" || alloc.ClientStatus == "pending") { + runningCount++ + } + } + + instanceName := formatJobName(jobID, groupName) + + // Determine status + if desiredCount == 0 { + return sablier.NotReadyInstanceState(instanceName, runningCount, p.desiredReplicas), nil + } + + // Check if all allocations are running + if runningCount == desiredCount && desiredCount > 0 { + // Check allocation health for task groups with health checks + allHealthy := true + for _, alloc := range allocations { + if alloc.TaskGroup == groupName && alloc.ClientStatus == "running" { + // If DeploymentStatus exists and Health is set, check it + if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil { + if !*alloc.DeploymentStatus.Healthy { + allHealthy = false + break + } + } + } + } + + if allHealthy { + return sablier.ReadyInstanceState(instanceName, desiredCount), nil + } + } + + return sablier.NotReadyInstanceState(instanceName, runningCount, desiredCount), nil +} diff --git a/pkg/provider/nomad/job_inspect_test.go b/pkg/provider/nomad/job_inspect_test.go new file mode 100644 index 0000000..f4ad916 --- /dev/null +++ b/pkg/provider/nomad/job_inspect_test.go @@ -0,0 +1,102 @@ +package nomad_test + +import ( + "context" + "testing" + + "github.com/neilotoole/slogt" + "github.com/sablierapp/sablier/pkg/provider/nomad" + "github.com/sablierapp/sablier/pkg/sablier" + "gotest.tools/v3/assert" +) + +func TestNomadProvider_InstanceInspect(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + ctx := context.Background() + + type args struct { + do func(nc *nomadContainer) (string, error) + } + tests := []struct { + name string + args args + want sablier.InstanceInfo + err error + }{ + { + name: "inspect job with running allocation", + args: args{ + do: func(nc *nomadContainer) (string, error) { + job, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + Count: 1, + }) + if err != nil { + return "", err + } + + // Wait for allocation to be running + err = WaitForJobAllocations(ctx, nc.client, *job.ID, *job.TaskGroups[0].Name, 1) + if err != nil { + return "", err + } + + return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil + }, + }, + want: sablier.InstanceInfo{ + CurrentReplicas: 1, + DesiredReplicas: 1, + Status: sablier.InstanceStatusReady, + }, + err: nil, + }, + { + name: "inspect job with 0 allocations", + args: args{ + do: func(nc *nomadContainer) (string, error) { + job, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + Count: 0, + }) + if err != nil { + return "", err + } + + return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil + }, + }, + want: sablier.InstanceInfo{ + CurrentReplicas: 0, + DesiredReplicas: 1, + Status: sablier.InstanceStatusNotReady, + }, + err: nil, + }, + } + + nc := setupNomad(t) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + p, err := nomad.New(ctx, nc.client, "default", slogt.New(t)) + assert.NilError(t, err) + + name, err := tt.args.do(nc) + assert.NilError(t, err) + + info, err := p.InstanceInspect(ctx, name) + if tt.err != nil { + assert.Error(t, err, tt.err.Error()) + } else { + assert.NilError(t, err) + assert.Equal(t, name, info.Name) + assert.Equal(t, tt.want.CurrentReplicas, info.CurrentReplicas) + assert.Equal(t, tt.want.DesiredReplicas, info.DesiredReplicas) + assert.Equal(t, tt.want.Status, info.Status) + } + }) + } +} diff --git 
a/pkg/provider/nomad/job_list.go b/pkg/provider/nomad/job_list.go new file mode 100644 index 0000000..3e68978 --- /dev/null +++ b/pkg/provider/nomad/job_list.go @@ -0,0 +1,123 @@ +package nomad + +import ( + "context" + + "github.com/hashicorp/nomad/api" + "github.com/sablierapp/sablier/pkg/provider" + "github.com/sablierapp/sablier/pkg/sablier" +) + +const ( + enableLabel = "sablier.enable" + groupLabel = "sablier.group" +) + +// InstanceGroups returns a map of group names to instance names +// It scans all jobs in the namespace looking for the sablier.enable and sablier.group labels +func (p *Provider) InstanceGroups(ctx context.Context) (map[string][]string, error) { + groups := make(map[string][]string) + + jobs := p.Client.Jobs() + jobList, _, err := jobs.List(&api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + return nil, err + } + + for _, jobStub := range jobList { + // Get full job details to access task group metadata + job, _, err := jobs.Info(jobStub.ID, &api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + p.l.WarnContext(ctx, "cannot get job info", "job_id", jobStub.ID, "error", err) + continue + } + + // Check each task group for sablier labels + for _, tg := range job.TaskGroups { + if tg.Name == nil { + continue + } + + // Check meta tags for sablier.enable + if tg.Meta == nil { + continue + } + + enabled, hasEnable := tg.Meta[enableLabel] + if !hasEnable || enabled != "true" { + continue + } + + groupName := "default" + if gn, hasGroup := tg.Meta[groupLabel]; hasGroup && gn != "" { + groupName = gn + } + + instanceName := formatJobName(*job.ID, *tg.Name) + groups[groupName] = append(groups[groupName], instanceName) + } + } + + return groups, nil +} + +// InstanceList returns a list of all instances (task groups) that have Sablier enabled +func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceListOptions) ([]sablier.InstanceConfiguration, error) { + var instances []sablier.InstanceConfiguration + + jobs := p.Client.Jobs() + jobList, _, err := jobs.List(&api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + return nil, err + } + + for _, jobStub := range jobList { + // Get full job details + job, _, err := jobs.Info(jobStub.ID, &api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + p.l.WarnContext(ctx, "cannot get job info", "job_id", jobStub.ID, "error", err) + continue + } + + // Check each task group + for _, tg := range job.TaskGroups { + if tg.Name == nil { + continue + } + + // If All flag is not set, only return enabled instances + if !options.All { + if tg.Meta == nil { + continue + } + enabled, hasEnable := tg.Meta[enableLabel] + if !hasEnable || enabled != "true" { + continue + } + } + + groupName := "default" + if tg.Meta != nil { + if gn, hasGroup := tg.Meta[groupLabel]; hasGroup && gn != "" { + groupName = gn + } + } + + instanceName := formatJobName(*job.ID, *tg.Name) + instances = append(instances, sablier.InstanceConfiguration{ + Name: instanceName, + Group: groupName, + }) + } + } + + return instances, nil +} diff --git a/pkg/provider/nomad/job_list_test.go b/pkg/provider/nomad/job_list_test.go new file mode 100644 index 0000000..0087b95 --- /dev/null +++ b/pkg/provider/nomad/job_list_test.go @@ -0,0 +1,82 @@ +package nomad_test + +import ( + "context" + "testing" + + "github.com/neilotoole/slogt" + "github.com/sablierapp/sablier/pkg/provider" + "github.com/sablierapp/sablier/pkg/provider/nomad" + "gotest.tools/v3/assert" +) + +func TestNomadProvider_InstanceList(t 
*testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + ctx := context.Background() + nc := setupNomad(t) + + p, err := nomad.New(ctx, nc.client, "default", slogt.New(t)) + assert.NilError(t, err) + + // Create jobs with sablier.enable + job1, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + JobID: "list-test-1", + Count: 0, + Meta: map[string]string{ + "sablier.enable": "true", + "sablier.group": "test-group", + }, + }) + assert.NilError(t, err) + + job2, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + JobID: "list-test-2", + Count: 0, + Meta: map[string]string{ + "sablier.enable": "true", + }, + }) + assert.NilError(t, err) + + // Job without sablier.enable + _, err = nc.CreateMimicJob(ctx, MimicJobOptions{ + JobID: "list-test-3", + Count: 0, + }) + assert.NilError(t, err) + + // Test with All = false (only enabled instances) + instances, err := p.InstanceList(ctx, provider.InstanceListOptions{All: false}) + assert.NilError(t, err) + assert.Equal(t, 2, len(instances)) + + // Test with All = true (all instances) + instancesAll, err := p.InstanceList(ctx, provider.InstanceListOptions{All: true}) + assert.NilError(t, err) + assert.Assert(t, len(instancesAll) >= 3) + + // Verify instance configuration + found := false + for _, inst := range instances { + if inst.Name == formatJobName(*job1.ID, *job1.TaskGroups[0].Name) { + assert.Equal(t, "test-group", inst.Group) + found = true + break + } + } + assert.Assert(t, found, "Expected to find job1 in instances list") + + // Verify default group + foundDefault := false + for _, inst := range instances { + if inst.Name == formatJobName(*job2.ID, *job2.TaskGroups[0].Name) { + assert.Equal(t, "default", inst.Group) + foundDefault = true + break + } + } + assert.Assert(t, foundDefault, "Expected to find job2 with default group") +} diff --git a/pkg/provider/nomad/job_start.go b/pkg/provider/nomad/job_start.go new file mode 100644 index 0000000..f3621d1 --- /dev/null +++ b/pkg/provider/nomad/job_start.go @@ -0,0 +1,78 @@ +package nomad + +import ( + "context" + "fmt" + + "github.com/hashicorp/nomad/api" +) + +// InstanceStart scales the Nomad job's task group to the desired replica count +func (p *Provider) InstanceStart(ctx context.Context, name string) error { + p.l.DebugContext(ctx, "starting instance", "name", name) + + jobID, groupName, err := parseJobName(name) + if err != nil { + return err + } + + jobs := p.Client.Jobs() + job, _, err := jobs.Info(jobID, &api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + return fmt.Errorf("cannot get job info: %w", err) + } + + if job == nil { + return fmt.Errorf("job %s not found", jobID) + } + + // Find the task group + var targetGroup *api.TaskGroup + for _, tg := range job.TaskGroups { + if tg.Name != nil && *tg.Name == groupName { + targetGroup = tg + break + } + } + + if targetGroup == nil { + return fmt.Errorf("task group %s not found in job %s", groupName, jobID) + } + + // Check if already at desired count + currentCount := int32(0) + if targetGroup.Count != nil { + currentCount = int32(*targetGroup.Count) + } + + if currentCount == p.desiredReplicas { + p.l.DebugContext(ctx, "instance already at desired replicas", + "name", name, + "current", currentCount, + "desired", p.desiredReplicas, + ) + return nil + } + + // Scale up + count := int(p.desiredReplicas) + targetGroup.Count = &count + + // Submit the job update + _, _, err = jobs.Register(job, &api.WriteOptions{ + Namespace: p.namespace, + }) + if err != nil { + return fmt.Errorf("cannot scale job: 
%w", err) + } + + p.l.InfoContext(ctx, "scaled instance up", + "name", name, + "from", currentCount, + "to", p.desiredReplicas, + ) + + return nil +} diff --git a/pkg/provider/nomad/job_start_test.go b/pkg/provider/nomad/job_start_test.go new file mode 100644 index 0000000..00ffb20 --- /dev/null +++ b/pkg/provider/nomad/job_start_test.go @@ -0,0 +1,104 @@ +package nomad_test + +import ( + "context" + "fmt" + "testing" + + "github.com/neilotoole/slogt" + "github.com/sablierapp/sablier/pkg/provider/nomad" + "gotest.tools/v3/assert" +) + +func TestNomadProvider_InstanceStart(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + ctx := context.Background() + + type args struct { + do func(nc *nomadContainer) (string, error) + } + tests := []struct { + name string + args args + err error + }{ + { + name: "start job with 0 allocations", + args: args{ + do: func(nc *nomadContainer) (string, error) { + job, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + Count: 0, + Meta: map[string]string{ + "sablier.enable": "true", + }, + }) + if err != nil { + return "", err + } + return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil + }, + }, + err: nil, + }, + { + name: "start job already at desired count", + args: args{ + do: func(nc *nomadContainer) (string, error) { + job, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + Count: 1, + Meta: map[string]string{ + "sablier.enable": "true", + }, + }) + if err != nil { + return "", err + } + + // Wait for allocation to be running + err = WaitForJobAllocations(ctx, nc.client, *job.ID, *job.TaskGroups[0].Name, 1) + if err != nil { + return "", err + } + + return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil + }, + }, + err: nil, + }, + { + name: "start non-existent job", + args: args{ + do: func(nc *nomadContainer) (string, error) { + return "non-existent/taskgroup", nil + }, + }, + err: fmt.Errorf("job not found"), + }, + } + + nc := setupNomad(t) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, err := nomad.New(ctx, nc.client, "default", slogt.New(t)) + assert.NilError(t, err) + + name, err := tt.args.do(nc) + assert.NilError(t, err) + + err = p.InstanceStart(ctx, name) + if tt.err != nil { + assert.ErrorContains(t, err, "job not found") + } else { + assert.NilError(t, err) + + // Verify the job was scaled + info, err := p.InstanceInspect(ctx, name) + assert.NilError(t, err) + assert.Equal(t, int32(1), info.DesiredReplicas) + } + }) + } +} diff --git a/pkg/provider/nomad/job_stop.go b/pkg/provider/nomad/job_stop.go new file mode 100644 index 0000000..f4bee12 --- /dev/null +++ b/pkg/provider/nomad/job_stop.go @@ -0,0 +1,76 @@ +package nomad + +import ( + "context" + "fmt" + + "github.com/hashicorp/nomad/api" +) + +// InstanceStop scales the Nomad job's task group to zero +func (p *Provider) InstanceStop(ctx context.Context, name string) error { + p.l.DebugContext(ctx, "stopping instance", "name", name) + + jobID, groupName, err := parseJobName(name) + if err != nil { + return err + } + + jobs := p.Client.Jobs() + job, _, err := jobs.Info(jobID, &api.QueryOptions{ + Namespace: p.namespace, + }) + if err != nil { + return fmt.Errorf("cannot get job info: %w", err) + } + + if job == nil { + return fmt.Errorf("job %s not found", jobID) + } + + // Find the task group + var targetGroup *api.TaskGroup + for _, tg := range job.TaskGroups { + if tg.Name != nil && *tg.Name == groupName { + targetGroup = tg + break + } + } + + if targetGroup == nil { + return fmt.Errorf("task group %s not found in job 
%s", groupName, jobID) + } + + // Check if already at zero + currentCount := int32(0) + if targetGroup.Count != nil { + currentCount = int32(*targetGroup.Count) + } + + if currentCount == 0 { + p.l.DebugContext(ctx, "instance already stopped", + "name", name, + ) + return nil + } + + // Scale to zero + count := 0 + targetGroup.Count = &count + + // Submit the job update + _, _, err = jobs.Register(job, &api.WriteOptions{ + Namespace: p.namespace, + }) + if err != nil { + return fmt.Errorf("cannot stop job: %w", err) + } + + p.l.InfoContext(ctx, "scaled instance down", + "name", name, + "from", currentCount, + "to", 0, + ) + + return nil +} diff --git a/pkg/provider/nomad/job_stop_test.go b/pkg/provider/nomad/job_stop_test.go new file mode 100644 index 0000000..d40b905 --- /dev/null +++ b/pkg/provider/nomad/job_stop_test.go @@ -0,0 +1,104 @@ +package nomad_test + +import ( + "context" + "fmt" + "testing" + + "github.com/neilotoole/slogt" + "github.com/sablierapp/sablier/pkg/provider/nomad" + "gotest.tools/v3/assert" +) + +func TestNomadProvider_InstanceStop(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + ctx := context.Background() + + type args struct { + do func(nc *nomadContainer) (string, error) + } + tests := []struct { + name string + args args + err error + }{ + { + name: "stop job with running allocations", + args: args{ + do: func(nc *nomadContainer) (string, error) { + job, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + Count: 1, + Meta: map[string]string{ + "sablier.enable": "true", + }, + }) + if err != nil { + return "", err + } + + // Wait for allocation to be running + err = WaitForJobAllocations(ctx, nc.client, *job.ID, *job.TaskGroups[0].Name, 1) + if err != nil { + return "", err + } + + return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil + }, + }, + err: nil, + }, + { + name: "stop job already at 0", + args: args{ + do: func(nc *nomadContainer) (string, error) { + job, err := nc.CreateMimicJob(ctx, MimicJobOptions{ + Count: 0, + Meta: map[string]string{ + "sablier.enable": "true", + }, + }) + if err != nil { + return "", err + } + return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil + }, + }, + err: nil, + }, + { + name: "stop non-existent job", + args: args{ + do: func(nc *nomadContainer) (string, error) { + return "non-existent/taskgroup", nil + }, + }, + err: fmt.Errorf("job not found"), + }, + } + + nc := setupNomad(t) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, err := nomad.New(ctx, nc.client, "default", slogt.New(t)) + assert.NilError(t, err) + + name, err := tt.args.do(nc) + assert.NilError(t, err) + + err = p.InstanceStop(ctx, name) + if tt.err != nil { + assert.ErrorContains(t, err, "job not found") + } else { + assert.NilError(t, err) + + // Verify the job was scaled to 0 + info, err := p.InstanceInspect(ctx, name) + assert.NilError(t, err) + assert.Equal(t, int32(0), info.DesiredReplicas) + } + }) + } +} diff --git a/pkg/provider/nomad/nomad.go b/pkg/provider/nomad/nomad.go new file mode 100644 index 0000000..43ff54b --- /dev/null +++ b/pkg/provider/nomad/nomad.go @@ -0,0 +1,69 @@ +package nomad + +import ( + "context" + "fmt" + "log/slog" + + "github.com/hashicorp/nomad/api" + "github.com/sablierapp/sablier/pkg/sablier" +) + +// Interface guard +var _ sablier.Provider = (*Provider)(nil) + +type Provider struct { + Client *api.Client + namespace string + desiredReplicas int32 + l *slog.Logger +} + +func New(ctx context.Context, client *api.Client, namespace string, logger 
*slog.Logger) (*Provider, error) { + logger = logger.With(slog.String("provider", "nomad")) + + if namespace == "" { + namespace = "default" + } + + // Test connection by getting agent self info + agent := client.Agent() + info, err := agent.Self() + if err != nil { + return nil, fmt.Errorf("cannot connect to nomad: %v", err) + } + + version := "unknown" + address := "unknown" + + if info != nil && info.Stats != nil { + if nomadStats, ok := info.Stats["nomad"]; ok { + if versionStr, exists := nomadStats["version"]; exists { + version = versionStr + } + } + } + + if info != nil && info.Config != nil { + if addr, ok := info.Config["AdvertiseAddrs"]; ok { + if addrMap, ok := addr.(map[string]interface{}); ok { + if httpAddr, ok := addrMap["HTTP"].(string); ok { + address = httpAddr + } + } + } + } + + logger.InfoContext(ctx, "connection established with nomad", + slog.String("version", version), + slog.String("namespace", namespace), + slog.String("address", address), + ) + + return &Provider{ + Client: client, + namespace: namespace, + desiredReplicas: 1, + l: logger, + }, nil +} diff --git a/pkg/provider/nomad/testcontainers_test.go b/pkg/provider/nomad/testcontainers_test.go new file mode 100644 index 0000000..df9bb4f --- /dev/null +++ b/pkg/provider/nomad/testcontainers_test.go @@ -0,0 +1,194 @@ +package nomad_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/nomad/api" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +type nomadContainer struct { + container testcontainers.Container + address string + client *api.Client + t *testing.T +} + +func setupNomad(t *testing.T) *nomadContainer { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + ctx := context.Background() + + req := testcontainers.ContainerRequest{ + Image: "hashicorp/nomad:1.8", + ExposedPorts: []string{"4646/tcp"}, + Cmd: []string{ + "agent", + "-dev", + "-bind=0.0.0.0", + "-network-interface=eth0", + }, + Privileged: true, + WaitingFor: wait.ForLog("Nomad agent started!").WithStartupTimeout(60 * time.Second), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatalf("failed to start nomad container: %s", err) + } + + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Logf("failed to terminate container: %s", err) + } + }) + + host, err := container.Host(ctx) + if err != nil { + t.Fatalf("failed to get container host: %s", err) + } + + port, err := container.MappedPort(ctx, "4646") + if err != nil { + t.Fatalf("failed to get mapped port: %s", err) + } + + address := fmt.Sprintf("http://%s:%s", host, port.Port()) + + // Create Nomad client + config := api.DefaultConfig() + config.Address = address + client, err := api.NewClient(config) + if err != nil { + t.Fatalf("failed to create nomad client: %s", err) + } + + // Wait for Nomad to be ready + time.Sleep(2 * time.Second) + + return &nomadContainer{ + container: container, + address: address, + client: client, + t: t, + } +} + +type MimicJobOptions struct { + JobID string + TaskGroupName string + Count int + Meta map[string]string + Cmd []string +} + +func (nc *nomadContainer) CreateMimicJob(ctx context.Context, opts MimicJobOptions) (*api.Job, error) { + if opts.JobID == "" { + opts.JobID = fmt.Sprintf("mimic-%d", time.Now().UnixNano()) + } + if opts.TaskGroupName == "" { + opts.TaskGroupName = "web" + } + if opts.Cmd == nil 
{ + opts.Cmd = []string{"/mimic", "-running", "-running-after=1s"} + } + + count := opts.Count + job := &api.Job{ + ID: &opts.JobID, + Name: &opts.JobID, + Type: stringToPtr("service"), + Datacenters: []string{"dc1"}, + TaskGroups: []*api.TaskGroup{ + { + Name: &opts.TaskGroupName, + Count: &count, + Meta: opts.Meta, + Tasks: []*api.Task{ + { + Name: "mimic", + Driver: "docker", + Config: map[string]interface{}{ + "image": "sablierapp/mimic:v0.3.1", + "command": opts.Cmd[0], + "args": opts.Cmd[1:], + }, + Resources: &api.Resources{ + CPU: intToPtr(100), + MemoryMB: intToPtr(128), + }, + }, + }, + RestartPolicy: &api.RestartPolicy{ + Attempts: intToPtr(0), + Mode: stringToPtr("fail"), + }, + }, + }, + } + + jobs := nc.client.Jobs() + _, _, err := jobs.Register(job, &api.WriteOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to register job: %w", err) + } + + nc.t.Logf("Created Nomad job %s with task group %s", opts.JobID, opts.TaskGroupName) + + return job, nil +} + +func stringToPtr(s string) *string { + return &s +} + +func intToPtr(i int) *int { + return &i +} + +func WaitForJobAllocations(ctx context.Context, client *api.Client, jobID string, taskGroup string, expectedCount int) error { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + timeout := time.After(60 * time.Second) + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled while waiting for job allocations") + case <-timeout: + return fmt.Errorf("timeout waiting for job allocations") + case <-ticker.C: + jobs := client.Jobs() + allocations, _, err := jobs.Allocations(jobID, false, &api.QueryOptions{}) + if err != nil { + return fmt.Errorf("error getting allocations: %w", err) + } + + runningCount := 0 + for _, alloc := range allocations { + if alloc.TaskGroup == taskGroup && alloc.ClientStatus == "running" { + runningCount++ + } + } + + if runningCount == expectedCount { + return nil + } + } + } +} + +// formatJobName creates the instance name from job ID and task group name +func formatJobName(jobID string, taskGroupName string) string { + return fmt.Sprintf("%s/%s", jobID, taskGroupName) +} diff --git a/pkg/provider/nomad/utils.go b/pkg/provider/nomad/utils.go new file mode 100644 index 0000000..2949424 --- /dev/null +++ b/pkg/provider/nomad/utils.go @@ -0,0 +1,31 @@ +package nomad + +import ( + "fmt" + "strings" +) + +// parseJobName extracts job ID and task group name from the instance name +// Expected format: "jobID/taskGroupName" or just "jobID" (uses default group) +func parseJobName(name string) (string, string, error) { + parts := strings.Split(name, "/") + + if len(parts) == 1 { + // If only job ID provided, use default group name + return parts[0], parts[0], nil + } + + if len(parts) == 2 { + if parts[0] == "" || parts[1] == "" { + return "", "", fmt.Errorf("invalid job name format: %s (expected 'jobID/taskGroupName')", name) + } + return parts[0], parts[1], nil + } + + return "", "", fmt.Errorf("invalid job name format: %s (expected 'jobID/taskGroupName' or 'jobID')", name) +} + +// formatJobName creates the instance name from job ID and task group name +func formatJobName(jobID string, taskGroupName string) string { + return fmt.Sprintf("%s/%s", jobID, taskGroupName) +} diff --git a/pkg/sabliercmd/provider.go b/pkg/sabliercmd/provider.go index 6010a84..c9b89e0 100644 --- a/pkg/sabliercmd/provider.go +++ b/pkg/sabliercmd/provider.go @@ -7,10 +7,12 @@ import ( "github.com/containers/podman/v5/pkg/bindings" "github.com/docker/docker/client" + 
"github.com/hashicorp/nomad/api" "github.com/sablierapp/sablier/pkg/config" "github.com/sablierapp/sablier/pkg/provider/docker" "github.com/sablierapp/sablier/pkg/provider/dockerswarm" "github.com/sablierapp/sablier/pkg/provider/kubernetes" + "github.com/sablierapp/sablier/pkg/provider/nomad" "github.com/sablierapp/sablier/pkg/provider/podman" "github.com/sablierapp/sablier/pkg/sablier" k8s "k8s.io/client-go/kubernetes" @@ -54,6 +56,39 @@ func setupProvider(ctx context.Context, logger *slog.Logger, config config.Provi return nil, fmt.Errorf("cannot create podman connection: %w", err) } return podman.New(connText, logger) + case "nomad": + // Create Nomad client configuration + nomadConfig := api.DefaultConfig() + + // Set address from config or use default + if config.Nomad.Address != "" { + nomadConfig.Address = config.Nomad.Address + } + + // Set token if provided + if config.Nomad.Token != "" { + nomadConfig.SecretID = config.Nomad.Token + } + + // Set namespace + namespace := config.Nomad.Namespace + if namespace == "" { + namespace = "default" + } + nomadConfig.Namespace = namespace + + // Set region if provided + if config.Nomad.Region != "" { + nomadConfig.Region = config.Nomad.Region + } + + // Create Nomad client + nomadClient, err := api.NewClient(nomadConfig) + if err != nil { + return nil, fmt.Errorf("cannot create nomad client: %v", err) + } + + return nomad.New(ctx, nomadClient, namespace, logger) } return nil, fmt.Errorf("unimplemented provider %s", config.Name) }