Merge pull request #1504 from crazy-max/dependabot/go_modules/github.com/eclipse/paho.mqtt.golang-1.5.1

chore(deps): bump github.com/eclipse/paho.mqtt.golang from 1.5.0 to 1.5.1
This commit is contained in:
CrazyMax
2025-11-09 17:23:12 +01:00
committed by GitHub
10 changed files with 240 additions and 153 deletions

2
go.mod
View File

@@ -17,7 +17,7 @@ require (
github.com/docker/go-connections v0.6.0 github.com/docker/go-connections v0.6.0
github.com/docker/go-units v0.5.0 github.com/docker/go-units v0.5.0
github.com/dromara/carbon/v2 v2.6.14 github.com/dromara/carbon/v2 v2.6.14
github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/eclipse/paho.mqtt.golang v1.5.1
github.com/go-gomail/gomail v0.0.0-20160411212932-81ebce5c23df github.com/go-gomail/gomail v0.0.0-20160411212932-81ebce5c23df
github.com/go-playground/validator/v10 v10.28.0 github.com/go-playground/validator/v10 v10.28.0
github.com/hashicorp/nomad/api v0.0.0-20250812204832-62b195aaa535 // v1.10.4 github.com/hashicorp/nomad/api v0.0.0-20250812204832-62b195aaa535 // v1.10.4

4
go.sum
View File

@@ -100,8 +100,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dromara/carbon/v2 v2.6.14 h1:U4mN6eH/E5GqPI67MSro8aJ5yg1DbenKItKTFwr5Fqg= github.com/dromara/carbon/v2 v2.6.14 h1:U4mN6eH/E5GqPI67MSro8aJ5yg1DbenKItKTFwr5Fqg=
github.com/dromara/carbon/v2 v2.6.14/go.mod h1:NGo3reeV5vhWCYWcSqbJRZm46MEwyfYI5EJRdVFoLJo= github.com/dromara/carbon/v2 v2.6.14/go.mod h1:NGo3reeV5vhWCYWcSqbJRZm46MEwyfYI5EJRdVFoLJo=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw= github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=

View File

@@ -258,12 +258,15 @@ func (c *client) Connect() Token {
return return
} }
var attemptCount int
RETRYCONN: RETRYCONN:
var conn net.Conn var conn net.Conn
var rc byte var rc byte
var err error var err error
conn, rc, t.sessionPresent, err = c.attemptConnection() conn, rc, t.sessionPresent, err = c.attemptConnection(false, attemptCount)
if err != nil { if err != nil {
attemptCount++
if c.options.ConnectRetry { if c.options.ConnectRetry {
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error()) DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error())
time.Sleep(c.options.ConnectRetryInterval) time.Sleep(c.options.ConnectRetryInterval)
@@ -315,15 +318,17 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds") DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
} }
var attemptCount int
for { for {
if nil != c.options.OnReconnecting { if nil != c.options.OnReconnecting {
c.options.OnReconnecting(c, &c.options) c.options.OnReconnecting(c, &c.options)
} }
var err error var err error
conn, _, _, err = c.attemptConnection() conn, _, _, err = c.attemptConnection(true, attemptCount)
if err == nil { if err == nil {
break break
} }
attemptCount++
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false) sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err) DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)
@@ -351,7 +356,7 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
// byte - Return code (packets.Accepted indicates a successful connection). // byte - Return code (packets.Accepted indicates a successful connection).
// bool - SessionPresent flag from the connect ack (only valid if packets.Accepted) // bool - SessionPresent flag from the connect ack (only valid if packets.Accepted)
// err - Error (err != nil guarantees that conn has been set to active connection). // err - Error (err != nil guarantees that conn has been set to active connection).
func (c *client) attemptConnection() (net.Conn, byte, bool, error) { func (c *client) attemptConnection(isReconnect bool, attempt int) (net.Conn, byte, bool, error) {
protocolVersion := c.options.ProtocolVersion protocolVersion := c.options.ProtocolVersion
var ( var (
sessionPresent bool sessionPresent bool
@@ -360,6 +365,10 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
rc byte rc byte
) )
if c.options.OnConnectionNotification != nil {
c.options.OnConnectionNotification(c, ConnectionNotificationConnecting{isReconnect, attempt})
}
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
brokers := c.options.Servers brokers := c.options.Servers
c.optionsMu.Unlock() c.optionsMu.Unlock()
@@ -372,6 +381,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
DEBUG.Println(CLI, "using custom onConnectAttempt handler...") DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
} }
if c.options.OnConnectionNotification != nil {
c.options.OnConnectionNotification(c, ConnectionNotificationBroker{broker})
}
connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established
dialer := c.options.Dialer dialer := c.options.Dialer
if dialer == nil { // if dialer == nil { //
@@ -388,6 +400,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
ERROR.Println(CLI, err.Error()) ERROR.Println(CLI, err.Error())
WARN.Println(CLI, "failed to connect to broker, trying next") WARN.Println(CLI, "failed to connect to broker, trying next")
rc = packets.ErrNetworkError rc = packets.ErrNetworkError
if c.options.OnConnectionNotification != nil {
c.options.OnConnectionNotification(c, ConnectionNotificationBrokerFailed{broker, err})
}
continue continue
} }
DEBUG.Println(CLI, "socket connected to broker") DEBUG.Println(CLI, "socket connected to broker")
@@ -430,6 +445,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
err = fmt.Errorf("%w : %w", packets.ConnErrors[rc], err) err = fmt.Errorf("%w : %w", packets.ConnErrors[rc], err)
} }
} }
if err != nil && c.options.OnConnectionNotification != nil {
c.options.OnConnectionNotification(c, ConnectionNotificationFailed{err})
}
return conn, rc, sessionPresent, err return conn, rc, sessionPresent, err
} }
@@ -564,6 +582,9 @@ func (c *client) internalConnLost(whyConnLost error) {
if c.options.OnConnectionLost != nil { if c.options.OnConnectionLost != nil {
go c.options.OnConnectionLost(c, whyConnLost) go c.options.OnConnectionLost(c, whyConnLost)
} }
if c.options.OnConnectionNotification != nil {
go c.options.OnConnectionNotification(c, ConnectionNotificationLost{whyConnLost})
}
DEBUG.Println(CLI, "internalConnLost complete") DEBUG.Println(CLI, "internalConnLost complete")
}() }()
} }
@@ -613,6 +634,9 @@ func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn,
if c.options.OnConnect != nil { if c.options.OnConnect != nil {
go c.options.OnConnect(c) go c.options.OnConnect(c)
} }
if c.options.OnConnectionNotification != nil {
go c.options.OnConnectionNotification(c, ConnectionNotificationConnected{})
}
// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options, // c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
// messages may be published while the client is disconnected (they will block unless in a goroutine). However // messages may be published while the client is disconnected (they will block unless in a goroutine). However

