Files
sablier/plugins/proxywasm/main.go
Alexis Couvreur 9430cc3eb3 chore(deps): bump to go1.24.0 (#521)
* chore(deps): bump to go1.24.0

* use proxy-wasm/proxy-wasm-go-sdk

* remove tinygo

* update docker image

* add missing env

* use go tool directive for mockgen

* chore: bump Kong/ngx_wasm_module to pre-release 0.6.0

Thanks to https://github.com/Kong/ngx_wasm_module/issues/682

* fix go mod

* set caddy to go1.23
2025-03-01 15:12:54 -05:00

322 lines
8.9 KiB
Go

package main
import (
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"
jsoniter "github.com/json-iterator/tinygo"
"github.com/proxy-wasm/proxy-wasm-go-sdk/proxywasm"
"github.com/proxy-wasm/proxy-wasm-go-sdk/proxywasm/types"
"golang.org/x/exp/slices"
)
var Version string
func main() {}
func init() {
proxywasm.SetVMContext(&vmContext{})
}
// vmContext implements types.VMContext interface of proxy-wasm-go SDK.
type vmContext struct {
// Embed the default VM context here,
// so that we don't need to reimplement all the methods.
types.DefaultVMContext
}
// Override types.DefaultVMContext.
func (*vmContext) NewPluginContext(contextID uint32) types.PluginContext {
return &pluginContext{}
}
type pluginContext struct {
// Embed the default plugin context here,
// so that we don't need to reimplement all the methods.
types.DefaultPluginContext
configuration pluginConfiguration
}
type pluginConfiguration struct {
cluster string
method string
path string
authority string
timeout uint32
}
// newPluginConfiguration creates a pluginConfiguration with default values
func newPluginConfiguration() pluginConfiguration {
return pluginConfiguration{
cluster: "sablier:10000",
method: "GET",
path: "/",
authority: "sablier.cluster.local",
timeout: 5000, // timeout in milliseconds
}
}
// Override types.DefaultPluginContext.
func (ctx *pluginContext) OnPluginStart(pluginConfigurationSize int) types.OnPluginStartStatus {
proxywasm.LogInfof("sablier proxywasm plugin version %v loaded", Version)
data, err := proxywasm.GetPluginConfiguration()
if err != nil && !errors.Is(err, types.ErrorStatusNotFound) {
proxywasm.LogCriticalf("error reading plugin configuration: %v", err)
return types.OnPluginStartStatusFailed
}
proxywasm.LogInfof("plugin config: %s", string(data))
config, err := parsePluginConfiguration(data)
if err != nil {
proxywasm.LogCriticalf("error parsing plugin configuration: %v", err)
return types.OnPluginStartStatusFailed
}
ctx.configuration = config
return types.OnPluginStartStatusOK
}
//go:generate go run github.com/json-iterator/tinygo/gen
type DynamicConfiguration struct {
DisplayName string `json:"display_name"`
ShowDetails *bool `json:"show_details"`
Theme string `json:"theme"`
RefreshFrequency string `json:"refresh_frequency"`
}
//go:generate go run github.com/json-iterator/tinygo/gen
type BlockingConfiguration struct {
Timeout string `json:"timeout"`
}
//go:generate go run github.com/json-iterator/tinygo/gen
type Config struct {
// SablierURL in the format of hostname:port. The scheme is excluded
SablierURL string `json:"sablier_url"`
// Cluster is an optional value that allows you to set override the
// first argument to `proxywasm.DispatchHttpCall`.
// In istio for exemple, the expected value would be: "outbound|port||hostname", e.g.: "outbound|10000||sablier"
// In APISIX and Nginx for example, the value would be the same as SablierURL, e.g.: sablier:10000
// Defaults to the same value of `SablierURL`.
Cluster string `json:"cluster"`
Names []string `json:"names"`
Group string `json:"group"`
SessionDuration string `json:"session_duration"`
Dynamic *DynamicConfiguration `json:"dynamic"`
Blocking *BlockingConfiguration `json:"blocking"`
}
func (c Config) GetPath() string {
path := url.URL{}
q := path.Query()
if c.SessionDuration != "" {
dur, err := time.ParseDuration(c.SessionDuration)
if err != nil {
proxywasm.LogWarnf("parsing session duration failed (ignoring value): %v", err)
} else {
q.Add("session_duration", dur.String())
}
}
for _, name := range c.Names {
q.Add("names", name)
}
if c.Group != "" {
q.Add("group", c.Group)
}
path.RawQuery = q.Encode()
if c.Dynamic != nil {
return c.getDynamicQuery(path)
} else if c.Blocking != nil {
return c.getBlockingQuery(path)
}
return "no strategy configured"
}
func (c Config) getDynamicQuery(path url.URL) string {
path.Path = "/api/strategies/dynamic"
q := path.Query()
if c.Dynamic.DisplayName != "" {
q.Add("display_name", c.Dynamic.DisplayName)
}
if c.Dynamic.Theme != "" {
q.Add("theme", c.Dynamic.Theme)
}
if c.Dynamic.RefreshFrequency != "" {
dur, err := time.ParseDuration(c.Dynamic.RefreshFrequency)
if err != nil {
proxywasm.LogWarnf("parsing dynamic refresh frequency failed (ignoring value): %v", err)
} else {
q.Add("refresh_frequency", dur.String())
}
}
if c.Dynamic.ShowDetails != nil {
q.Add("show_details", strconv.FormatBool(*c.Dynamic.ShowDetails))
}
path.RawQuery = q.Encode()
return path.String()
}
func (c Config) getBlockingQuery(path url.URL) string {
path.Path = "/api/strategies/blocking"
q := path.Query()
if c.Blocking.Timeout != "" {
dur, err := time.ParseDuration(c.Blocking.Timeout)
if err != nil {
proxywasm.LogWarnf("parsing blocking timeout duration failed (ignoring value): %v", err)
} else {
q.Add("timeout", dur.String())
}
}
path.RawQuery = q.Encode()
return path.String()
}
func parsePluginConfiguration(data []byte) (pluginConfiguration, error) {
pluginConf := newPluginConfiguration()
if len(data) == 0 {
return pluginConf, fmt.Errorf("the plugin configuration is not a valid: %q", string(data))
}
json := jsoniter.CreateJsonAdapter(Config_json{}, BlockingConfiguration_json{}, DynamicConfiguration_json{})
var c Config
err := json.Unmarshal(data, &c)
if err != nil {
proxywasm.LogErrorf("error parsing configuration: %v", err.Error())
return pluginConf, err
}
if c.Blocking == nil && c.Dynamic == nil {
return pluginConf, fmt.Errorf("you must specify one strategy (dynamic or blocking)")
}
if c.Blocking != nil && c.Dynamic != nil {
return pluginConf, fmt.Errorf("you must specify only one strategy")
}
if c.Blocking != nil && c.Blocking.Timeout != "" {
timeout, err := time.ParseDuration(c.Blocking.Timeout)
if err != nil {
return pluginConf, fmt.Errorf("cannot parse blocking timeout duration: %v", err)
}
pluginConf.timeout = uint32(timeout.Milliseconds())
}
if len(c.Names) == 0 && len(c.Group) == 0 {
return pluginConf, fmt.Errorf("you must specify names or group")
}
if len(c.Names) > 0 && len(c.Group) > 0 {
return pluginConf, fmt.Errorf("you must specify either names or group")
}
if c.SablierURL != "" {
pluginConf.authority = c.SablierURL
// Default to SablierURL
pluginConf.cluster = c.SablierURL
}
if c.Cluster != "" {
pluginConf.cluster = c.Cluster
}
pluginConf.path = c.GetPath()
return pluginConf, nil
}
// Override types.DefaultPluginContext.
func (ctx *pluginContext) NewHttpContext(contextID uint32) types.HttpContext {
headers := [][2]string{
{":method", ctx.configuration.method},
{":path", ctx.configuration.path},
{":authority", ctx.configuration.authority},
{"User-Agent", fmt.Sprintf("sablier-proxywasm-plugin/%s", Version)},
}
return &httpOnDemand{
contextID: contextID,
headers: headers,
cluster: ctx.configuration.cluster,
timeout: ctx.configuration.timeout,
}
}
type httpOnDemand struct {
// Embed the default http context here,
// so that we don't need to reimplement all the methods.
types.DefaultHttpContext
contextID uint32
headers [][2]string
cluster string
timeout uint32
}
// Override types.DefaultHttpContext.
func (ctx *httpOnDemand) OnHttpRequestHeaders(numHeaders int, endOfStream bool) types.Action {
proxywasm.LogInfof("DispatchHttpCall to %v", ctx.cluster)
proxywasm.LogInfof("DispatchHttpCall with headers %v", ctx.headers)
if _, err := proxywasm.DispatchHttpCall(ctx.cluster, ctx.headers, nil, nil,
ctx.timeout, httpCallResponseCallback); err != nil {
proxywasm.LogCriticalf("dipatch httpcall failed: %v", err)
proxywasm.LogDebugf("%s: %v", ctx.cluster, ctx.headers)
return types.ActionContinue
}
proxywasm.LogInfof("http call dispatched to %s", ctx.cluster)
return types.ActionPause
}
func httpCallResponseCallback(numHeaders, bodySize, numTrailers int) {
hs, err := proxywasm.GetHttpCallResponseHeaders()
if err != nil {
proxywasm.LogCriticalf("failed to get response headers: %v", err)
return
}
proxywasm.LogInfof("GetHttpCallResponseHeaders: %v", hs)
headerIndex := slices.IndexFunc(hs, func(h [2]string) bool { return strings.ToLower(h[0]) == "x-sablier-session-status" })
if headerIndex < 0 {
proxywasm.LogCriticalf("failed to find x-sablier-session-status header: %v", hs)
proxywasm.ResumeHttpRequest()
return
}
headerValue := hs[headerIndex][1]
if headerValue != "ready" {
b, err := proxywasm.GetHttpCallResponseBody(0, bodySize)
if err != nil {
proxywasm.LogCriticalf("failed to get response body: %v", err)
proxywasm.ResumeHttpRequest()
return
}
proxywasm.LogInfof("GetHttpCallResponseBody (%v bytes): %v", bodySize, string(b))
if err := proxywasm.SendHttpResponse(200, hs, b, -1); err != nil {
proxywasm.LogErrorf("failed to send local response: %v", err)
proxywasm.ResumeHttpRequest()
}
} else {
proxywasm.ResumeHttpRequest()
}
}