From 95c2b60207150e4565bf4f44c245694f0bad9b28 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 9 Nov 2025 16:15:38 +0000 Subject: [PATCH] chore(deps): bump github.com/eclipse/paho.mqtt.golang Bumps [github.com/eclipse/paho.mqtt.golang](https://github.com/eclipse/paho.mqtt.golang) from 1.5.0 to 1.5.1. - [Release notes](https://github.com/eclipse/paho.mqtt.golang/releases) - [Commits](https://github.com/eclipse/paho.mqtt.golang/compare/v1.5.0...v1.5.1) --- updated-dependencies: - dependency-name: github.com/eclipse/paho.mqtt.golang dependency-version: 1.5.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- .../eclipse/paho.mqtt.golang/client.go | 30 +++- .../eclipse/paho.mqtt.golang/connnotf.go | 79 +++++++++ .../eclipse/paho.mqtt.golang/net.go | 13 +- .../eclipse/paho.mqtt.golang/netconn.go | 11 +- .../eclipse/paho.mqtt.golang/options.go | 164 ++++++++++-------- .../paho.mqtt.golang/packets/packets.go | 5 + .../eclipse/paho.mqtt.golang/router.go | 81 +++------ vendor/modules.txt | 4 +- 10 files changed, 240 insertions(+), 153 deletions(-) create mode 100644 vendor/github.com/eclipse/paho.mqtt.golang/connnotf.go diff --git a/go.mod b/go.mod index 85fc1880..2be70b9c 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/docker/go-connections v0.6.0 github.com/docker/go-units v0.5.0 github.com/dromara/carbon/v2 v2.6.14 - github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/go-gomail/gomail v0.0.0-20160411212932-81ebce5c23df github.com/go-playground/validator/v10 v10.28.0 github.com/hashicorp/nomad/api v0.0.0-20250812204832-62b195aaa535 // v1.10.4 diff --git a/go.sum b/go.sum index c574cf7f..fda99f56 100644 --- a/go.sum +++ b/go.sum @@ -100,8 +100,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dromara/carbon/v2 v2.6.14 h1:U4mN6eH/E5GqPI67MSro8aJ5yg1DbenKItKTFwr5Fqg= github.com/dromara/carbon/v2 v2.6.14/go.mod h1:NGo3reeV5vhWCYWcSqbJRZm46MEwyfYI5EJRdVFoLJo= -github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= -github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw= diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/client.go b/vendor/github.com/eclipse/paho.mqtt.golang/client.go index b88d932d..0b502f65 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/client.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/client.go @@ -258,12 +258,15 @@ func (c *client) Connect() Token { return } + var attemptCount int + RETRYCONN: var conn net.Conn var rc byte var err error - conn, rc, t.sessionPresent, err = c.attemptConnection() + conn, rc, t.sessionPresent, err = c.attemptConnection(false, attemptCount) if err != nil { + attemptCount++ if c.options.ConnectRetry { DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error()) time.Sleep(c.options.ConnectRetryInterval) @@ -315,15 +318,17 @@ func (c *client) reconnect(connectionUp connCompletedFn) { DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds") } + var attemptCount int for { if nil != c.options.OnReconnecting { c.options.OnReconnecting(c, &c.options) } var err error - conn, _, _, err = c.attemptConnection() + conn, _, _, err = c.attemptConnection(true, attemptCount) if err == nil { break } + attemptCount++ sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false) DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err) @@ -351,7 +356,7 @@ func (c *client) reconnect(connectionUp connCompletedFn) { // byte - Return code (packets.Accepted indicates a successful connection). // bool - SessionPresent flag from the connect ack (only valid if packets.Accepted) // err - Error (err != nil guarantees that conn has been set to active connection). -func (c *client) attemptConnection() (net.Conn, byte, bool, error) { +func (c *client) attemptConnection(isReconnect bool, attempt int) (net.Conn, byte, bool, error) { protocolVersion := c.options.ProtocolVersion var ( sessionPresent bool @@ -360,6 +365,10 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { rc byte ) + if c.options.OnConnectionNotification != nil { + c.options.OnConnectionNotification(c, ConnectionNotificationConnecting{isReconnect, attempt}) + } + c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases brokers := c.options.Servers c.optionsMu.Unlock() @@ -372,6 +381,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { DEBUG.Println(CLI, "using custom onConnectAttempt handler...") tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) } + if c.options.OnConnectionNotification != nil { + c.options.OnConnectionNotification(c, ConnectionNotificationBroker{broker}) + } connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established dialer := c.options.Dialer if dialer == nil { // @@ -388,6 +400,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") rc = packets.ErrNetworkError + if c.options.OnConnectionNotification != nil { + c.options.OnConnectionNotification(c, ConnectionNotificationBrokerFailed{broker, err}) + } continue } DEBUG.Println(CLI, "socket connected to broker") @@ -430,6 +445,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { err = fmt.Errorf("%w : %w", packets.ConnErrors[rc], err) } } + if err != nil && c.options.OnConnectionNotification != nil { + c.options.OnConnectionNotification(c, ConnectionNotificationFailed{err}) + } return conn, rc, sessionPresent, err } @@ -564,6 +582,9 @@ func (c *client) internalConnLost(whyConnLost error) { if c.options.OnConnectionLost != nil { go c.options.OnConnectionLost(c, whyConnLost) } + if c.options.OnConnectionNotification != nil { + go c.options.OnConnectionNotification(c, ConnectionNotificationLost{whyConnLost}) + } DEBUG.Println(CLI, "internalConnLost complete") }() } @@ -613,6 +634,9 @@ func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn, if c.options.OnConnect != nil { go c.options.OnConnect(c) } + if c.options.OnConnectionNotification != nil { + go c.options.OnConnectionNotification(c, ConnectionNotificationConnected{}) + } // c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options, // messages may be published while the client is disconnected (they will block unless in a goroutine). However diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/connnotf.go b/vendor/github.com/eclipse/paho.mqtt.golang/connnotf.go new file mode 100644 index 00000000..3f50fca0 --- /dev/null +++ b/vendor/github.com/eclipse/paho.mqtt.golang/connnotf.go @@ -0,0 +1,79 @@ +package mqtt + +import "net/url" + +type ConnectionNotificationType int64 + +const ( + ConnectionNotificationTypeConnected ConnectionNotificationType = iota + ConnectionNotificationTypeConnecting + ConnectionNotificationTypeFailed + ConnectionNotificationTypeLost + ConnectionNotificationTypeBroker + ConnectionNotificationTypeBrokerFailed +) + +type ConnectionNotification interface { + Type() ConnectionNotificationType +} + +// Connected + +type ConnectionNotificationConnected struct { +} + +func (n ConnectionNotificationConnected) Type() ConnectionNotificationType { + return ConnectionNotificationTypeConnected +} + +// Connecting + +type ConnectionNotificationConnecting struct { + IsReconnect bool + Attempt int +} + +func (n ConnectionNotificationConnecting) Type() ConnectionNotificationType { + return ConnectionNotificationTypeConnecting +} + +// Connection Failed + +type ConnectionNotificationFailed struct { + Reason error +} + +func (n ConnectionNotificationFailed) Type() ConnectionNotificationType { + return ConnectionNotificationTypeFailed +} + +// Connection Lost + +type ConnectionNotificationLost struct { + Reason error // may be nil +} + +func (n ConnectionNotificationLost) Type() ConnectionNotificationType { + return ConnectionNotificationTypeLost +} + +// Broker Connection + +type ConnectionNotificationBroker struct { + Broker *url.URL +} + +func (n ConnectionNotificationBroker) Type() ConnectionNotificationType { + return ConnectionNotificationTypeBroker +} + +// Broker Connection Failed + +type ConnectionNotificationBrokerFailed struct { + Broker *url.URL + Reason error +} + +func (n ConnectionNotificationBrokerFailed) Type() ConnectionNotificationType { + return ConnectionNotificationTypeBrokerFailed +} diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/net.go b/vendor/github.com/eclipse/paho.mqtt.golang/net.go index 10cc7dae..cb3d3741 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/net.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/net.go @@ -444,24 +444,23 @@ func startComms(conn net.Conn, // Network connection (must be active) } // ackFunc acknowledges a packet -// WARNING the function returned must not be called if the comms routine is shutting down or not running -// (it needs outgoing comms in order to send the acknowledgement). Currently this is only called from -// matchAndDispatch which will be shutdown before the comms are -func ackFunc(oboundP chan *PacketAndToken, persist Store, packet *packets.PublishPacket) func() { +// WARNING sendAck may be called at any time (even after the connection is dead). At the time of writing ACK sent after +// connection loss will be dropped (this is not ideal) +func ackFunc(sendAck func(*PacketAndToken), persist Store, packet *packets.PublishPacket) func() { return func() { switch packet.Qos { case 2: pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket) pr.MessageID = packet.MessageID DEBUG.Println(NET, "putting pubrec msg on obound") - oboundP <- &PacketAndToken{p: pr, t: nil} + sendAck(&PacketAndToken{p: pr, t: nil}) DEBUG.Println(NET, "done putting pubrec msg on obound") case 1: pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) pa.MessageID = packet.MessageID DEBUG.Println(NET, "putting puback msg on obound") - persistOutbound(persist, pa) - oboundP <- &PacketAndToken{p: pa, t: nil} + persistOutbound(persist, pa) // May fail if store has been closed + sendAck(&PacketAndToken{p: pa, t: nil}) DEBUG.Println(NET, "done putting puback msg on obound") case 0: // do nothing, since there is no need to send an ack packet back diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go b/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go index f5429e28..e6f64e56 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go @@ -50,16 +50,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions) return conn, err case "mqtt", "tcp": - allProxy := os.Getenv("all_proxy") - if len(allProxy) == 0 { - conn, err := dialer.Dial("tcp", uri.Host) - if err != nil { - return nil, err - } - return conn, nil - } - proxyDialer := proxy.FromEnvironment() - + proxyDialer := proxy.FromEnvironmentUsing(dialer) conn, err := proxyDialer.Dial("tcp", uri.Host) if err != nil { return nil, err diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/options.go b/vendor/github.com/eclipse/paho.mqtt.golang/options.go index 5aaa7d95..8ce1c3ca 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/options.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/options.go @@ -62,93 +62,99 @@ type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Con // Does not carry out any MQTT specific handshakes. type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error) +// ConnectionNotificationHandler is invoked for any type of connection event. +type ConnectionNotificationHandler func(Client, ConnectionNotification) + // ClientOptions contains configurable options for an Client. Note that these should be set using the // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. // WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy // to create a configuration with difficult to trace issues (e.g. Mosquitto 2.0.12+ will reject connections // with KeepAlive=0 by default). type ClientOptions struct { - Servers []*url.URL - ClientID string - Username string - Password string - CredentialsProvider CredentialsProvider - CleanSession bool - Order bool - WillEnabled bool - WillTopic string - WillPayload []byte - WillQos byte - WillRetained bool - ProtocolVersion uint - protocolVersionExplicit bool - TLSConfig *tls.Config - KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0. - PingTimeout time.Duration - ConnectTimeout time.Duration - MaxReconnectInterval time.Duration - AutoReconnect bool - ConnectRetryInterval time.Duration - ConnectRetry bool - Store Store - DefaultPublishHandler MessageHandler - OnConnect OnConnectHandler - OnConnectionLost ConnectionLostHandler - OnReconnecting ReconnectHandler - OnConnectAttempt ConnectionAttemptHandler - WriteTimeout time.Duration - MessageChannelDepth uint - ResumeSubs bool - HTTPHeaders http.Header - WebsocketOptions *WebsocketOptions - MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming - Dialer *net.Dialer - CustomOpenConnectionFn OpenConnectionFunc - AutoAckDisabled bool + Servers []*url.URL + ClientID string + Username string + Password string + CredentialsProvider CredentialsProvider + CleanSession bool + Order bool + WillEnabled bool + WillTopic string + WillPayload []byte + WillQos byte + WillRetained bool + ProtocolVersion uint + protocolVersionExplicit bool + TLSConfig *tls.Config + KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0. + PingTimeout time.Duration + ConnectTimeout time.Duration + MaxReconnectInterval time.Duration + AutoReconnect bool + ConnectRetryInterval time.Duration + ConnectRetry bool + Store Store + DefaultPublishHandler MessageHandler + OnConnect OnConnectHandler + OnConnectionLost ConnectionLostHandler + OnReconnecting ReconnectHandler + OnConnectAttempt ConnectionAttemptHandler + OnConnectionNotification ConnectionNotificationHandler + WriteTimeout time.Duration + MessageChannelDepth uint + ResumeSubs bool + HTTPHeaders http.Header + WebsocketOptions *WebsocketOptions + MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming + Dialer *net.Dialer + CustomOpenConnectionFn OpenConnectionFunc + AutoAckDisabled bool } // NewClientOptions will create a new ClientClientOptions type with some // default values. -// Port: 1883 -// CleanSession: True -// Order: True (note: it is recommended that this be set to FALSE unless order is important) -// KeepAlive: 30 (seconds) -// ConnectTimeout: 30 (seconds) -// MaxReconnectInterval 10 (minutes) -// AutoReconnect: True +// +// Port: 1883 +// CleanSession: True +// Order: True (note: it is recommended that this be set to FALSE unless order is important) +// KeepAlive: 30 (seconds) +// ConnectTimeout: 30 (seconds) +// MaxReconnectInterval 10 (minutes) +// AutoReconnect: True func NewClientOptions() *ClientOptions { o := &ClientOptions{ - Servers: nil, - ClientID: "", - Username: "", - Password: "", - CleanSession: true, - Order: true, - WillEnabled: false, - WillTopic: "", - WillPayload: nil, - WillQos: 0, - WillRetained: false, - ProtocolVersion: 0, - protocolVersionExplicit: false, - KeepAlive: 30, - PingTimeout: 10 * time.Second, - ConnectTimeout: 30 * time.Second, - MaxReconnectInterval: 10 * time.Minute, - AutoReconnect: true, - ConnectRetryInterval: 30 * time.Second, - ConnectRetry: false, - Store: nil, - OnConnect: nil, - OnConnectionLost: DefaultConnectionLostHandler, - OnConnectAttempt: nil, - WriteTimeout: 0, // 0 represents timeout disabled - ResumeSubs: false, - HTTPHeaders: make(map[string][]string), - WebsocketOptions: &WebsocketOptions{}, - Dialer: &net.Dialer{Timeout: 30 * time.Second}, - CustomOpenConnectionFn: nil, - AutoAckDisabled: false, + Servers: nil, + ClientID: "", + Username: "", + Password: "", + CleanSession: true, + Order: true, + WillEnabled: false, + WillTopic: "", + WillPayload: nil, + WillQos: 0, + WillRetained: false, + ProtocolVersion: 0, + protocolVersionExplicit: false, + KeepAlive: 30, + PingTimeout: 10 * time.Second, + ConnectTimeout: 30 * time.Second, + MaxReconnectInterval: 10 * time.Minute, + AutoReconnect: true, + ConnectRetryInterval: 30 * time.Second, + ConnectRetry: false, + Store: nil, + OnConnect: nil, + OnConnectionLost: DefaultConnectionLostHandler, + OnConnectAttempt: nil, + OnConnectionNotification: nil, + WriteTimeout: 0, // 0 represents timeout disabled + ResumeSubs: false, + HTTPHeaders: make(map[string][]string), + WebsocketOptions: &WebsocketOptions{}, + Dialer: &net.Dialer{Timeout: 30 * time.Second}, + CustomOpenConnectionFn: nil, + AutoAckDisabled: false, } return o } @@ -355,6 +361,13 @@ func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionA return o } +// SetConnectionNotificationHandler sets the ConnectionNotificationHandler callback to receive all types of connection +// events. +func (o *ClientOptions) SetConnectionNotificationHandler(onConnectionNotification ConnectionNotificationHandler) *ClientOptions { + o.OnConnectionNotification = onConnectionNotification + return o +} + // SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a // timeout error. A duration of 0 never times out. Default never times out func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions { @@ -450,6 +463,7 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon } // SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler. +// // By default it is set to false. Setting it to true will disable the auto-ack globally. func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions { o.AutoAckDisabled = autoAckDisabled diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go index b2d7ed1b..7cc3c6d8 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go @@ -330,6 +330,11 @@ func decodeBytes(b io.Reader) ([]byte, error) { } func encodeBytes(field []byte) []byte { + // Attempting to encode more than 65,535 bytes would lead to an unexpected 16-bit length and extra data written + // (which would be parsed as later parts of the message). The safest option is to truncate. + if len(field) > 65535 { + field = field[0:65535] + } fieldLength := make([]byte, 2) binary.BigEndian.PutUint16(fieldLength, uint16(len(field))) return append(fieldLength, field...) diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/router.go b/vendor/github.com/eclipse/paho.mqtt.golang/router.go index bd05a0c0..5cfc5e6c 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/router.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/router.go @@ -136,60 +136,41 @@ func (r *router) setDefaultHandler(handler MessageHandler) { // associated callback (or the defaultHandler, if one exists and no other route matched). If // anything is sent down the stop channel the function will end. func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken { - var wg sync.WaitGroup - ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed - var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel + ackChan := make(chan *PacketAndToken) // Channel returned to caller; closed when goroutine terminates - stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan - ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan - goRoutinesDone := make(chan struct{}) // closed on wg.Done() - if order { - ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done - } else { - // When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done - ackInChan = make(chan *PacketAndToken) - go func() { // go routine to copy from ackInChan to ackOutChan until stopped - for { - select { - case a := <-ackInChan: - ackOutChan <- a - case <-stopAckCopy: - close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan - for { - select { - case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped) - DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).") - case <-goRoutinesDone: - close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure) - DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.") - return - } - } - } - } - }() + // In some cases message acknowledgments may come through after shutdown (connection is down etc). Where this is the + // case we need to accept any such requests and then ignore them. Note that this is not a perfect solution, if we + // have reconnected, and the session is still live, then the Ack really should be sent (see Issus #726) + var ackMutex sync.RWMutex + sendAckChan := ackChan // This will be set to nil before ackChan is closed + sendAck := func(ack *PacketAndToken) { + ackMutex.RLock() + defer ackMutex.RUnlock() + if sendAckChan != nil { + sendAckChan <- ack + } else { + DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).") + } } go func() { // Main go routine handling inbound messages + var handlers []MessageHandler for message := range messages { // DEBUG.Println(ROU, "matchAndDispatch received message") sent := false r.RLock() - m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message)) - var handlers []MessageHandler + m := messageFromPublish(message, ackFunc(sendAck, client.persist, message)) for e := r.routes.Front(); e != nil; e = e.Next() { if e.Value.(*route).match(message.TopicName) { if order { handlers = append(handlers, e.Value.(*route).callback) } else { hd := e.Value.(*route).callback - wg.Add(1) go func() { hd(client, m) if !client.options.AutoAckDisabled { m.Ack() } - wg.Done() }() } sent = true @@ -200,13 +181,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order if order { handlers = append(handlers, r.defaultHandler) } else { - wg.Add(1) go func() { r.defaultHandler(client, m) if !client.options.AutoAckDisabled { m.Ack() } - wg.Done() }() } } else { @@ -214,26 +193,22 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order } } r.RUnlock() - for _, handler := range handlers { - handler(client, m) - if !client.options.AutoAckDisabled { - m.Ack() + if order { + for _, handler := range handlers { + handler(client, m) + if !client.options.AutoAckDisabled { + m.Ack() + } } + handlers = handlers[:0] } // DEBUG.Println(ROU, "matchAndDispatch handled message") } - if order { - close(ackOutChan) - } else { // Ensure that nothing further will be written to ackOutChan before closing it - close(stopAckCopy) - <-ackCopyStopped - close(ackOutChan) - go func() { - wg.Wait() // Note: If this remains running then the user has handlers that are not returning - close(goRoutinesDone) - }() - } + ackMutex.Lock() + sendAckChan = nil + ackMutex.Unlock() + close(ackChan) // as sendAckChan is now nil nothing further will be sent on this DEBUG.Println(ROU, "matchAndDispatch exiting") }() - return ackOutChan + return ackChan } diff --git a/vendor/modules.txt b/vendor/modules.txt index fc23415d..ab1d44d9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -144,8 +144,8 @@ github.com/dromara/carbon/v2/calendar/hebrew github.com/dromara/carbon/v2/calendar/julian github.com/dromara/carbon/v2/calendar/lunar github.com/dromara/carbon/v2/calendar/persian -# github.com/eclipse/paho.mqtt.golang v1.5.0 -## explicit; go 1.20 +# github.com/eclipse/paho.mqtt.golang v1.5.1 +## explicit; go 1.24.0 github.com/eclipse/paho.mqtt.golang github.com/eclipse/paho.mqtt.golang/packets # github.com/emicklei/go-restful/v3 v3.11.0