Add Elasticsearch notification

Elsasticsearch notification: add test cases

Elasticsearch notification: make timeout configurable

Elsasticsearch notification: add @timestamp field to JSON data

Elsasticsearch notification: improve error handling

use context.WithTimeoutCause

Co-authored-by: CrazyMax <1951866+crazy-max@users.noreply.github.com>

better comment the elaticsearch api endpoint

add screenshot to documentation
This commit is contained in:
Robin Moser
2025-07-15 02:23:59 +02:00
parent e99c10917d
commit f245869582
12 changed files with 313 additions and 16 deletions

View File

@@ -94,6 +94,17 @@ func TestLoadFile(t *testing.T) {
Timeout: utl.NewDuration(10 * time.Second),
TemplateBody: model.NotifDefaultTemplateBody,
},
Elasticsearch: &model.NotifElasticsearch{
Scheme: "https",
Host: "localhost",
Port: 9200,
Username: "elastic",
Password: "password",
Client: "diun",
Index: "diun-notifications",
Timeout: utl.NewDuration(10 * time.Second),
InsecureSkipVerify: false,
},
Gotify: &model.NotifGotify{
Endpoint: "http://gotify.foo.com",
Token: "Token123456",

View File

@@ -39,6 +39,16 @@ notif:
- "<@&200>"
renderFields: true
timeout: 10s
elasticsearch:
scheme: https
host: localhost
port: 9200
username: elastic
password: password
client: diun
index: diun-notifications
timeout: 10s
insecureSkipVerify: false
gotify:
endpoint: http://gotify.foo.com
token: Token123456

View File

@@ -28,6 +28,16 @@ notif:
- "<@&200>"
renderFields: true
timeout: 10s
elasticsearch:
scheme: https
host: localhost
port: 9200
username: elastic
password: password
client: diun
index: diun-notifications
timeout: 10s
insecureSkipVerify: false
gotify:
endpoint: http://gotify.foo.com
token: Token123456

View File

@@ -32,22 +32,23 @@ type NotifEntry struct {
// Notif holds data necessary for notification configuration
type Notif struct {
Amqp *NotifAmqp `yaml:"amqp,omitempty" json:"amqp,omitempty"`
Apprise *NotifApprise `yaml:"apprise,omitempty" json:"apprise,omitempty"`
Discord *NotifDiscord `yaml:"discord,omitempty" json:"discord,omitempty"`
Gotify *NotifGotify `yaml:"gotify,omitempty" json:"gotify,omitempty"`
Mail *NotifMail `yaml:"mail,omitempty" json:"mail,omitempty"`
Matrix *NotifMatrix `yaml:"matrix,omitempty" json:"matrix,omitempty"`
Mqtt *NotifMqtt `yaml:"mqtt,omitempty" json:"mqtt,omitempty"`
Ntfy *NotifNtfy `yaml:"ntfy,omitempty" json:"ntfy,omitempty"`
Pushover *NotifPushover `yaml:"pushover,omitempty" json:"pushover,omitempty"`
RocketChat *NotifRocketChat `yaml:"rocketchat,omitempty" json:"rocketchat,omitempty"`
Script *NotifScript `yaml:"script,omitempty" json:"script,omitempty"`
SignalRest *NotifSignalRest `yaml:"signalrest,omitempty" json:"signalrest,omitempty"`
Slack *NotifSlack `yaml:"slack,omitempty" json:"slack,omitempty"`
Teams *NotifTeams `yaml:"teams,omitempty" json:"teams,omitempty"`
Telegram *NotifTelegram `yaml:"telegram,omitempty" json:"telegram,omitempty"`
Webhook *NotifWebhook `yaml:"webhook,omitempty" json:"webhook,omitempty"`
Amqp *NotifAmqp `yaml:"amqp,omitempty" json:"amqp,omitempty"`
Apprise *NotifApprise `yaml:"apprise,omitempty" json:"apprise,omitempty"`
Discord *NotifDiscord `yaml:"discord,omitempty" json:"discord,omitempty"`
Elasticsearch *NotifElasticsearch `yaml:"elasticsearch,omitempty" json:"elasticsearch,omitempty"`
Gotify *NotifGotify `yaml:"gotify,omitempty" json:"gotify,omitempty"`
Mail *NotifMail `yaml:"mail,omitempty" json:"mail,omitempty"`
Matrix *NotifMatrix `yaml:"matrix,omitempty" json:"matrix,omitempty"`
Mqtt *NotifMqtt `yaml:"mqtt,omitempty" json:"mqtt,omitempty"`
Ntfy *NotifNtfy `yaml:"ntfy,omitempty" json:"ntfy,omitempty"`
Pushover *NotifPushover `yaml:"pushover,omitempty" json:"pushover,omitempty"`
RocketChat *NotifRocketChat `yaml:"rocketchat,omitempty" json:"rocketchat,omitempty"`
Script *NotifScript `yaml:"script,omitempty" json:"script,omitempty"`
SignalRest *NotifSignalRest `yaml:"signalrest,omitempty" json:"signalrest,omitempty"`
Slack *NotifSlack `yaml:"slack,omitempty" json:"slack,omitempty"`
Teams *NotifTeams `yaml:"teams,omitempty" json:"teams,omitempty"`
Telegram *NotifTelegram `yaml:"telegram,omitempty" json:"telegram,omitempty"`
Webhook *NotifWebhook `yaml:"webhook,omitempty" json:"webhook,omitempty"`
}
// GetDefaults gets the default values

View File

@@ -0,0 +1,39 @@
package model
import (
"time"
"github.com/crazy-max/diun/v4/pkg/utl"
)
type NotifElasticsearch struct {
Scheme string `yaml:"scheme,omitempty" json:"scheme,omitempty" validate:"required,oneof=http https"`
Host string `yaml:"host,omitempty" json:"host,omitempty" validate:"required"`
Port int `yaml:"port,omitempty" json:"port,omitempty" validate:"required,min=1"`
Username string `yaml:"username,omitempty" json:"username,omitempty" validate:"omitempty"`
UsernameFile string `yaml:"usernameFile,omitempty" json:"usernameFile,omitempty" validate:"omitempty,file"`
Password string `yaml:"password,omitempty" json:"password,omitempty" validate:"omitempty"`
PasswordFile string `yaml:"passwordFile,omitempty" json:"passwordFile,omitempty" validate:"omitempty,file"`
Client string `yaml:"client,omitempty" json:"client,omitempty" validate:"required"`
Index string `yaml:"index,omitempty" json:"index,omitempty" validate:"required"`
Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty" validate:"required"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify,omitempty" json:"insecureSkipVerify,omitempty" validate:"omitempty"`
}
// GetDefaults gets the default values
func (s *NotifElasticsearch) GetDefaults() *NotifElasticsearch {
n := &NotifElasticsearch{}
n.SetDefaults()
return n
}
// SetDefaults sets the default values
func (s *NotifElasticsearch) SetDefaults() {
s.Scheme = "http"
s.Host = "localhost"
s.Port = 9200
s.Client = "diun"
s.Index = "diun-notifications"
s.Timeout = utl.NewDuration(10 * time.Second)
s.InsecureSkipVerify = false
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/crazy-max/diun/v4/internal/notif/amqp"
"github.com/crazy-max/diun/v4/internal/notif/apprise"
"github.com/crazy-max/diun/v4/internal/notif/discord"
"github.com/crazy-max/diun/v4/internal/notif/elasticsearch"
"github.com/crazy-max/diun/v4/internal/notif/gotify"
"github.com/crazy-max/diun/v4/internal/notif/mail"
"github.com/crazy-max/diun/v4/internal/notif/matrix"
@@ -54,6 +55,9 @@ func New(config *model.Notif, meta model.Meta) (*Client, error) {
if config.Discord != nil {
c.notifiers = append(c.notifiers, discord.New(config.Discord, meta))
}
if config.Elasticsearch != nil {
c.notifiers = append(c.notifiers, elasticsearch.New(config.Elasticsearch, meta))
}
if config.Gotify != nil {
c.notifiers = append(c.notifiers, gotify.New(config.Gotify, meta))
}

View File

@@ -0,0 +1,137 @@
package elasticsearch
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/crazy-max/diun/v4/internal/model"
"github.com/crazy-max/diun/v4/internal/msg"
"github.com/crazy-max/diun/v4/internal/notif/notifier"
"github.com/crazy-max/diun/v4/pkg/utl"
"github.com/pkg/errors"
)
// Client represents an active elasticsearch notification object
type Client struct {
*notifier.Notifier
cfg *model.NotifElasticsearch
meta model.Meta
}
// New creates a new elasticsearch notification instance
func New(config *model.NotifElasticsearch, meta model.Meta) notifier.Notifier {
return notifier.Notifier{
Handler: &Client{
cfg: config,
meta: meta,
},
}
}
// Name returns notifier's name
func (c *Client) Name() string {
return "elasticsearch"
}
// Send creates and sends an elasticsearch notification with an entry
func (c *Client) Send(entry model.NotifEntry) error {
username, err := utl.GetSecret(c.cfg.Username, c.cfg.UsernameFile)
if err != nil {
return err
}
password, err := utl.GetSecret(c.cfg.Password, c.cfg.PasswordFile)
if err != nil {
return err
}
// Use the same JSON structure as webhook notifier
message, err := msg.New(msg.Options{
Meta: c.meta,
Entry: entry,
})
if err != nil {
return err
}
body, err := message.RenderJSON()
if err != nil {
return err
}
// Parse the JSON to add the client field
var doc map[string]any
if err := json.Unmarshal(body, &doc); err != nil {
return err
}
// Add the current time
doc["@timestamp"] = time.Now().Format(time.RFC3339Nano)
// Add the client field from the configuration
doc["client"] = c.cfg.Client
// Re-marshal the JSON with the client field
body, err = json.Marshal(doc)
if err != nil {
return err
}
// Build the Elasticsearch indexing URL
// This uses the Index API (POST /{index}/_doc) to create a document with an auto-generated _id:
// https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-create
url := fmt.Sprintf("%s://%s:%d/%s/_doc", c.cfg.Scheme, c.cfg.Host, c.cfg.Port, c.cfg.Index)
cancelCtx, cancel := context.WithCancelCause(context.Background())
timeoutCtx, _ := context.WithTimeoutCause(cancelCtx, *c.cfg.Timeout, errors.WithStack(context.DeadlineExceeded)) //nolint:govet // no need to manually cancel this context as we already rely on parent
defer func() { cancel(errors.WithStack(context.Canceled)) }()
hc := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: c.cfg.InsecureSkipVerify,
},
},
}
req, err := http.NewRequestWithContext(timeoutCtx, "POST", url, bytes.NewBuffer(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", c.meta.UserAgent)
// Add authentication if provided
if username != "" && password != "" {
req.SetBasicAuth(username, password)
}
resp, err := hc.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
var errBody struct {
Status int `json:"status"`
Error struct {
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"error"`
}
if err := json.NewDecoder(resp.Body).Decode(&errBody); err != nil {
return errors.Wrapf(err, "cannot decode JSON error response for HTTP %d %s status",
resp.StatusCode, http.StatusText(resp.StatusCode))
}
return errors.Errorf("%d %s: %s", errBody.Status, errBody.Error.Type, errBody.Error.Reason)
}
return nil
}