79
vendor/github.com/eclipse/paho.mqtt.golang/connnotf.go generated vendored Normal file
View File

@@ -0,0 +1,79 @@
package mqtt
import "net/url"
type ConnectionNotificationType int64
const (
ConnectionNotificationTypeConnected ConnectionNotificationType = iota
ConnectionNotificationTypeConnecting
ConnectionNotificationTypeFailed
ConnectionNotificationTypeLost
ConnectionNotificationTypeBroker
ConnectionNotificationTypeBrokerFailed
)
type ConnectionNotification interface {
Type() ConnectionNotificationType
}
// Connected
type ConnectionNotificationConnected struct {
}
func (n ConnectionNotificationConnected) Type() ConnectionNotificationType {
return ConnectionNotificationTypeConnected
}
// Connecting
type ConnectionNotificationConnecting struct {
IsReconnect bool
Attempt int
}
func (n ConnectionNotificationConnecting) Type() ConnectionNotificationType {
return ConnectionNotificationTypeConnecting
}
// Connection Failed
type ConnectionNotificationFailed struct {
Reason error
}
func (n ConnectionNotificationFailed) Type() ConnectionNotificationType {
return ConnectionNotificationTypeFailed
}
// Connection Lost
type ConnectionNotificationLost struct {
Reason error // may be nil
}
func (n ConnectionNotificationLost) Type() ConnectionNotificationType {
return ConnectionNotificationTypeLost
}
// Broker Connection
type ConnectionNotificationBroker struct {
Broker *url.URL
}
func (n ConnectionNotificationBroker) Type() ConnectionNotificationType {
return ConnectionNotificationTypeBroker
}
// Broker Connection Failed
type ConnectionNotificationBrokerFailed struct {
Broker *url.URL
Reason error
}
func (n ConnectionNotificationBrokerFailed) Type() ConnectionNotificationType {
return ConnectionNotificationTypeBrokerFailed
}

