diff --git a/README.md b/README.md
index 72c5e76..1f10596 100644
--- a/README.md
+++ b/README.md
@@ -26,6 +26,7 @@ Whether you don't want to overload your Raspberry Pi, or your QA environment is
- [Docker Swarm](#docker-swarm)
- [Podman](#podman)
- [Kubernetes](#kubernetes)
+ - [Nomad](#nomad)
- [Usage with Reverse Proxies](#usage-with-reverse-proxies)
- [Apache APISIX](#apache-apisix)
- [Caddy](#caddy)
@@ -283,6 +284,22 @@ Sablier provides native Kubernetes support for managing deployments, scaling wor
📚 **[Full Documentation](https://sablierapp.dev/#/providers/kubernetes)**
+---
+
+### Nomad
+
+
+
+Sablier integrates with HashiCorp Nomad for dynamic scaling of task groups across your cluster.
+
+**Features:**
+- Scale Nomad task groups on demand
+- Support for ACL tokens and namespaces
+- Event-driven allocation monitoring
+- Multi-region support
+
+📚 **[Full Documentation](https://sablierapp.dev/#/providers/nomad)**
+
## Usage with Reverse Proxies
### Apache APISIX
diff --git a/docs/providers/nomad.md b/docs/providers/nomad.md
new file mode 100644
index 0000000..96e66ca
--- /dev/null
+++ b/docs/providers/nomad.md
@@ -0,0 +1,427 @@
+# Nomad
+
+The Nomad provider allows Sablier to manage [HashiCorp Nomad](https://www.nomadproject.io/) job task groups, scaling them from 0 to N allocations on demand.
+
+## Overview
+
+Sablier integrates with Nomad to:
+- Scale task groups to zero when idle
+- Scale task groups up on first request
+- Monitor allocation health and readiness
+
+## Use the Nomad Provider
+
+Configure the provider name as `nomad`:
+
+
+
+#### **File (YAML)**
+
+```yaml
+provider:
+ name: nomad
+ nomad:
+ address: "http://127.0.0.1:4646"
+ namespace: "default"
+ token: "" # Optional ACL token
+ region: "" # Optional region
+```
+
+#### **CLI**
+
+```bash
+sablier start --provider.name=nomad \
+ --provider.nomad.address=http://127.0.0.1:4646 \
+ --provider.nomad.namespace=default
+```
+
+#### **Environment Variables**
+
+```bash
+PROVIDER_NAME=nomad
+PROVIDER_NOMAD_ADDRESS=http://127.0.0.1:4646
+PROVIDER_NOMAD_NAMESPACE=default
+PROVIDER_NOMAD_TOKEN=your-acl-token
+PROVIDER_NOMAD_REGION=us-east-1
+```
+
+
+
+## Configuration
+
+### Connection Settings
+
+| Setting | Description | Default | Environment Variable |
+|------------|---------------------------------------------------------|--------------------------|-------------------------|
+| `address` | HTTP address of the Nomad server | `http://127.0.0.1:4646` | `NOMAD_ADDR` |
+| `token` | Secret ID of an ACL token (if ACLs are enabled) | `""` | `NOMAD_TOKEN` |
+| `namespace`| Target namespace for operations | `default` | `NOMAD_NAMESPACE` |
+| `region` | Target region for operations | `""` | `NOMAD_REGION` |
+
+### Example Configuration
+
+```yaml
+provider:
+ name: nomad
+ nomad:
+ address: "https://nomad.example.com:4646"
+ namespace: "production"
+ token: "your-secret-acl-token"
+ region: "us-west-2"
+
+server:
+ port: 10000
+
+sessions:
+ default-duration: 5m
+
+strategy:
+ dynamic:
+ default-theme: "hacker-terminal"
+```
+
+## Labeling Jobs
+
+Mark task groups for Sablier management using task group `meta`. Because the keys contain dots, quote them in the job specification:
+
+```hcl
+job "whoami" {
+ datacenters = ["dc1"]
+
+ group "web" {
+ count = 0 # Start at 0
+
+    meta = {
+      "sablier.enable" = "true"
+      "sablier.group"  = "whoami" # Optional group name
+    }
+
+ task "server" {
+ driver = "docker"
+
+ config {
+ image = "containous/whoami"
+ ports = ["http"]
+ }
+
+ resources {
+ cpu = 100
+ memory = 128
+ }
+ }
+
+ network {
+ port "http" {
+ to = 80
+ }
+ }
+ }
+}
+```
+
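+To verify that the meta keys were registered as expected, you can inspect the job (assuming `jq` is installed):
+
+```bash
+# Print the meta of every task group in the "whoami" job
+nomad job inspect whoami | jq '.Job.TaskGroups[].Meta'
+```
+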
+### Required Labels
+
+| Label | Value | Description |
+|-------|-------|-------------|
+| `sablier.enable` | `"true"` | Enables Sablier management for this task group |
+
+### Optional Labels
+
+| Label | Value | Description |
+|-------|-------|-------------|
+| `sablier.group` | `string` | Group name for managing multiple task groups together (default: `"default"`) |
+
+## Instance Naming
+
+Nomad instances are identified using the format: `jobID/taskGroupName`
+
+Examples:
+- `whoami/web` - Task group "web" in job "whoami"
+- `api/backend` - Task group "backend" in job "api"
+
+If you only provide the job ID (e.g., `whoami`), Sablier will assume the task group has the same name as the job.
+
+## Reverse Proxy Integration
+
+### Traefik
+
+```yaml
+http:
+ middlewares:
+ sablier-whoami:
+ plugin:
+ sablier:
+ sablierUrl: http://sablier:10000
+ names: "whoami/web"
+ sessionDuration: 1m
+
+ routers:
+ whoami:
+ rule: "Host(`whoami.localhost`)"
+ middlewares:
+ - sablier-whoami
+ service: whoami
+
+ services:
+ whoami:
+ loadBalancer:
+ servers:
+ - url: "http://whoami.service.consul:80"
+```
+
+### Nginx
+
+```nginx
+location / {
+ set $sablierUrl 'http://sablier:10000';
+ set $sablierNames 'whoami/web';
+ set $sablierSessionDuration '1m';
+ set $sablierNginxInternalRedirect '@whoami';
+
+ js_content sablier.call;
+}
+
+location @whoami {
+ proxy_pass http://whoami.service.consul;
+}
+```
+
+## Scaling Behavior
+
+### Scale Up (0 → N)
+
+When a request arrives for a scaled-down task group:
+
+1. Sablier updates the job's task group `count` to the desired value (default: 1)
+2. Nomad scheduler places allocations on available nodes
+3. Allocations transition through: `pending` → `running`
+4. If health checks are configured, Sablier waits for them to pass
+5. Once all allocations are healthy, the instance is marked as `ready`
+
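+As a manual check, you can trigger the same flow by calling Sablier's blocking strategy endpoint directly (a sketch assuming Sablier listens on port 10000 as configured above, using the same `names` and `session_duration` query parameters as with the other providers):
+
+```bash
+# Ask Sablier to scale whoami/web up and hold the request until it is ready
+curl "http://localhost:10000/api/strategies/blocking?names=whoami/web&session_duration=5m"
+```
+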
+### Scale Down (N → 0)
+
+When the session expires:
+
+1. Sablier updates the job's task group `count` to 0
+2. Nomad gracefully stops all allocations
+3. The instance is marked as `not-ready`
+
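+The effect is roughly the same as scaling the task group by hand with the Nomad CLI (illustrative job and group names):
+
+```bash
+# Manually scale the "web" task group of job "whoami" to zero
+nomad job scale whoami web 0
+```
+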
+## Health Checks
+
+Sablier respects Nomad's deployment health checks. If your task group has health checks configured, Sablier will wait for allocations to be marked as healthy before considering the instance ready.
+
+Example with Consul health checks:
+
+```hcl
+group "web" {
+ count = 0
+
+  meta = {
+    "sablier.enable" = "true"
+  }
+
+ task "server" {
+ driver = "docker"
+
+ service {
+ name = "whoami"
+ port = "http"
+
+ check {
+ type = "http"
+ path = "/"
+ interval = "10s"
+ timeout = "2s"
+ }
+ }
+ }
+}
+```
+
+## Permissions (ACL)
+
+If Nomad ACLs are enabled, the token used by Sablier needs to list, read, and update (scale) jobs in the target namespace. The following namespace capabilities are sufficient:
+
+```hcl
+namespace "default" {
+ policy = "write"
+
+ capabilities = [
+ "read-job",
+ "submit-job",
+ "dispatch-job",
+ "read-logs",
+ "read-fs",
+ "alloc-node-exec",
+ "list-jobs",
+ "parse-job",
+ ]
+}
+
+node {
+ policy = "read"
+}
+```
+
+Create a policy file `sablier-policy.hcl` (the `write` policy is a shorthand that includes the capabilities listed above):
+
+```hcl
+namespace "default" {
+ policy = "write"
+}
+```
+
+Apply it:
+
+```bash
+nomad acl policy apply sablier ./sablier-policy.hcl
+nomad acl token create -name="sablier" -policy=sablier
+```
+
+Use the generated token's Secret ID in your Sablier configuration.
+
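+For example, with the file-based configuration shown earlier:
+
+```yaml
+provider:
+  name: nomad
+  nomad:
+    address: "http://127.0.0.1:4646"
+    token: "<secret ID from the command above>"
+```
+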
+## Example Deployment
+
+### Nomad Job for Sablier
+
+```hcl
+job "sablier" {
+ datacenters = ["dc1"]
+ type = "service"
+
+ group "sablier" {
+ count = 1
+
+ network {
+ port "http" {
+ static = 10000
+ to = 10000
+ }
+ }
+
+ task "sablier" {
+ driver = "docker"
+
+ config {
+ image = "sablierapp/sablier:1.10.1"
+ ports = ["http"]
+
+ args = [
+ "start",
+ "--provider.name=nomad",
+ ]
+ }
+
+ env {
+ NOMAD_ADDR = "http://nomad.service.consul:4646"
+ NOMAD_NAMESPACE = "default"
+        # NOMAD_TOKEN is rendered by the template block below
+      }
+
+      template {
+        # Illustrative only: read the ACL token from a Nomad Variable and
+        # expose it to the task as an environment variable; adapt the source
+        # (Vault, host environment, ...) to your own secret management.
+        data        = <<EOH
+NOMAD_TOKEN={{ with nomadVar "nomad/jobs/sablier" }}{{ .token }}{{ end }}
+EOH
+        destination = "secrets/nomad.env"
+        env         = true
+      }
+    }
+  }
+}
+```
+
+## Further Reading
+
+- [Nomad Documentation](https://www.nomadproject.io/docs)
+- [Nomad API Reference](https://www.nomadproject.io/api-docs)
+- [Nomad ACL System](https://www.nomadproject.io/docs/operations/acl)
+- [Nomad Job Specification](https://www.nomadproject.io/docs/job-specification)
diff --git a/docs/providers/overview.md b/docs/providers/overview.md
index c5d7069..55d1181 100644
--- a/docs/providers/overview.md
+++ b/docs/providers/overview.md
@@ -18,6 +18,7 @@ A Provider typically has the following capabilities:
| [Docker Swarm](docker_swarm) | `docker_swarm` or `swarm` | Scale down to zero and up **services** on demand |
| [Kubernetes](kubernetes) | `kubernetes` | Scale down and up **deployments** and **statefulsets** on demand |
| [Podman](podman) | `podman` | Stop and start **containers** on demand |
+| [Nomad](nomad) | `nomad` | Scale down to zero and up **job task groups** on demand |
*Your Provider is not on the list? [Open an issue to request the missing provider here!](https://github.com/sablierapp/sablier/issues/new?assignees=&labels=enhancement%2C+provider&projects=&template=instance-provider-request.md&title=Add+%60%5BPROVIDER%5D%60+provider)*
diff --git a/go.mod b/go.mod
index a7280b9..02bb856 100644
--- a/go.mod
+++ b/go.mod
@@ -116,8 +116,13 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/schema v1.4.1 // indirect
+ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
+ github.com/hashicorp/cronexpr v1.1.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
+ github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
+ github.com/hashicorp/go-rootcerts v1.0.2 // indirect
+ github.com/hashicorp/nomad/api v0.0.0-20251112174658-75f43fd4b041 // indirect
github.com/hpcloud/tail v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jedib0t/go-pretty/v6 v6.6.6 // indirect
@@ -145,6 +150,8 @@ require (
github.com/mdelapenya/tlscert v0.2.0 // indirect
github.com/miekg/pkcs11 v1.1.1 // indirect
github.com/mistifyio/go-zfs/v3 v3.0.1 // indirect
+ github.com/mitchellh/go-homedir v1.1.0 // indirect
+ github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/go-archive v0.1.0 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
diff --git a/go.sum b/go.sum
index 1838cd7..329d3dd 100644
--- a/go.sum
+++ b/go.sum
@@ -254,13 +254,23 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/schema v1.4.1 h1:jUg5hUjCSDZpNGLuXQOgIWGdlgrIdYvgQ0wZtdK1M3E=
github.com/gorilla/schema v1.4.1/go.mod h1:Dg5SSm5PV60mhF2NFaTV1xuYYj8tV8NOPRo4FggUMnM=
+github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
+github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
+github.com/hashicorp/cronexpr v1.1.3 h1:rl5IkxXN2m681EfivTlccqIryzYJSXRGRNa0xeG7NA4=
+github.com/hashicorp/cronexpr v1.1.3/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
+github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
+github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
+github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
+github.com/hashicorp/nomad/api v0.0.0-20251112174658-75f43fd4b041 h1:bPS6GhvnL8ae2CG3uz1yJPH+ymYmYCZe7JAXge6eskA=
+github.com/hashicorp/nomad/api v0.0.0-20251112174658-75f43fd4b041/go.mod h1:sldFTIgs+FsUeKU3LwVjviAIuksxD8TzDOn02MYwslE=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
@@ -328,6 +338,10 @@ github.com/miekg/pkcs11 v1.1.1 h1:Ugu9pdy6vAYku5DEpVWVFPYnzV+bxB+iRdbuFSu7TvU=
github.com/miekg/pkcs11 v1.1.1/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
github.com/mistifyio/go-zfs/v3 v3.0.1 h1:YaoXgBePoMA12+S1u/ddkv+QqxcfiZK4prI6HPnkFiU=
github.com/mistifyio/go-zfs/v3 v3.0.1/go.mod h1:CzVgeB0RvF2EGzQnytKVvVSDwmKJXxkOTUGbNrTja/k=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
+github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ=
diff --git a/pkg/config/provider.go b/pkg/config/provider.go
index 9833700..86b12f1 100644
--- a/pkg/config/provider.go
+++ b/pkg/config/provider.go
@@ -12,6 +12,7 @@ type Provider struct {
AutoStopOnStartup bool `yaml:"auto-stop-on-startup,omitempty" default:"true"`
Kubernetes Kubernetes
Podman Podman
+ Nomad Nomad
}
type Kubernetes struct {
@@ -34,7 +35,23 @@ type Podman struct {
Uri string `mapstructure:"URI" yaml:"uri,omitempty" default:"unix:///run/podman/podman.sock"`
}
-var providers = []string{"docker", "docker_swarm", "swarm", "kubernetes", "podman"}
+type Nomad struct {
+ // Address is the HTTP address of the Nomad server.
+ // Defaults to http://127.0.0.1:4646
+ // Can also be set via the NOMAD_ADDR environment variable.
+ Address string `mapstructure:"ADDRESS" yaml:"address,omitempty" default:"http://127.0.0.1:4646"`
+ // Token is the secret ID of an ACL token for authentication.
+ // Can also be set via the NOMAD_TOKEN environment variable.
+ Token string `mapstructure:"TOKEN" yaml:"token,omitempty"`
+ // Namespace is the target namespace for queries.
+ // Can also be set via the NOMAD_NAMESPACE environment variable.
+ Namespace string `mapstructure:"NAMESPACE" yaml:"namespace,omitempty" default:"default"`
+ // Region is the target region for queries.
+ // Can also be set via the NOMAD_REGION environment variable.
+ Region string `mapstructure:"REGION" yaml:"region,omitempty"`
+}
+
+var providers = []string{"docker", "docker_swarm", "swarm", "kubernetes", "podman", "nomad"}
func NewProviderConfig() Provider {
return Provider{
@@ -48,6 +65,12 @@ func NewProviderConfig() Provider {
Podman: Podman{
Uri: "unix:///run/podman/podman.sock",
},
+ Nomad: Nomad{
+ Address: "http://127.0.0.1:4646",
+ Namespace: "default",
+ Token: "",
+ Region: "",
+ },
}
}
diff --git a/pkg/provider/nomad/events.go b/pkg/provider/nomad/events.go
new file mode 100644
index 0000000..e062750
--- /dev/null
+++ b/pkg/provider/nomad/events.go
@@ -0,0 +1,161 @@
+package nomad
+
+import (
+ "context"
+ "time"
+
+ "github.com/hashicorp/nomad/api"
+)
+
+// NotifyInstanceStopped watches for job allocations being stopped/completed
+// and sends the instance name to the channel when detected
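+// Detection combines Nomad's allocation event stream with a periodic polling fallback.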
+func (p *Provider) NotifyInstanceStopped(ctx context.Context, instance chan<- string) {
+ p.l.InfoContext(ctx, "starting nomad events watcher")
+
+ // Use Nomad's event stream API to watch for allocation updates
+ topics := map[api.Topic][]string{
+ api.TopicAllocation: {"*"},
+ }
+
+ streamCh, err := p.Client.EventStream().Stream(ctx, topics, 0, &api.QueryOptions{
+ Namespace: p.namespace,
+ })
+
+ if err != nil {
+ p.l.ErrorContext(ctx, "failed to start event stream", "error", err)
+ return
+ }
+
+ p.l.InfoContext(ctx, "nomad event stream started")
+
+ // Track last seen count for each task group to detect scale-downs
+ lastSeen := make(map[string]int32)
+
+ // Poll job allocations periodically as a fallback
+ ticker := time.NewTicker(10 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ p.l.InfoContext(ctx, "stopping nomad events watcher")
+ return
+
+ case event := <-streamCh:
+ if event.Err != nil {
+ p.l.ErrorContext(ctx, "event stream error", "error", event.Err)
+ continue
+ }
+
+ // Process allocation events
+ for _, e := range event.Events {
+ if e.Type == "AllocationUpdated" {
+ p.processAllocationEvent(ctx, e, instance, lastSeen)
+ }
+ }
+
+ case <-ticker.C:
+ // Periodically check all jobs for scale-down as a fallback
+ p.pollJobAllocations(ctx, instance, lastSeen)
+ }
+ }
+}
+
+func (p *Provider) processAllocationEvent(ctx context.Context, event api.Event, instance chan<- string, lastSeen map[string]int32) {
+ alloc, ok := event.Payload["Allocation"]
+ if !ok {
+ return
+ }
+
+ allocMap, ok := alloc.(map[string]interface{})
+ if !ok {
+ return
+ }
+
+ jobID, _ := allocMap["JobID"].(string)
+ taskGroup, _ := allocMap["TaskGroup"].(string)
+ clientStatus, _ := allocMap["ClientStatus"].(string)
+
+ if jobID == "" || taskGroup == "" {
+ return
+ }
+
+ // If allocation stopped, check if this was a scale-down
+ if clientStatus == "complete" || clientStatus == "failed" || clientStatus == "lost" {
+ instanceName := formatJobName(jobID, taskGroup)
+
+ // Check current job state
+ info, err := p.InstanceInspect(ctx, instanceName)
+ if err != nil {
+ p.l.WarnContext(ctx, "cannot inspect instance after allocation event",
+ "instance", instanceName,
+ "error", err,
+ )
+ return
+ }
+
+ // If scaled to zero, notify
+ if info.DesiredReplicas == 0 {
+ p.l.InfoContext(ctx, "instance scaled to zero detected",
+ "instance", instanceName,
+ )
+ select {
+ case instance <- instanceName:
+ case <-ctx.Done():
+ }
+ }
+ }
+}
+
+func (p *Provider) pollJobAllocations(ctx context.Context, instance chan<- string, lastSeen map[string]int32) {
+ jobs := p.Client.Jobs()
+ jobList, _, err := jobs.List(&api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ p.l.WarnContext(ctx, "failed to list jobs for polling", "error", err)
+ return
+ }
+
+ for _, jobStub := range jobList {
+ job, _, err := jobs.Info(jobStub.ID, &api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ continue
+ }
+
+ for _, tg := range job.TaskGroups {
+ if tg.Name == nil || tg.Meta == nil {
+ continue
+ }
+
+ // Only watch enabled instances
+ enabled, hasEnable := tg.Meta[enableLabel]
+ if !hasEnable || enabled != "true" {
+ continue
+ }
+
+ instanceName := formatJobName(*job.ID, *tg.Name)
+ currentCount := int32(0)
+ if tg.Count != nil {
+ currentCount = int32(*tg.Count)
+ }
+
+ // Check if scaled down to zero
+ if prev, exists := lastSeen[instanceName]; exists && prev > 0 && currentCount == 0 {
+ p.l.InfoContext(ctx, "instance scaled to zero detected via polling",
+ "instance", instanceName,
+ "previous_count", prev,
+ )
+ select {
+ case instance <- instanceName:
+ case <-ctx.Done():
+ return
+ }
+ }
+
+ lastSeen[instanceName] = currentCount
+ }
+ }
+}
diff --git a/pkg/provider/nomad/events_test.go b/pkg/provider/nomad/events_test.go
new file mode 100644
index 0000000..d0529be
--- /dev/null
+++ b/pkg/provider/nomad/events_test.go
@@ -0,0 +1,57 @@
+package nomad_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/neilotoole/slogt"
+ "github.com/sablierapp/sablier/pkg/provider/nomad"
+ "gotest.tools/v3/assert"
+)
+
+func TestNomadProvider_NotifyInstanceStopped(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping test in short mode.")
+ }
+
+ ctx := context.Background()
+ nc := setupNomad(t)
+
+ p, err := nomad.New(ctx, nc.client, "default", slogt.New(t))
+ assert.NilError(t, err)
+
+ // Create a job with 1 allocation
+ job, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ Count: 1,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ },
+ })
+ assert.NilError(t, err)
+
+ instanceName := formatJobName(*job.ID, *job.TaskGroups[0].Name)
+
+ // Wait for allocation to be running
+ err = WaitForJobAllocations(ctx, nc.client, *job.ID, *job.TaskGroups[0].Name, 1)
+ assert.NilError(t, err)
+
+ // Start watching for stop events
+ stoppedChan := make(chan string, 1)
+ go p.NotifyInstanceStopped(ctx, stoppedChan)
+
+ // Give the watcher time to initialize
+ time.Sleep(2 * time.Second)
+
+ // Scale the job to 0
+ err = p.InstanceStop(ctx, instanceName)
+ assert.NilError(t, err)
+
+ // Wait for the notification
+ select {
+ case name := <-stoppedChan:
+ assert.Equal(t, instanceName, name)
+ case <-time.After(30 * time.Second):
+ t.Fatal("timeout waiting for instance stopped notification")
+ }
+}
diff --git a/pkg/provider/nomad/job_groups_test.go b/pkg/provider/nomad/job_groups_test.go
new file mode 100644
index 0000000..f9473a3
--- /dev/null
+++ b/pkg/provider/nomad/job_groups_test.go
@@ -0,0 +1,79 @@
+package nomad_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/neilotoole/slogt"
+ "github.com/sablierapp/sablier/pkg/provider/nomad"
+ "gotest.tools/v3/assert"
+)
+
+func TestNomadProvider_InstanceGroups(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping test in short mode.")
+ }
+
+ ctx := context.Background()
+ nc := setupNomad(t)
+
+ p, err := nomad.New(ctx, nc.client, "default", slogt.New(t))
+ assert.NilError(t, err)
+
+ // Create jobs with different groups
+ job1, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ JobID: "groups-test-1",
+ Count: 0,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ "sablier.group": "group1",
+ },
+ })
+ assert.NilError(t, err)
+
+ job2, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ JobID: "groups-test-2",
+ Count: 0,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ "sablier.group": "group1",
+ },
+ })
+ assert.NilError(t, err)
+
+ job3, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ JobID: "groups-test-3",
+ Count: 0,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ "sablier.group": "group2",
+ },
+ })
+ assert.NilError(t, err)
+
+ // Job without sablier.enable should not be included
+ _, err = nc.CreateMimicJob(ctx, MimicJobOptions{
+ JobID: "groups-test-4",
+ Count: 0,
+ })
+ assert.NilError(t, err)
+
+ groups, err := p.InstanceGroups(ctx)
+ assert.NilError(t, err)
+
+ assert.Equal(t, 2, len(groups))
+ assert.Equal(t, 2, len(groups["group1"]))
+ assert.Equal(t, 1, len(groups["group2"]))
+
+ // Check that the instance names are correct
+ expectedGroup1 := []string{
+ formatJobName(*job1.ID, *job1.TaskGroups[0].Name),
+ formatJobName(*job2.ID, *job2.TaskGroups[0].Name),
+ }
+ expectedGroup2 := []string{
+ formatJobName(*job3.ID, *job3.TaskGroups[0].Name),
+ }
+
+ assert.DeepEqual(t, expectedGroup1, groups["group1"])
+ assert.DeepEqual(t, expectedGroup2, groups["group2"])
+}
diff --git a/pkg/provider/nomad/job_inspect.go b/pkg/provider/nomad/job_inspect.go
new file mode 100644
index 0000000..9d07e1d
--- /dev/null
+++ b/pkg/provider/nomad/job_inspect.go
@@ -0,0 +1,93 @@
+package nomad
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/hashicorp/nomad/api"
+ "github.com/sablierapp/sablier/pkg/sablier"
+)
+
+// InstanceInspect retrieves the current state of a Nomad job's task group
+func (p *Provider) InstanceInspect(ctx context.Context, name string) (sablier.InstanceInfo, error) {
+ jobID, groupName, err := parseJobName(name)
+ if err != nil {
+ return sablier.InstanceInfo{}, err
+ }
+
+ jobs := p.Client.Jobs()
+ job, _, err := jobs.Info(jobID, &api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ return sablier.InstanceInfo{}, fmt.Errorf("cannot get job info: %w", err)
+ }
+
+ if job == nil {
+ return sablier.InstanceInfo{}, fmt.Errorf("job %s not found", jobID)
+ }
+
+ // Find the task group
+ var targetGroup *api.TaskGroup
+ for _, tg := range job.TaskGroups {
+ if tg.Name != nil && *tg.Name == groupName {
+ targetGroup = tg
+ break
+ }
+ }
+
+ if targetGroup == nil {
+ return sablier.InstanceInfo{}, fmt.Errorf("task group %s not found in job %s", groupName, jobID)
+ }
+
+ desiredCount := int32(0)
+ if targetGroup.Count != nil {
+ desiredCount = int32(*targetGroup.Count)
+ }
+
+ // Get allocations for this job to determine actual running count
+ allocations, _, err := jobs.Allocations(jobID, false, &api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ return sablier.InstanceInfo{}, fmt.Errorf("cannot get job allocations: %w", err)
+ }
+
+ // Count running allocations for this specific task group
+ runningCount := int32(0)
+ for _, alloc := range allocations {
+ if alloc.TaskGroup == groupName && (alloc.ClientStatus == "running" || alloc.ClientStatus == "pending") {
+ runningCount++
+ }
+ }
+
+ instanceName := formatJobName(jobID, groupName)
+
+ // Determine status
+ if desiredCount == 0 {
+ return sablier.NotReadyInstanceState(instanceName, runningCount, p.desiredReplicas), nil
+ }
+
+ // Check if all allocations are running
+ if runningCount == desiredCount && desiredCount > 0 {
+ // Check allocation health for task groups with health checks
+ allHealthy := true
+ for _, alloc := range allocations {
+ if alloc.TaskGroup == groupName && alloc.ClientStatus == "running" {
+ // If DeploymentStatus exists and Health is set, check it
+ if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil {
+ if !*alloc.DeploymentStatus.Healthy {
+ allHealthy = false
+ break
+ }
+ }
+ }
+ }
+
+ if allHealthy {
+ return sablier.ReadyInstanceState(instanceName, desiredCount), nil
+ }
+ }
+
+ return sablier.NotReadyInstanceState(instanceName, runningCount, desiredCount), nil
+}
diff --git a/pkg/provider/nomad/job_inspect_test.go b/pkg/provider/nomad/job_inspect_test.go
new file mode 100644
index 0000000..f4ad916
--- /dev/null
+++ b/pkg/provider/nomad/job_inspect_test.go
@@ -0,0 +1,102 @@
+package nomad_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/neilotoole/slogt"
+ "github.com/sablierapp/sablier/pkg/provider/nomad"
+ "github.com/sablierapp/sablier/pkg/sablier"
+ "gotest.tools/v3/assert"
+)
+
+func TestNomadProvider_InstanceInspect(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping test in short mode.")
+ }
+
+ ctx := context.Background()
+
+ type args struct {
+ do func(nc *nomadContainer) (string, error)
+ }
+ tests := []struct {
+ name string
+ args args
+ want sablier.InstanceInfo
+ err error
+ }{
+ {
+ name: "inspect job with running allocation",
+ args: args{
+ do: func(nc *nomadContainer) (string, error) {
+ job, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ Count: 1,
+ })
+ if err != nil {
+ return "", err
+ }
+
+ // Wait for allocation to be running
+ err = WaitForJobAllocations(ctx, nc.client, *job.ID, *job.TaskGroups[0].Name, 1)
+ if err != nil {
+ return "", err
+ }
+
+ return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil
+ },
+ },
+ want: sablier.InstanceInfo{
+ CurrentReplicas: 1,
+ DesiredReplicas: 1,
+ Status: sablier.InstanceStatusReady,
+ },
+ err: nil,
+ },
+ {
+ name: "inspect job with 0 allocations",
+ args: args{
+ do: func(nc *nomadContainer) (string, error) {
+ job, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ Count: 0,
+ })
+ if err != nil {
+ return "", err
+ }
+
+ return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil
+ },
+ },
+ want: sablier.InstanceInfo{
+ CurrentReplicas: 0,
+ DesiredReplicas: 1,
+ Status: sablier.InstanceStatusNotReady,
+ },
+ err: nil,
+ },
+ }
+
+ nc := setupNomad(t)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+
+ p, err := nomad.New(ctx, nc.client, "default", slogt.New(t))
+ assert.NilError(t, err)
+
+ name, err := tt.args.do(nc)
+ assert.NilError(t, err)
+
+ info, err := p.InstanceInspect(ctx, name)
+ if tt.err != nil {
+ assert.Error(t, err, tt.err.Error())
+ } else {
+ assert.NilError(t, err)
+ assert.Equal(t, name, info.Name)
+ assert.Equal(t, tt.want.CurrentReplicas, info.CurrentReplicas)
+ assert.Equal(t, tt.want.DesiredReplicas, info.DesiredReplicas)
+ assert.Equal(t, tt.want.Status, info.Status)
+ }
+ })
+ }
+}
diff --git a/pkg/provider/nomad/job_list.go b/pkg/provider/nomad/job_list.go
new file mode 100644
index 0000000..3e68978
--- /dev/null
+++ b/pkg/provider/nomad/job_list.go
@@ -0,0 +1,123 @@
+package nomad
+
+import (
+ "context"
+
+ "github.com/hashicorp/nomad/api"
+ "github.com/sablierapp/sablier/pkg/provider"
+ "github.com/sablierapp/sablier/pkg/sablier"
+)
+
+const (
+ enableLabel = "sablier.enable"
+ groupLabel = "sablier.group"
+)
+
+// InstanceGroups returns a map of group names to instance names
+// It scans all jobs in the namespace looking for the sablier.enable and sablier.group labels
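+// e.g. {"default": ["whoami/web"], "group1": ["api/backend"]} (illustrative job and group names)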
+func (p *Provider) InstanceGroups(ctx context.Context) (map[string][]string, error) {
+ groups := make(map[string][]string)
+
+ jobs := p.Client.Jobs()
+ jobList, _, err := jobs.List(&api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ for _, jobStub := range jobList {
+ // Get full job details to access task group metadata
+ job, _, err := jobs.Info(jobStub.ID, &api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ p.l.WarnContext(ctx, "cannot get job info", "job_id", jobStub.ID, "error", err)
+ continue
+ }
+
+ // Check each task group for sablier labels
+ for _, tg := range job.TaskGroups {
+ if tg.Name == nil {
+ continue
+ }
+
+ // Check meta tags for sablier.enable
+ if tg.Meta == nil {
+ continue
+ }
+
+ enabled, hasEnable := tg.Meta[enableLabel]
+ if !hasEnable || enabled != "true" {
+ continue
+ }
+
+ groupName := "default"
+ if gn, hasGroup := tg.Meta[groupLabel]; hasGroup && gn != "" {
+ groupName = gn
+ }
+
+ instanceName := formatJobName(*job.ID, *tg.Name)
+ groups[groupName] = append(groups[groupName], instanceName)
+ }
+ }
+
+ return groups, nil
+}
+
+// InstanceList returns a list of all instances (task groups) that have Sablier enabled
+func (p *Provider) InstanceList(ctx context.Context, options provider.InstanceListOptions) ([]sablier.InstanceConfiguration, error) {
+ var instances []sablier.InstanceConfiguration
+
+ jobs := p.Client.Jobs()
+ jobList, _, err := jobs.List(&api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ for _, jobStub := range jobList {
+ // Get full job details
+ job, _, err := jobs.Info(jobStub.ID, &api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ p.l.WarnContext(ctx, "cannot get job info", "job_id", jobStub.ID, "error", err)
+ continue
+ }
+
+ // Check each task group
+ for _, tg := range job.TaskGroups {
+ if tg.Name == nil {
+ continue
+ }
+
+ // If All flag is not set, only return enabled instances
+ if !options.All {
+ if tg.Meta == nil {
+ continue
+ }
+ enabled, hasEnable := tg.Meta[enableLabel]
+ if !hasEnable || enabled != "true" {
+ continue
+ }
+ }
+
+ groupName := "default"
+ if tg.Meta != nil {
+ if gn, hasGroup := tg.Meta[groupLabel]; hasGroup && gn != "" {
+ groupName = gn
+ }
+ }
+
+ instanceName := formatJobName(*job.ID, *tg.Name)
+ instances = append(instances, sablier.InstanceConfiguration{
+ Name: instanceName,
+ Group: groupName,
+ })
+ }
+ }
+
+ return instances, nil
+}
diff --git a/pkg/provider/nomad/job_list_test.go b/pkg/provider/nomad/job_list_test.go
new file mode 100644
index 0000000..0087b95
--- /dev/null
+++ b/pkg/provider/nomad/job_list_test.go
@@ -0,0 +1,82 @@
+package nomad_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/neilotoole/slogt"
+ "github.com/sablierapp/sablier/pkg/provider"
+ "github.com/sablierapp/sablier/pkg/provider/nomad"
+ "gotest.tools/v3/assert"
+)
+
+func TestNomadProvider_InstanceList(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping test in short mode.")
+ }
+
+ ctx := context.Background()
+ nc := setupNomad(t)
+
+ p, err := nomad.New(ctx, nc.client, "default", slogt.New(t))
+ assert.NilError(t, err)
+
+ // Create jobs with sablier.enable
+ job1, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ JobID: "list-test-1",
+ Count: 0,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ "sablier.group": "test-group",
+ },
+ })
+ assert.NilError(t, err)
+
+ job2, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ JobID: "list-test-2",
+ Count: 0,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ },
+ })
+ assert.NilError(t, err)
+
+ // Job without sablier.enable
+ _, err = nc.CreateMimicJob(ctx, MimicJobOptions{
+ JobID: "list-test-3",
+ Count: 0,
+ })
+ assert.NilError(t, err)
+
+ // Test with All = false (only enabled instances)
+ instances, err := p.InstanceList(ctx, provider.InstanceListOptions{All: false})
+ assert.NilError(t, err)
+ assert.Equal(t, 2, len(instances))
+
+ // Test with All = true (all instances)
+ instancesAll, err := p.InstanceList(ctx, provider.InstanceListOptions{All: true})
+ assert.NilError(t, err)
+ assert.Assert(t, len(instancesAll) >= 3)
+
+ // Verify instance configuration
+ found := false
+ for _, inst := range instances {
+ if inst.Name == formatJobName(*job1.ID, *job1.TaskGroups[0].Name) {
+ assert.Equal(t, "test-group", inst.Group)
+ found = true
+ break
+ }
+ }
+ assert.Assert(t, found, "Expected to find job1 in instances list")
+
+ // Verify default group
+ foundDefault := false
+ for _, inst := range instances {
+ if inst.Name == formatJobName(*job2.ID, *job2.TaskGroups[0].Name) {
+ assert.Equal(t, "default", inst.Group)
+ foundDefault = true
+ break
+ }
+ }
+ assert.Assert(t, foundDefault, "Expected to find job2 with default group")
+}
diff --git a/pkg/provider/nomad/job_start.go b/pkg/provider/nomad/job_start.go
new file mode 100644
index 0000000..f3621d1
--- /dev/null
+++ b/pkg/provider/nomad/job_start.go
@@ -0,0 +1,78 @@
+package nomad
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/hashicorp/nomad/api"
+)
+
+// InstanceStart scales the Nomad job's task group to the desired replica count
+func (p *Provider) InstanceStart(ctx context.Context, name string) error {
+ p.l.DebugContext(ctx, "starting instance", "name", name)
+
+ jobID, groupName, err := parseJobName(name)
+ if err != nil {
+ return err
+ }
+
+ jobs := p.Client.Jobs()
+ job, _, err := jobs.Info(jobID, &api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ return fmt.Errorf("cannot get job info: %w", err)
+ }
+
+ if job == nil {
+ return fmt.Errorf("job %s not found", jobID)
+ }
+
+ // Find the task group
+ var targetGroup *api.TaskGroup
+ for _, tg := range job.TaskGroups {
+ if tg.Name != nil && *tg.Name == groupName {
+ targetGroup = tg
+ break
+ }
+ }
+
+ if targetGroup == nil {
+ return fmt.Errorf("task group %s not found in job %s", groupName, jobID)
+ }
+
+ // Check if already at desired count
+ currentCount := int32(0)
+ if targetGroup.Count != nil {
+ currentCount = int32(*targetGroup.Count)
+ }
+
+ if currentCount == p.desiredReplicas {
+ p.l.DebugContext(ctx, "instance already at desired replicas",
+ "name", name,
+ "current", currentCount,
+ "desired", p.desiredReplicas,
+ )
+ return nil
+ }
+
+ // Scale up
+ count := int(p.desiredReplicas)
+ targetGroup.Count = &count
+
+ // Submit the job update
+ _, _, err = jobs.Register(job, &api.WriteOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ return fmt.Errorf("cannot scale job: %w", err)
+ }
+
+ p.l.InfoContext(ctx, "scaled instance up",
+ "name", name,
+ "from", currentCount,
+ "to", p.desiredReplicas,
+ )
+
+ return nil
+}
diff --git a/pkg/provider/nomad/job_start_test.go b/pkg/provider/nomad/job_start_test.go
new file mode 100644
index 0000000..00ffb20
--- /dev/null
+++ b/pkg/provider/nomad/job_start_test.go
@@ -0,0 +1,104 @@
+package nomad_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/neilotoole/slogt"
+ "github.com/sablierapp/sablier/pkg/provider/nomad"
+ "gotest.tools/v3/assert"
+)
+
+func TestNomadProvider_InstanceStart(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping test in short mode.")
+ }
+
+ ctx := context.Background()
+
+ type args struct {
+ do func(nc *nomadContainer) (string, error)
+ }
+ tests := []struct {
+ name string
+ args args
+ err error
+ }{
+ {
+ name: "start job with 0 allocations",
+ args: args{
+ do: func(nc *nomadContainer) (string, error) {
+ job, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ Count: 0,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ },
+ })
+ if err != nil {
+ return "", err
+ }
+ return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil
+ },
+ },
+ err: nil,
+ },
+ {
+ name: "start job already at desired count",
+ args: args{
+ do: func(nc *nomadContainer) (string, error) {
+ job, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ Count: 1,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ },
+ })
+ if err != nil {
+ return "", err
+ }
+
+ // Wait for allocation to be running
+ err = WaitForJobAllocations(ctx, nc.client, *job.ID, *job.TaskGroups[0].Name, 1)
+ if err != nil {
+ return "", err
+ }
+
+ return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil
+ },
+ },
+ err: nil,
+ },
+ {
+ name: "start non-existent job",
+ args: args{
+ do: func(nc *nomadContainer) (string, error) {
+ return "non-existent/taskgroup", nil
+ },
+ },
+ err: fmt.Errorf("job not found"),
+ },
+ }
+
+ nc := setupNomad(t)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ p, err := nomad.New(ctx, nc.client, "default", slogt.New(t))
+ assert.NilError(t, err)
+
+ name, err := tt.args.do(nc)
+ assert.NilError(t, err)
+
+ err = p.InstanceStart(ctx, name)
+ if tt.err != nil {
+ assert.ErrorContains(t, err, "job not found")
+ } else {
+ assert.NilError(t, err)
+
+ // Verify the job was scaled
+ info, err := p.InstanceInspect(ctx, name)
+ assert.NilError(t, err)
+ assert.Equal(t, int32(1), info.DesiredReplicas)
+ }
+ })
+ }
+}
diff --git a/pkg/provider/nomad/job_stop.go b/pkg/provider/nomad/job_stop.go
new file mode 100644
index 0000000..f4bee12
--- /dev/null
+++ b/pkg/provider/nomad/job_stop.go
@@ -0,0 +1,76 @@
+package nomad
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/hashicorp/nomad/api"
+)
+
+// InstanceStop scales the Nomad job's task group to zero
+func (p *Provider) InstanceStop(ctx context.Context, name string) error {
+ p.l.DebugContext(ctx, "stopping instance", "name", name)
+
+ jobID, groupName, err := parseJobName(name)
+ if err != nil {
+ return err
+ }
+
+ jobs := p.Client.Jobs()
+ job, _, err := jobs.Info(jobID, &api.QueryOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ return fmt.Errorf("cannot get job info: %w", err)
+ }
+
+ if job == nil {
+ return fmt.Errorf("job %s not found", jobID)
+ }
+
+ // Find the task group
+ var targetGroup *api.TaskGroup
+ for _, tg := range job.TaskGroups {
+ if tg.Name != nil && *tg.Name == groupName {
+ targetGroup = tg
+ break
+ }
+ }
+
+ if targetGroup == nil {
+ return fmt.Errorf("task group %s not found in job %s", groupName, jobID)
+ }
+
+ // Check if already at zero
+ currentCount := int32(0)
+ if targetGroup.Count != nil {
+ currentCount = int32(*targetGroup.Count)
+ }
+
+ if currentCount == 0 {
+ p.l.DebugContext(ctx, "instance already stopped",
+ "name", name,
+ )
+ return nil
+ }
+
+ // Scale to zero
+ count := 0
+ targetGroup.Count = &count
+
+ // Submit the job update
+ _, _, err = jobs.Register(job, &api.WriteOptions{
+ Namespace: p.namespace,
+ })
+ if err != nil {
+ return fmt.Errorf("cannot stop job: %w", err)
+ }
+
+ p.l.InfoContext(ctx, "scaled instance down",
+ "name", name,
+ "from", currentCount,
+ "to", 0,
+ )
+
+ return nil
+}
diff --git a/pkg/provider/nomad/job_stop_test.go b/pkg/provider/nomad/job_stop_test.go
new file mode 100644
index 0000000..d40b905
--- /dev/null
+++ b/pkg/provider/nomad/job_stop_test.go
@@ -0,0 +1,104 @@
+package nomad_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/neilotoole/slogt"
+ "github.com/sablierapp/sablier/pkg/provider/nomad"
+ "gotest.tools/v3/assert"
+)
+
+func TestNomadProvider_InstanceStop(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping test in short mode.")
+ }
+
+ ctx := context.Background()
+
+ type args struct {
+ do func(nc *nomadContainer) (string, error)
+ }
+ tests := []struct {
+ name string
+ args args
+ err error
+ }{
+ {
+ name: "stop job with running allocations",
+ args: args{
+ do: func(nc *nomadContainer) (string, error) {
+ job, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ Count: 1,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ },
+ })
+ if err != nil {
+ return "", err
+ }
+
+ // Wait for allocation to be running
+ err = WaitForJobAllocations(ctx, nc.client, *job.ID, *job.TaskGroups[0].Name, 1)
+ if err != nil {
+ return "", err
+ }
+
+ return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil
+ },
+ },
+ err: nil,
+ },
+ {
+ name: "stop job already at 0",
+ args: args{
+ do: func(nc *nomadContainer) (string, error) {
+ job, err := nc.CreateMimicJob(ctx, MimicJobOptions{
+ Count: 0,
+ Meta: map[string]string{
+ "sablier.enable": "true",
+ },
+ })
+ if err != nil {
+ return "", err
+ }
+ return formatJobName(*job.ID, *job.TaskGroups[0].Name), nil
+ },
+ },
+ err: nil,
+ },
+ {
+ name: "stop non-existent job",
+ args: args{
+ do: func(nc *nomadContainer) (string, error) {
+ return "non-existent/taskgroup", nil
+ },
+ },
+ err: fmt.Errorf("job not found"),
+ },
+ }
+
+ nc := setupNomad(t)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ p, err := nomad.New(ctx, nc.client, "default", slogt.New(t))
+ assert.NilError(t, err)
+
+ name, err := tt.args.do(nc)
+ assert.NilError(t, err)
+
+ err = p.InstanceStop(ctx, name)
+ if tt.err != nil {
+ assert.ErrorContains(t, err, "job not found")
+ } else {
+ assert.NilError(t, err)
+
+ // Verify the job was scaled to 0
+ info, err := p.InstanceInspect(ctx, name)
+ assert.NilError(t, err)
+ assert.Equal(t, int32(0), info.DesiredReplicas)
+ }
+ })
+ }
+}
diff --git a/pkg/provider/nomad/nomad.go b/pkg/provider/nomad/nomad.go
new file mode 100644
index 0000000..43ff54b
--- /dev/null
+++ b/pkg/provider/nomad/nomad.go
@@ -0,0 +1,69 @@
+package nomad
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+
+ "github.com/hashicorp/nomad/api"
+ "github.com/sablierapp/sablier/pkg/sablier"
+)
+
+// Interface guard
+var _ sablier.Provider = (*Provider)(nil)
+
+type Provider struct {
+ Client *api.Client
+ namespace string
+ desiredReplicas int32
+ l *slog.Logger
+}
+
+func New(ctx context.Context, client *api.Client, namespace string, logger *slog.Logger) (*Provider, error) {
+ logger = logger.With(slog.String("provider", "nomad"))
+
+ if namespace == "" {
+ namespace = "default"
+ }
+
+ // Test connection by getting agent self info
+ agent := client.Agent()
+ info, err := agent.Self()
+ if err != nil {
+		return nil, fmt.Errorf("cannot connect to nomad: %w", err)
+ }
+
+ version := "unknown"
+ address := "unknown"
+
+ if info != nil && info.Stats != nil {
+ if nomadStats, ok := info.Stats["nomad"]; ok {
+ if versionStr, exists := nomadStats["version"]; exists {
+ version = versionStr
+ }
+ }
+ }
+
+ if info != nil && info.Config != nil {
+ if addr, ok := info.Config["AdvertiseAddrs"]; ok {
+ if addrMap, ok := addr.(map[string]interface{}); ok {
+ if httpAddr, ok := addrMap["HTTP"].(string); ok {
+ address = httpAddr
+ }
+ }
+ }
+ }
+
+ logger.InfoContext(ctx, "connection established with nomad",
+ slog.String("version", version),
+ slog.String("namespace", namespace),
+ slog.String("address", address),
+ )
+
+ return &Provider{
+ Client: client,
+ namespace: namespace,
+ desiredReplicas: 1,
+ l: logger,
+ }, nil
+}
diff --git a/pkg/provider/nomad/testcontainers_test.go b/pkg/provider/nomad/testcontainers_test.go
new file mode 100644
index 0000000..df9bb4f
--- /dev/null
+++ b/pkg/provider/nomad/testcontainers_test.go
@@ -0,0 +1,194 @@
+package nomad_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/hashicorp/nomad/api"
+ "github.com/testcontainers/testcontainers-go"
+ "github.com/testcontainers/testcontainers-go/wait"
+)
+
+type nomadContainer struct {
+ container testcontainers.Container
+ address string
+ client *api.Client
+ t *testing.T
+}
+
+func setupNomad(t *testing.T) *nomadContainer {
+ if testing.Short() {
+ t.Skip("skipping test in short mode.")
+ }
+
+ ctx := context.Background()
+
+ req := testcontainers.ContainerRequest{
+ Image: "hashicorp/nomad:1.8",
+ ExposedPorts: []string{"4646/tcp"},
+ Cmd: []string{
+ "agent",
+ "-dev",
+ "-bind=0.0.0.0",
+ "-network-interface=eth0",
+ },
+ Privileged: true,
+ WaitingFor: wait.ForLog("Nomad agent started!").WithStartupTimeout(60 * time.Second),
+ }
+
+ container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ if err != nil {
+ t.Fatalf("failed to start nomad container: %s", err)
+ }
+
+ t.Cleanup(func() {
+ if err := container.Terminate(ctx); err != nil {
+ t.Logf("failed to terminate container: %s", err)
+ }
+ })
+
+ host, err := container.Host(ctx)
+ if err != nil {
+ t.Fatalf("failed to get container host: %s", err)
+ }
+
+ port, err := container.MappedPort(ctx, "4646")
+ if err != nil {
+ t.Fatalf("failed to get mapped port: %s", err)
+ }
+
+ address := fmt.Sprintf("http://%s:%s", host, port.Port())
+
+ // Create Nomad client
+ config := api.DefaultConfig()
+ config.Address = address
+ client, err := api.NewClient(config)
+ if err != nil {
+ t.Fatalf("failed to create nomad client: %s", err)
+ }
+
+ // Wait for Nomad to be ready
+ time.Sleep(2 * time.Second)
+
+ return &nomadContainer{
+ container: container,
+ address: address,
+ client: client,
+ t: t,
+ }
+}
+
+type MimicJobOptions struct {
+ JobID string
+ TaskGroupName string
+ Count int
+ Meta map[string]string
+ Cmd []string
+}
+
+func (nc *nomadContainer) CreateMimicJob(ctx context.Context, opts MimicJobOptions) (*api.Job, error) {
+ if opts.JobID == "" {
+ opts.JobID = fmt.Sprintf("mimic-%d", time.Now().UnixNano())
+ }
+ if opts.TaskGroupName == "" {
+ opts.TaskGroupName = "web"
+ }
+ if opts.Cmd == nil {
+ opts.Cmd = []string{"/mimic", "-running", "-running-after=1s"}
+ }
+
+ count := opts.Count
+ job := &api.Job{
+ ID: &opts.JobID,
+ Name: &opts.JobID,
+ Type: stringToPtr("service"),
+ Datacenters: []string{"dc1"},
+ TaskGroups: []*api.TaskGroup{
+ {
+ Name: &opts.TaskGroupName,
+ Count: &count,
+ Meta: opts.Meta,
+ Tasks: []*api.Task{
+ {
+ Name: "mimic",
+ Driver: "docker",
+ Config: map[string]interface{}{
+ "image": "sablierapp/mimic:v0.3.1",
+ "command": opts.Cmd[0],
+ "args": opts.Cmd[1:],
+ },
+ Resources: &api.Resources{
+ CPU: intToPtr(100),
+ MemoryMB: intToPtr(128),
+ },
+ },
+ },
+ RestartPolicy: &api.RestartPolicy{
+ Attempts: intToPtr(0),
+ Mode: stringToPtr("fail"),
+ },
+ },
+ },
+ }
+
+ jobs := nc.client.Jobs()
+ _, _, err := jobs.Register(job, &api.WriteOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("failed to register job: %w", err)
+ }
+
+ nc.t.Logf("Created Nomad job %s with task group %s", opts.JobID, opts.TaskGroupName)
+
+ return job, nil
+}
+
+func stringToPtr(s string) *string {
+ return &s
+}
+
+func intToPtr(i int) *int {
+ return &i
+}
+
+func WaitForJobAllocations(ctx context.Context, client *api.Client, jobID string, taskGroup string, expectedCount int) error {
+ ticker := time.NewTicker(500 * time.Millisecond)
+ defer ticker.Stop()
+
+ timeout := time.After(60 * time.Second)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("context canceled while waiting for job allocations")
+ case <-timeout:
+ return fmt.Errorf("timeout waiting for job allocations")
+ case <-ticker.C:
+ jobs := client.Jobs()
+ allocations, _, err := jobs.Allocations(jobID, false, &api.QueryOptions{})
+ if err != nil {
+ return fmt.Errorf("error getting allocations: %w", err)
+ }
+
+ runningCount := 0
+ for _, alloc := range allocations {
+ if alloc.TaskGroup == taskGroup && alloc.ClientStatus == "running" {
+ runningCount++
+ }
+ }
+
+ if runningCount == expectedCount {
+ return nil
+ }
+ }
+ }
+}
+
+// formatJobName creates the instance name from job ID and task group name
+func formatJobName(jobID string, taskGroupName string) string {
+ return fmt.Sprintf("%s/%s", jobID, taskGroupName)
+}
diff --git a/pkg/provider/nomad/utils.go b/pkg/provider/nomad/utils.go
new file mode 100644
index 0000000..2949424
--- /dev/null
+++ b/pkg/provider/nomad/utils.go
@@ -0,0 +1,31 @@
+package nomad
+
+import (
+ "fmt"
+ "strings"
+)
+
+// parseJobName extracts job ID and task group name from the instance name
+// Expected format: "jobID/taskGroupName", or just "jobID" (the task group is assumed to share the job's name)
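+// e.g. "whoami/web" -> ("whoami", "web"); "whoami" -> ("whoami", "whoami")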
+func parseJobName(name string) (string, string, error) {
+ parts := strings.Split(name, "/")
+
+ if len(parts) == 1 {
+		// Only the job ID was provided; assume the task group shares the job's name
+ return parts[0], parts[0], nil
+ }
+
+ if len(parts) == 2 {
+ if parts[0] == "" || parts[1] == "" {
+ return "", "", fmt.Errorf("invalid job name format: %s (expected 'jobID/taskGroupName')", name)
+ }
+ return parts[0], parts[1], nil
+ }
+
+ return "", "", fmt.Errorf("invalid job name format: %s (expected 'jobID/taskGroupName' or 'jobID')", name)
+}
+
+// formatJobName creates the instance name from job ID and task group name
+func formatJobName(jobID string, taskGroupName string) string {
+ return fmt.Sprintf("%s/%s", jobID, taskGroupName)
+}
diff --git a/pkg/sabliercmd/provider.go b/pkg/sabliercmd/provider.go
index 6010a84..c9b89e0 100644
--- a/pkg/sabliercmd/provider.go
+++ b/pkg/sabliercmd/provider.go
@@ -7,10 +7,12 @@ import (
"github.com/containers/podman/v5/pkg/bindings"
"github.com/docker/docker/client"
+ "github.com/hashicorp/nomad/api"
"github.com/sablierapp/sablier/pkg/config"
"github.com/sablierapp/sablier/pkg/provider/docker"
"github.com/sablierapp/sablier/pkg/provider/dockerswarm"
"github.com/sablierapp/sablier/pkg/provider/kubernetes"
+ "github.com/sablierapp/sablier/pkg/provider/nomad"
"github.com/sablierapp/sablier/pkg/provider/podman"
"github.com/sablierapp/sablier/pkg/sablier"
k8s "k8s.io/client-go/kubernetes"
@@ -54,6 +56,39 @@ func setupProvider(ctx context.Context, logger *slog.Logger, config config.Provi
return nil, fmt.Errorf("cannot create podman connection: %w", err)
}
return podman.New(connText, logger)
+ case "nomad":
+ // Create Nomad client configuration
+ nomadConfig := api.DefaultConfig()
+
+ // Set address from config or use default
+ if config.Nomad.Address != "" {
+ nomadConfig.Address = config.Nomad.Address
+ }
+
+ // Set token if provided
+ if config.Nomad.Token != "" {
+ nomadConfig.SecretID = config.Nomad.Token
+ }
+
+ // Set namespace
+ namespace := config.Nomad.Namespace
+ if namespace == "" {
+ namespace = "default"
+ }
+ nomadConfig.Namespace = namespace
+
+ // Set region if provided
+ if config.Nomad.Region != "" {
+ nomadConfig.Region = config.Nomad.Region
+ }
+
+ // Create Nomad client
+ nomadClient, err := api.NewClient(nomadConfig)
+ if err != nil {
+			return nil, fmt.Errorf("cannot create nomad client: %w", err)
+ }
+
+ return nomad.New(ctx, nomadClient, namespace, logger)
}
return nil, fmt.Errorf("unimplemented provider %s", config.Name)
}