From 4debe3a76a5aa8f27a2616e2d4e3fc53be0dc788 Mon Sep 17 00:00:00 2001 From: Matt Brittan Date: Tue, 16 Sep 2025 15:54:20 +1200 Subject: [PATCH 1/2] Potential panic when using manual ACK Measures were added in 2021 to ensure that matchAndDispatch waited for handlers to complete before exiting. Unfortunately this did not fully account for manual acknowledgments (which could be triggered at any time). This change simplifies Ack handling to avoid the issue (includes a test). Note that this is not a complete solution (ignoring an ACK after the connection has dropped will not always be the right option). Ref issue #726 --- fvt_client_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++++++ net.go | 13 ++++---- router.go | 70 +++++++++++++------------------------------ 3 files changed, 101 insertions(+), 56 deletions(-) diff --git a/fvt_client_test.go b/fvt_client_test.go index 35c1d693..3145502a 100644 --- a/fvt_client_test.go +++ b/fvt_client_test.go @@ -1711,3 +1711,77 @@ func Test_OverLengthTopic(t *testing.T) { p.Disconnect(250) s.Disconnect(250) } + +// Test_Ack_After_Disconnect issue #726 +// Must not panic if Ack is sent after connection loss +func Test_Ack_After_Disconnect(t *testing.T) { + pops := NewClientOptions() + pops.AddBroker(FVTTCP) + pops.SetClientID("Ack_After_Disconnect_tx") + p := NewClient(pops) + + msgReceived := make(chan struct{}) + disconnectDone := make(chan struct{}) + ackCalled := make(chan bool) + + sops := NewClientOptions() + sops.AddBroker(FVTTCP) + sops.AutoAckDisabled = true // Manual message Acknowledgment (so we can delay this) + sops.SetClientID("Ack_After_Disconnect_rx") + var f MessageHandler = func(client Client, msg Message) { + // matchAndDispatch waits for handlers to complete, so we return after + // starting a goroutine that will call Ack eventually + close(msgReceived) + go func() { + defer close(ackCalled) // should only ever get one message + + select { + case <-disconnectDone: + msg.Ack() + ackCalled <- true + case 
<-time.After(time.Second): + ackCalled <- false + } + }() + } + sops.SetDefaultPublishHandler(f) + s := NewClient(sops) + + sToken := s.Connect() + if sToken.Wait() && sToken.Error() != nil { + t.Fatalf("Error on Client.Connect(): %v", sToken.Error()) + } + + s.Subscribe("/test/ack-after-disconnect", 2, nil) + + pToken := p.Connect() + if pToken.Wait() && pToken.Error() != nil { + t.Fatalf("Error on Client.Connect(): %v", pToken.Error()) + } + + p.Publish("/test/ack-after-disconnect", 2, false, "Publish qos0") + + select { + case <-msgReceived: + case <-time.After(time.Second): + t.Error("Timed out waiting for message to be received") + } + + p.Disconnect(0) + s.Disconnect(0) + + // Hack to wait until disconnection complete + for s.(*client).status.status != disconnected { + time.Sleep(10 * time.Millisecond) + } + close(disconnectDone) + + select { + case c := <-ackCalled: + if !c { + t.Error("Did not receive message, so no attempt made to send Ack") + } + case <-time.After(time.Second): + t.Error("Timed out waiting for ackCalled") + } +} diff --git a/net.go b/net.go index 10cc7dae..cb3d3741 100644 --- a/net.go +++ b/net.go @@ -444,24 +444,23 @@ func startComms(conn net.Conn, // Network connection (must be active) } // ackFunc acknowledges a packet -// WARNING the function returned must not be called if the comms routine is shutting down or not running -// (it needs outgoing comms in order to send the acknowledgement). Currently this is only called from -// matchAndDispatch which will be shutdown before the comms are -func ackFunc(oboundP chan *PacketAndToken, persist Store, packet *packets.PublishPacket) func() { +// WARNING sendAck may be called at any time (even after the connection is dead). 
At the time of writing ACK sent after +// connection loss will be dropped (this is not ideal) +func ackFunc(sendAck func(*PacketAndToken), persist Store, packet *packets.PublishPacket) func() { return func() { switch packet.Qos { case 2: pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket) pr.MessageID = packet.MessageID DEBUG.Println(NET, "putting pubrec msg on obound") - oboundP <- &PacketAndToken{p: pr, t: nil} + sendAck(&PacketAndToken{p: pr, t: nil}) DEBUG.Println(NET, "done putting pubrec msg on obound") case 1: pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) pa.MessageID = packet.MessageID DEBUG.Println(NET, "putting puback msg on obound") - persistOutbound(persist, pa) - oboundP <- &PacketAndToken{p: pa, t: nil} + persistOutbound(persist, pa) // May fail if store has been closed + sendAck(&PacketAndToken{p: pa, t: nil}) DEBUG.Println(NET, "done putting puback msg on obound") case 0: // do nothing, since there is no need to send an ack packet back diff --git a/router.go b/router.go index e7101b5c..5cfc5e6c 100644 --- a/router.go +++ b/router.go @@ -136,38 +136,21 @@ func (r *router) setDefaultHandler(handler MessageHandler) { // associated callback (or the defaultHandler, if one exists and no other route matched). If // anything is sent down the stop channel the function will end. 
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken { - var wg sync.WaitGroup - ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed - var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel - - stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan - ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan - goRoutinesDone := make(chan struct{}) // closed on wg.Done() - if order { - ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done - } else { - // When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done - ackInChan = make(chan *PacketAndToken) - go func() { // go routine to copy from ackInChan to ackOutChan until stopped - for { - select { - case a := <-ackInChan: - ackOutChan <- a - case <-stopAckCopy: - close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan - for { - select { - case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped) - DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).") - case <-goRoutinesDone: - close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure) - DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.") - return - } - } - } - } - }() + ackChan := make(chan *PacketAndToken) // Channel returned to caller; closed when goroutine terminates + + // In some cases message acknowledgments may come through after shutdown (connection is down etc). Where this is the + // case we need to accept any such requests and then ignore them. 
Note that this is not a perfect solution, if we + // have reconnected, and the session is still live, then the Ack really should be sent (see Issue #726) + var ackMutex sync.RWMutex + sendAckChan := ackChan // This will be set to nil before ackChan is closed + sendAck := func(ack *PacketAndToken) { + ackMutex.RLock() + defer ackMutex.RUnlock() + if sendAckChan != nil { + sendAckChan <- ack + } else { + DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).") + } } go func() { // Main go routine handling inbound messages @@ -176,20 +159,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order // DEBUG.Println(ROU, "matchAndDispatch received message") sent := false r.RLock() - m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message)) + m := messageFromPublish(message, ackFunc(sendAck, client.persist, message)) for e := r.routes.Front(); e != nil; e = e.Next() { if e.Value.(*route).match(message.TopicName) { if order { handlers = append(handlers, e.Value.(*route).callback) } else { hd := e.Value.(*route).callback - wg.Add(1) go func() { hd(client, m) if !client.options.AutoAckDisabled { m.Ack() } - wg.Done() }() } sent = true @@ -200,13 +181,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order if order { handlers = append(handlers, r.defaultHandler) } else { - wg.Add(1) go func() { r.defaultHandler(client, m) if !client.options.AutoAckDisabled { m.Ack() } - wg.Done() }() } } else { @@ -225,18 +204,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order } // DEBUG.Println(ROU, "matchAndDispatch handled message") } - if order { - close(ackOutChan) - } else { // Ensure that nothing further will be written to ackOutChan before closing it - close(stopAckCopy) - <-ackCopyStopped - close(ackOutChan) - go func() { - wg.Wait() // Note: If this remains running then the user has handlers that are not returning -
close(goRoutinesDone) - }() - } + ackMutex.Lock() + sendAckChan = nil + ackMutex.Unlock() + close(ackChan) // as sendAckChan is now nil nothing further will be sent on this DEBUG.Println(ROU, "matchAndDispatch exiting") }() - return ackOutChan + return ackChan } From 433bd22ae6ac92daf00d575def72e425bc21e39f Mon Sep 17 00:00:00 2001 From: Matt Brittan Date: Tue, 16 Sep 2025 16:02:47 +1200 Subject: [PATCH 2/2] address data race in test --- fvt_client_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fvt_client_test.go b/fvt_client_test.go index 3145502a..e7d24758 100644 --- a/fvt_client_test.go +++ b/fvt_client_test.go @@ -1770,8 +1770,8 @@ func Test_Ack_After_Disconnect(t *testing.T) { p.Disconnect(0) s.Disconnect(0) - // Hack to wait until disconnection complete - for s.(*client).status.status != disconnected { + // Ensure disconnection complete before proceeding + for s.(*client).status.ConnectionStatus() != disconnected { time.Sleep(10 * time.Millisecond) } close(disconnectDone)