View File

@@ -444,24 +444,23 @@ func startComms(conn net.Conn, // Network connection (must be active)
} }
// ackFunc acknowledges a packet // ackFunc acknowledges a packet
// WARNING the function returned must not be called if the comms routine is shutting down or not running // WARNING sendAck may be called at any time (even after the connection is dead). At the time of writing ACK sent after
// (it needs outgoing comms in order to send the acknowledgement). Currently this is only called from // connection loss will be dropped (this is not ideal)
// matchAndDispatch which will be shutdown before the comms are func ackFunc(sendAck func(*PacketAndToken), persist Store, packet *packets.PublishPacket) func() {
func ackFunc(oboundP chan *PacketAndToken, persist Store, packet *packets.PublishPacket) func() {
return func() { return func() {
switch packet.Qos { switch packet.Qos {
case 2: case 2:
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket) pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
pr.MessageID = packet.MessageID pr.MessageID = packet.MessageID
DEBUG.Println(NET, "putting pubrec msg on obound") DEBUG.Println(NET, "putting pubrec msg on obound")
oboundP <- &PacketAndToken{p: pr, t: nil} sendAck(&PacketAndToken{p: pr, t: nil})
DEBUG.Println(NET, "done putting pubrec msg on obound") DEBUG.Println(NET, "done putting pubrec msg on obound")
case 1: case 1:
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
pa.MessageID = packet.MessageID pa.MessageID = packet.MessageID
DEBUG.Println(NET, "putting puback msg on obound") DEBUG.Println(NET, "putting puback msg on obound")
persistOutbound(persist, pa) persistOutbound(persist, pa) // May fail if store has been closed
oboundP <- &PacketAndToken{p: pa, t: nil} sendAck(&PacketAndToken{p: pa, t: nil})
DEBUG.Println(NET, "done putting puback msg on obound") DEBUG.Println(NET, "done putting puback msg on obound")
case 0: case 0:
// do nothing, since there is no need to send an ack packet back // do nothing, since there is no need to send an ack packet back

View File

@@ -50,16 +50,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade
conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions) conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions)
return conn, err return conn, err
case "mqtt", "tcp": case "mqtt", "tcp":
allProxy := os.Getenv("all_proxy") proxyDialer := proxy.FromEnvironmentUsing(dialer)
if len(allProxy) == 0 {
conn, err := dialer.Dial("tcp", uri.Host)
if err != nil {
return nil, err
}
return conn, nil
}
proxyDialer := proxy.FromEnvironment()
conn, err := proxyDialer.Dial("tcp", uri.Host) conn, err := proxyDialer.Dial("tcp", uri.Host)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -62,6 +62,9 @@ type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Con
// Does not carry out any MQTT specific handshakes. // Does not carry out any MQTT specific handshakes.
type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error) type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error)
// ConnectionNotificationHandler is invoked for any type of connection event.
type ConnectionNotificationHandler func(Client, ConnectionNotification)
// ClientOptions contains configurable options for an Client. Note that these should be set using the // ClientOptions contains configurable options for an Client. Note that these should be set using the
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
// WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy // WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy
@@ -96,6 +99,7 @@ type ClientOptions struct {
OnConnectionLost ConnectionLostHandler OnConnectionLost ConnectionLostHandler
OnReconnecting ReconnectHandler OnReconnecting ReconnectHandler
OnConnectAttempt ConnectionAttemptHandler OnConnectAttempt ConnectionAttemptHandler
OnConnectionNotification ConnectionNotificationHandler
WriteTimeout time.Duration WriteTimeout time.Duration
MessageChannelDepth uint MessageChannelDepth uint
ResumeSubs bool ResumeSubs bool
@@ -109,6 +113,7 @@ type ClientOptions struct {
// NewClientOptions will create a new ClientClientOptions type with some // NewClientOptions will create a new ClientClientOptions type with some
// default values. // default values.
//
// Port: 1883 // Port: 1883
// CleanSession: True // CleanSession: True
// Order: True (note: it is recommended that this be set to FALSE unless order is important) // Order: True (note: it is recommended that this be set to FALSE unless order is important)
@@ -142,6 +147,7 @@ func NewClientOptions() *ClientOptions {
OnConnect: nil, OnConnect: nil,
OnConnectionLost: DefaultConnectionLostHandler, OnConnectionLost: DefaultConnectionLostHandler,
OnConnectAttempt: nil, OnConnectAttempt: nil,
OnConnectionNotification: nil,
WriteTimeout: 0, // 0 represents timeout disabled WriteTimeout: 0, // 0 represents timeout disabled
ResumeSubs: false, ResumeSubs: false,
HTTPHeaders: make(map[string][]string), HTTPHeaders: make(map[string][]string),
@@ -355,6 +361,13 @@ func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionA
return o return o
} }
// SetConnectionNotificationHandler sets the ConnectionNotificationHandler callback to receive all types of connection
// events.
func (o *ClientOptions) SetConnectionNotificationHandler(onConnectionNotification ConnectionNotificationHandler) *ClientOptions {
o.OnConnectionNotification = onConnectionNotification
return o
}
// SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a // SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
// timeout error. A duration of 0 never times out. Default never times out // timeout error. A duration of 0 never times out. Default never times out
func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions { func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
@@ -450,6 +463,7 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon
} }
// SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler. // SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler.
//
// By default it is set to false. Setting it to true will disable the auto-ack globally. // By default it is set to false. Setting it to true will disable the auto-ack globally.
func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions { func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions {
o.AutoAckDisabled = autoAckDisabled o.AutoAckDisabled = autoAckDisabled

View File

@@ -330,6 +330,11 @@ func decodeBytes(b io.Reader) ([]byte, error) {
} }
func encodeBytes(field []byte) []byte { func encodeBytes(field []byte) []byte {
// Attempting to encode more than 65,535 bytes would lead to an unexpected 16-bit length and extra data written
// (which would be parsed as later parts of the message). The safest option is to truncate.
if len(field) > 65535 {
field = field[0:65535]
}
fieldLength := make([]byte, 2) fieldLength := make([]byte, 2)
binary.BigEndian.PutUint16(fieldLength, uint16(len(field))) binary.BigEndian.PutUint16(fieldLength, uint16(len(field)))
return append(fieldLength, field...) return append(fieldLength, field...)

View File

@@ -136,60 +136,41 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
// associated callback (or the defaultHandler, if one exists and no other route matched). If // associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end. // anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken { func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
var wg sync.WaitGroup ackChan := make(chan *PacketAndToken) // Channel returned to caller; closed when goroutine terminates
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan // In some cases message acknowledgments may come through after shutdown (connection is down etc). Where this is the
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan // case we need to accept any such requests and then ignore them. Note that this is not a perfect solution, if we
goRoutinesDone := make(chan struct{}) // closed on wg.Done() // have reconnected, and the session is still live, then the Ack really should be sent (see Issus #726)
if order { var ackMutex sync.RWMutex
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done sendAckChan := ackChan // This will be set to nil before ackChan is closed
sendAck := func(ack *PacketAndToken) {
ackMutex.RLock()
defer ackMutex.RUnlock()
if sendAckChan != nil {
sendAckChan <- ack
} else { } else {
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
ackInChan = make(chan *PacketAndToken)
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
for {
select {
case a := <-ackInChan:
ackOutChan <- a
case <-stopAckCopy:
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
for {
select {
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).") DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
case <-goRoutinesDone:
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
return
} }
} }
}
}
}()
}
go func() { // Main go routine handling inbound messages go func() { // Main go routine handling inbound messages
var handlers []MessageHandler
for message := range messages { for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message") // DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false sent := false
r.RLock() r.RLock()
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message)) m := messageFromPublish(message, ackFunc(sendAck, client.persist, message))
var handlers []MessageHandler
for e := r.routes.Front(); e != nil; e = e.Next() { for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) { if e.Value.(*route).match(message.TopicName) {
if order { if order {
handlers = append(handlers, e.Value.(*route).callback) handlers = append(handlers, e.Value.(*route).callback)
} else { } else {
hd := e.Value.(*route).callback hd := e.Value.(*route).callback
wg.Add(1)
go func() { go func() {
hd(client, m) hd(client, m)
if !client.options.AutoAckDisabled { if !client.options.AutoAckDisabled {
m.Ack() m.Ack()
} }
wg.Done()
}() }()
} }
sent = true sent = true
@@ -200,13 +181,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
if order { if order {
handlers = append(handlers, r.defaultHandler) handlers = append(handlers, r.defaultHandler)
} else { } else {
wg.Add(1)
go func() { go func() {
r.defaultHandler(client, m) r.defaultHandler(client, m)
if !client.options.AutoAckDisabled { if !client.options.AutoAckDisabled {
m.Ack() m.Ack()
} }
wg.Done()
}() }()
} }
} else { } else {
@@ -214,26 +193,22 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
} }
} }
r.RUnlock() r.RUnlock()
if order {
for _, handler := range handlers { for _, handler := range handlers {
handler(client, m) handler(client, m)
if !client.options.AutoAckDisabled { if !client.options.AutoAckDisabled {
m.Ack() m.Ack()
} }
} }
handlers = handlers[:0]
}
// DEBUG.Println(ROU, "matchAndDispatch handled message") // DEBUG.Println(ROU, "matchAndDispatch handled message")
} }
if order { ackMutex.Lock()
close(ackOutChan) sendAckChan = nil
} else { // Ensure that nothing further will be written to ackOutChan before closing it ackMutex.Unlock()
close(stopAckCopy) close(ackChan) // as sendAckChan is now nil nothing further will be sent on this
<-ackCopyStopped
close(ackOutChan)
go func() {
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
close(goRoutinesDone)
}()
}
DEBUG.Println(ROU, "matchAndDispatch exiting") DEBUG.Println(ROU, "matchAndDispatch exiting")
}() }()
return ackOutChan return ackChan
} }

4
vendor/modules.txt vendored
View File

@@ -144,8 +144,8 @@ github.com/dromara/carbon/v2/calendar/hebrew
github.com/dromara/carbon/v2/calendar/julian github.com/dromara/carbon/v2/calendar/julian
github.com/dromara/carbon/v2/calendar/lunar github.com/dromara/carbon/v2/calendar/lunar
github.com/dromara/carbon/v2/calendar/persian github.com/dromara/carbon/v2/calendar/persian
# github.com/eclipse/paho.mqtt.golang v1.5.0 # github.com/eclipse/paho.mqtt.golang v1.5.1
## explicit; go 1.20 ## explicit; go 1.24.0
github.com/eclipse/paho.mqtt.golang github.com/eclipse/paho.mqtt.golang
github.com/eclipse/paho.mqtt.golang/packets github.com/eclipse/paho.mqtt.golang/packets
# github.com/emicklei/go-restful/v3 v3.11.0 # github.com/emicklei/go-restful/v3 v3.11.0