mirror of
https://github.com/crazy-max/diun.git
synced 2025-12-21 13:23:09 +01:00
chore(deps): bump github.com/eclipse/paho.mqtt.golang
Bumps [github.com/eclipse/paho.mqtt.golang](https://github.com/eclipse/paho.mqtt.golang) from 1.5.0 to 1.5.1. - [Release notes](https://github.com/eclipse/paho.mqtt.golang/releases) - [Commits](https://github.com/eclipse/paho.mqtt.golang/compare/v1.5.0...v1.5.1) --- updated-dependencies: - dependency-name: github.com/eclipse/paho.mqtt.golang dependency-version: 1.5.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
30
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
30
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
@@ -258,12 +258,15 @@ func (c *client) Connect() Token {
|
||||
return
|
||||
}
|
||||
|
||||
var attemptCount int
|
||||
|
||||
RETRYCONN:
|
||||
var conn net.Conn
|
||||
var rc byte
|
||||
var err error
|
||||
conn, rc, t.sessionPresent, err = c.attemptConnection()
|
||||
conn, rc, t.sessionPresent, err = c.attemptConnection(false, attemptCount)
|
||||
if err != nil {
|
||||
attemptCount++
|
||||
if c.options.ConnectRetry {
|
||||
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error())
|
||||
time.Sleep(c.options.ConnectRetryInterval)
|
||||
@@ -315,15 +318,17 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
|
||||
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
|
||||
}
|
||||
|
||||
var attemptCount int
|
||||
for {
|
||||
if nil != c.options.OnReconnecting {
|
||||
c.options.OnReconnecting(c, &c.options)
|
||||
}
|
||||
var err error
|
||||
conn, _, _, err = c.attemptConnection()
|
||||
conn, _, _, err = c.attemptConnection(true, attemptCount)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
attemptCount++
|
||||
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
|
||||
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)
|
||||
|
||||
@@ -351,7 +356,7 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
|
||||
// byte - Return code (packets.Accepted indicates a successful connection).
|
||||
// bool - SessionPresent flag from the connect ack (only valid if packets.Accepted)
|
||||
// err - Error (err != nil guarantees that conn has been set to active connection).
|
||||
func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
func (c *client) attemptConnection(isReconnect bool, attempt int) (net.Conn, byte, bool, error) {
|
||||
protocolVersion := c.options.ProtocolVersion
|
||||
var (
|
||||
sessionPresent bool
|
||||
@@ -360,6 +365,10 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
rc byte
|
||||
)
|
||||
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
c.options.OnConnectionNotification(c, ConnectionNotificationConnecting{isReconnect, attempt})
|
||||
}
|
||||
|
||||
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
|
||||
brokers := c.options.Servers
|
||||
c.optionsMu.Unlock()
|
||||
@@ -372,6 +381,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
|
||||
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
|
||||
}
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
c.options.OnConnectionNotification(c, ConnectionNotificationBroker{broker})
|
||||
}
|
||||
connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established
|
||||
dialer := c.options.Dialer
|
||||
if dialer == nil { //
|
||||
@@ -388,6 +400,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
ERROR.Println(CLI, err.Error())
|
||||
WARN.Println(CLI, "failed to connect to broker, trying next")
|
||||
rc = packets.ErrNetworkError
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
c.options.OnConnectionNotification(c, ConnectionNotificationBrokerFailed{broker, err})
|
||||
}
|
||||
continue
|
||||
}
|
||||
DEBUG.Println(CLI, "socket connected to broker")
|
||||
@@ -430,6 +445,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
err = fmt.Errorf("%w : %w", packets.ConnErrors[rc], err)
|
||||
}
|
||||
}
|
||||
if err != nil && c.options.OnConnectionNotification != nil {
|
||||
c.options.OnConnectionNotification(c, ConnectionNotificationFailed{err})
|
||||
}
|
||||
return conn, rc, sessionPresent, err
|
||||
}
|
||||
|
||||
@@ -564,6 +582,9 @@ func (c *client) internalConnLost(whyConnLost error) {
|
||||
if c.options.OnConnectionLost != nil {
|
||||
go c.options.OnConnectionLost(c, whyConnLost)
|
||||
}
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
go c.options.OnConnectionNotification(c, ConnectionNotificationLost{whyConnLost})
|
||||
}
|
||||
DEBUG.Println(CLI, "internalConnLost complete")
|
||||
}()
|
||||
}
|
||||
@@ -613,6 +634,9 @@ func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn,
|
||||
if c.options.OnConnect != nil {
|
||||
go c.options.OnConnect(c)
|
||||
}
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
go c.options.OnConnectionNotification(c, ConnectionNotificationConnected{})
|
||||
}
|
||||
|
||||
// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
|
||||
// messages may be published while the client is disconnected (they will block unless in a goroutine). However
|
||||
|
||||
79
vendor/github.com/eclipse/paho.mqtt.golang/connnotf.go
generated
vendored
Normal file
79
vendor/github.com/eclipse/paho.mqtt.golang/connnotf.go
generated
vendored
Normal file
@@ -0,0 +1,79 @@
|
||||
package mqtt
|
||||
|
||||
import "net/url"
|
||||
|
||||
type ConnectionNotificationType int64
|
||||
|
||||
const (
|
||||
ConnectionNotificationTypeConnected ConnectionNotificationType = iota
|
||||
ConnectionNotificationTypeConnecting
|
||||
ConnectionNotificationTypeFailed
|
||||
ConnectionNotificationTypeLost
|
||||
ConnectionNotificationTypeBroker
|
||||
ConnectionNotificationTypeBrokerFailed
|
||||
)
|
||||
|
||||
type ConnectionNotification interface {
|
||||
Type() ConnectionNotificationType
|
||||
}
|
||||
|
||||
// Connected
|
||||
|
||||
type ConnectionNotificationConnected struct {
|
||||
}
|
||||
|
||||
func (n ConnectionNotificationConnected) Type() ConnectionNotificationType {
|
||||
return ConnectionNotificationTypeConnected
|
||||
}
|
||||
|
||||
// Connecting
|
||||
|
||||
type ConnectionNotificationConnecting struct {
|
||||
IsReconnect bool
|
||||
Attempt int
|
||||
}
|
||||
|
||||
func (n ConnectionNotificationConnecting) Type() ConnectionNotificationType {
|
||||
return ConnectionNotificationTypeConnecting
|
||||
}
|
||||
|
||||
// Connection Failed
|
||||
|
||||
type ConnectionNotificationFailed struct {
|
||||
Reason error
|
||||
}
|
||||
|
||||
func (n ConnectionNotificationFailed) Type() ConnectionNotificationType {
|
||||
return ConnectionNotificationTypeFailed
|
||||
}
|
||||
|
||||
// Connection Lost
|
||||
|
||||
type ConnectionNotificationLost struct {
|
||||
Reason error // may be nil
|
||||
}
|
||||
|
||||
func (n ConnectionNotificationLost) Type() ConnectionNotificationType {
|
||||
return ConnectionNotificationTypeLost
|
||||
}
|
||||
|
||||
// Broker Connection
|
||||
|
||||
type ConnectionNotificationBroker struct {
|
||||
Broker *url.URL
|
||||
}
|
||||
|
||||
func (n ConnectionNotificationBroker) Type() ConnectionNotificationType {
|
||||
return ConnectionNotificationTypeBroker
|
||||
}
|
||||
|
||||
// Broker Connection Failed
|
||||
|
||||
type ConnectionNotificationBrokerFailed struct {
|
||||
Broker *url.URL
|
||||
Reason error
|
||||
}
|
||||
|
||||
func (n ConnectionNotificationBrokerFailed) Type() ConnectionNotificationType {
|
||||
return ConnectionNotificationTypeBrokerFailed
|
||||
}
|
||||
13
vendor/github.com/eclipse/paho.mqtt.golang/net.go
generated
vendored
13
vendor/github.com/eclipse/paho.mqtt.golang/net.go
generated
vendored
@@ -444,24 +444,23 @@ func startComms(conn net.Conn, // Network connection (must be active)
|
||||
}
|
||||
|
||||
// ackFunc acknowledges a packet
|
||||
// WARNING the function returned must not be called if the comms routine is shutting down or not running
|
||||
// (it needs outgoing comms in order to send the acknowledgement). Currently this is only called from
|
||||
// matchAndDispatch which will be shutdown before the comms are
|
||||
func ackFunc(oboundP chan *PacketAndToken, persist Store, packet *packets.PublishPacket) func() {
|
||||
// WARNING sendAck may be called at any time (even after the connection is dead). At the time of writing ACK sent after
|
||||
// connection loss will be dropped (this is not ideal)
|
||||
func ackFunc(sendAck func(*PacketAndToken), persist Store, packet *packets.PublishPacket) func() {
|
||||
return func() {
|
||||
switch packet.Qos {
|
||||
case 2:
|
||||
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
|
||||
pr.MessageID = packet.MessageID
|
||||
DEBUG.Println(NET, "putting pubrec msg on obound")
|
||||
oboundP <- &PacketAndToken{p: pr, t: nil}
|
||||
sendAck(&PacketAndToken{p: pr, t: nil})
|
||||
DEBUG.Println(NET, "done putting pubrec msg on obound")
|
||||
case 1:
|
||||
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
|
||||
pa.MessageID = packet.MessageID
|
||||
DEBUG.Println(NET, "putting puback msg on obound")
|
||||
persistOutbound(persist, pa)
|
||||
oboundP <- &PacketAndToken{p: pa, t: nil}
|
||||
persistOutbound(persist, pa) // May fail if store has been closed
|
||||
sendAck(&PacketAndToken{p: pa, t: nil})
|
||||
DEBUG.Println(NET, "done putting puback msg on obound")
|
||||
case 0:
|
||||
// do nothing, since there is no need to send an ack packet back
|
||||
|
||||
11
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
11
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
@@ -50,16 +50,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade
|
||||
conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions)
|
||||
return conn, err
|
||||
case "mqtt", "tcp":
|
||||
allProxy := os.Getenv("all_proxy")
|
||||
if len(allProxy) == 0 {
|
||||
conn, err := dialer.Dial("tcp", uri.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
proxyDialer := proxy.FromEnvironment()
|
||||
|
||||
proxyDialer := proxy.FromEnvironmentUsing(dialer)
|
||||
conn, err := proxyDialer.Dial("tcp", uri.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
164
vendor/github.com/eclipse/paho.mqtt.golang/options.go
generated
vendored
164
vendor/github.com/eclipse/paho.mqtt.golang/options.go
generated
vendored
@@ -62,93 +62,99 @@ type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Con
|
||||
// Does not carry out any MQTT specific handshakes.
|
||||
type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error)
|
||||
|
||||
// ConnectionNotificationHandler is invoked for any type of connection event.
|
||||
type ConnectionNotificationHandler func(Client, ConnectionNotification)
|
||||
|
||||
// ClientOptions contains configurable options for an Client. Note that these should be set using the
|
||||
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
|
||||
// WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy
|
||||
// to create a configuration with difficult to trace issues (e.g. Mosquitto 2.0.12+ will reject connections
|
||||
// with KeepAlive=0 by default).
|
||||
type ClientOptions struct {
|
||||
Servers []*url.URL
|
||||
ClientID string
|
||||
Username string
|
||||
Password string
|
||||
CredentialsProvider CredentialsProvider
|
||||
CleanSession bool
|
||||
Order bool
|
||||
WillEnabled bool
|
||||
WillTopic string
|
||||
WillPayload []byte
|
||||
WillQos byte
|
||||
WillRetained bool
|
||||
ProtocolVersion uint
|
||||
protocolVersionExplicit bool
|
||||
TLSConfig *tls.Config
|
||||
KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0.
|
||||
PingTimeout time.Duration
|
||||
ConnectTimeout time.Duration
|
||||
MaxReconnectInterval time.Duration
|
||||
AutoReconnect bool
|
||||
ConnectRetryInterval time.Duration
|
||||
ConnectRetry bool
|
||||
Store Store
|
||||
DefaultPublishHandler MessageHandler
|
||||
OnConnect OnConnectHandler
|
||||
OnConnectionLost ConnectionLostHandler
|
||||
OnReconnecting ReconnectHandler
|
||||
OnConnectAttempt ConnectionAttemptHandler
|
||||
WriteTimeout time.Duration
|
||||
MessageChannelDepth uint
|
||||
ResumeSubs bool
|
||||
HTTPHeaders http.Header
|
||||
WebsocketOptions *WebsocketOptions
|
||||
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
|
||||
Dialer *net.Dialer
|
||||
CustomOpenConnectionFn OpenConnectionFunc
|
||||
AutoAckDisabled bool
|
||||
Servers []*url.URL
|
||||
ClientID string
|
||||
Username string
|
||||
Password string
|
||||
CredentialsProvider CredentialsProvider
|
||||
CleanSession bool
|
||||
Order bool
|
||||
WillEnabled bool
|
||||
WillTopic string
|
||||
WillPayload []byte
|
||||
WillQos byte
|
||||
WillRetained bool
|
||||
ProtocolVersion uint
|
||||
protocolVersionExplicit bool
|
||||
TLSConfig *tls.Config
|
||||
KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0.
|
||||
PingTimeout time.Duration
|
||||
ConnectTimeout time.Duration
|
||||
MaxReconnectInterval time.Duration
|
||||
AutoReconnect bool
|
||||
ConnectRetryInterval time.Duration
|
||||
ConnectRetry bool
|
||||
Store Store
|
||||
DefaultPublishHandler MessageHandler
|
||||
OnConnect OnConnectHandler
|
||||
OnConnectionLost ConnectionLostHandler
|
||||
OnReconnecting ReconnectHandler
|
||||
OnConnectAttempt ConnectionAttemptHandler
|
||||
OnConnectionNotification ConnectionNotificationHandler
|
||||
WriteTimeout time.Duration
|
||||
MessageChannelDepth uint
|
||||
ResumeSubs bool
|
||||
HTTPHeaders http.Header
|
||||
WebsocketOptions *WebsocketOptions
|
||||
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
|
||||
Dialer *net.Dialer
|
||||
CustomOpenConnectionFn OpenConnectionFunc
|
||||
AutoAckDisabled bool
|
||||
}
|
||||
|
||||
// NewClientOptions will create a new ClientClientOptions type with some
|
||||
// default values.
|
||||
// Port: 1883
|
||||
// CleanSession: True
|
||||
// Order: True (note: it is recommended that this be set to FALSE unless order is important)
|
||||
// KeepAlive: 30 (seconds)
|
||||
// ConnectTimeout: 30 (seconds)
|
||||
// MaxReconnectInterval 10 (minutes)
|
||||
// AutoReconnect: True
|
||||
//
|
||||
// Port: 1883
|
||||
// CleanSession: True
|
||||
// Order: True (note: it is recommended that this be set to FALSE unless order is important)
|
||||
// KeepAlive: 30 (seconds)
|
||||
// ConnectTimeout: 30 (seconds)
|
||||
// MaxReconnectInterval 10 (minutes)
|
||||
// AutoReconnect: True
|
||||
func NewClientOptions() *ClientOptions {
|
||||
o := &ClientOptions{
|
||||
Servers: nil,
|
||||
ClientID: "",
|
||||
Username: "",
|
||||
Password: "",
|
||||
CleanSession: true,
|
||||
Order: true,
|
||||
WillEnabled: false,
|
||||
WillTopic: "",
|
||||
WillPayload: nil,
|
||||
WillQos: 0,
|
||||
WillRetained: false,
|
||||
ProtocolVersion: 0,
|
||||
protocolVersionExplicit: false,
|
||||
KeepAlive: 30,
|
||||
PingTimeout: 10 * time.Second,
|
||||
ConnectTimeout: 30 * time.Second,
|
||||
MaxReconnectInterval: 10 * time.Minute,
|
||||
AutoReconnect: true,
|
||||
ConnectRetryInterval: 30 * time.Second,
|
||||
ConnectRetry: false,
|
||||
Store: nil,
|
||||
OnConnect: nil,
|
||||
OnConnectionLost: DefaultConnectionLostHandler,
|
||||
OnConnectAttempt: nil,
|
||||
WriteTimeout: 0, // 0 represents timeout disabled
|
||||
ResumeSubs: false,
|
||||
HTTPHeaders: make(map[string][]string),
|
||||
WebsocketOptions: &WebsocketOptions{},
|
||||
Dialer: &net.Dialer{Timeout: 30 * time.Second},
|
||||
CustomOpenConnectionFn: nil,
|
||||
AutoAckDisabled: false,
|
||||
Servers: nil,
|
||||
ClientID: "",
|
||||
Username: "",
|
||||
Password: "",
|
||||
CleanSession: true,
|
||||
Order: true,
|
||||
WillEnabled: false,
|
||||
WillTopic: "",
|
||||
WillPayload: nil,
|
||||
WillQos: 0,
|
||||
WillRetained: false,
|
||||
ProtocolVersion: 0,
|
||||
protocolVersionExplicit: false,
|
||||
KeepAlive: 30,
|
||||
PingTimeout: 10 * time.Second,
|
||||
ConnectTimeout: 30 * time.Second,
|
||||
MaxReconnectInterval: 10 * time.Minute,
|
||||
AutoReconnect: true,
|
||||
ConnectRetryInterval: 30 * time.Second,
|
||||
ConnectRetry: false,
|
||||
Store: nil,
|
||||
OnConnect: nil,
|
||||
OnConnectionLost: DefaultConnectionLostHandler,
|
||||
OnConnectAttempt: nil,
|
||||
OnConnectionNotification: nil,
|
||||
WriteTimeout: 0, // 0 represents timeout disabled
|
||||
ResumeSubs: false,
|
||||
HTTPHeaders: make(map[string][]string),
|
||||
WebsocketOptions: &WebsocketOptions{},
|
||||
Dialer: &net.Dialer{Timeout: 30 * time.Second},
|
||||
CustomOpenConnectionFn: nil,
|
||||
AutoAckDisabled: false,
|
||||
}
|
||||
return o
|
||||
}
|
||||
@@ -355,6 +361,13 @@ func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionA
|
||||
return o
|
||||
}
|
||||
|
||||
// SetConnectionNotificationHandler sets the ConnectionNotificationHandler callback to receive all types of connection
|
||||
// events.
|
||||
func (o *ClientOptions) SetConnectionNotificationHandler(onConnectionNotification ConnectionNotificationHandler) *ClientOptions {
|
||||
o.OnConnectionNotification = onConnectionNotification
|
||||
return o
|
||||
}
|
||||
|
||||
// SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
|
||||
// timeout error. A duration of 0 never times out. Default never times out
|
||||
func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
|
||||
@@ -450,6 +463,7 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon
|
||||
}
|
||||
|
||||
// SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler.
|
||||
//
|
||||
// By default it is set to false. Setting it to true will disable the auto-ack globally.
|
||||
func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions {
|
||||
o.AutoAckDisabled = autoAckDisabled
|
||||
|
||||
5
vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go
generated
vendored
5
vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go
generated
vendored
@@ -330,6 +330,11 @@ func decodeBytes(b io.Reader) ([]byte, error) {
|
||||
}
|
||||
|
||||
func encodeBytes(field []byte) []byte {
|
||||
// Attempting to encode more than 65,535 bytes would lead to an unexpected 16-bit length and extra data written
|
||||
// (which would be parsed as later parts of the message). The safest option is to truncate.
|
||||
if len(field) > 65535 {
|
||||
field = field[0:65535]
|
||||
}
|
||||
fieldLength := make([]byte, 2)
|
||||
binary.BigEndian.PutUint16(fieldLength, uint16(len(field)))
|
||||
return append(fieldLength, field...)
|
||||
|
||||
81
vendor/github.com/eclipse/paho.mqtt.golang/router.go
generated
vendored
81
vendor/github.com/eclipse/paho.mqtt.golang/router.go
generated
vendored
@@ -136,60 +136,41 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
|
||||
// associated callback (or the defaultHandler, if one exists and no other route matched). If
|
||||
// anything is sent down the stop channel the function will end.
|
||||
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
|
||||
var wg sync.WaitGroup
|
||||
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
|
||||
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
|
||||
ackChan := make(chan *PacketAndToken) // Channel returned to caller; closed when goroutine terminates
|
||||
|
||||
stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
|
||||
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
|
||||
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
|
||||
if order {
|
||||
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
|
||||
} else {
|
||||
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
|
||||
ackInChan = make(chan *PacketAndToken)
|
||||
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
|
||||
for {
|
||||
select {
|
||||
case a := <-ackInChan:
|
||||
ackOutChan <- a
|
||||
case <-stopAckCopy:
|
||||
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
|
||||
for {
|
||||
select {
|
||||
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
|
||||
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
|
||||
case <-goRoutinesDone:
|
||||
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
|
||||
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
// In some cases message acknowledgments may come through after shutdown (connection is down etc). Where this is the
|
||||
// case we need to accept any such requests and then ignore them. Note that this is not a perfect solution, if we
|
||||
// have reconnected, and the session is still live, then the Ack really should be sent (see Issus #726)
|
||||
var ackMutex sync.RWMutex
|
||||
sendAckChan := ackChan // This will be set to nil before ackChan is closed
|
||||
sendAck := func(ack *PacketAndToken) {
|
||||
ackMutex.RLock()
|
||||
defer ackMutex.RUnlock()
|
||||
if sendAckChan != nil {
|
||||
sendAckChan <- ack
|
||||
} else {
|
||||
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
|
||||
}
|
||||
}
|
||||
|
||||
go func() { // Main go routine handling inbound messages
|
||||
var handlers []MessageHandler
|
||||
for message := range messages {
|
||||
// DEBUG.Println(ROU, "matchAndDispatch received message")
|
||||
sent := false
|
||||
r.RLock()
|
||||
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
|
||||
var handlers []MessageHandler
|
||||
m := messageFromPublish(message, ackFunc(sendAck, client.persist, message))
|
||||
for e := r.routes.Front(); e != nil; e = e.Next() {
|
||||
if e.Value.(*route).match(message.TopicName) {
|
||||
if order {
|
||||
handlers = append(handlers, e.Value.(*route).callback)
|
||||
} else {
|
||||
hd := e.Value.(*route).callback
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
hd(client, m)
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
sent = true
|
||||
@@ -200,13 +181,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
if order {
|
||||
handlers = append(handlers, r.defaultHandler)
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
r.defaultHandler(client, m)
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
} else {
|
||||
@@ -214,26 +193,22 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
}
|
||||
}
|
||||
r.RUnlock()
|
||||
for _, handler := range handlers {
|
||||
handler(client, m)
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
if order {
|
||||
for _, handler := range handlers {
|
||||
handler(client, m)
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
}
|
||||
handlers = handlers[:0]
|
||||
}
|
||||
// DEBUG.Println(ROU, "matchAndDispatch handled message")
|
||||
}
|
||||
if order {
|
||||
close(ackOutChan)
|
||||
} else { // Ensure that nothing further will be written to ackOutChan before closing it
|
||||
close(stopAckCopy)
|
||||
<-ackCopyStopped
|
||||
close(ackOutChan)
|
||||
go func() {
|
||||
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
|
||||
close(goRoutinesDone)
|
||||
}()
|
||||
}
|
||||
ackMutex.Lock()
|
||||
sendAckChan = nil
|
||||
ackMutex.Unlock()
|
||||
close(ackChan) // as sendAckChan is now nil nothing further will be sent on this
|
||||
DEBUG.Println(ROU, "matchAndDispatch exiting")
|
||||
}()
|
||||
return ackOutChan
|
||||
return ackChan
|
||||
}
|
||||
|
||||
4
vendor/modules.txt
vendored
4
vendor/modules.txt
vendored
@@ -144,8 +144,8 @@ github.com/dromara/carbon/v2/calendar/hebrew
|
||||
github.com/dromara/carbon/v2/calendar/julian
|
||||
github.com/dromara/carbon/v2/calendar/lunar
|
||||
github.com/dromara/carbon/v2/calendar/persian
|
||||
# github.com/eclipse/paho.mqtt.golang v1.5.0
|
||||
## explicit; go 1.20
|
||||
# github.com/eclipse/paho.mqtt.golang v1.5.1
|
||||
## explicit; go 1.24.0
|
||||
github.com/eclipse/paho.mqtt.golang
|
||||
github.com/eclipse/paho.mqtt.golang/packets
|
||||
# github.com/emicklei/go-restful/v3 v3.11.0
|
||||
|
||||
Reference in New Issue
Block a user