Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions fvt_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1711,3 +1711,77 @@ func Test_OverLengthTopic(t *testing.T) {
p.Disconnect(250)
s.Disconnect(250)
}

// Test_Ack_After_Disconnect issue #726
// Must not panic if Ack is sent after connection loss
func Test_Ack_After_Disconnect(t *testing.T) {
pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetClientID("Ack_After_Disconnect_tx")
p := NewClient(pops)

msgReceived := make(chan struct{})
disconnectDone := make(chan struct{})
ackCalled := make(chan bool)

sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.AutoAckDisabled = true // Manual message Acknowledgment (so we can delay this)
sops.SetClientID("Ack_After_Disconnect_rx")
var f MessageHandler = func(client Client, msg Message) {
// matchAndDispatch waits for handlers to complete, so we return after
// starting a goroutine that will call Ack eventually
close(msgReceived)
go func() {
defer close(ackCalled) // should only ever get one message

select {
case <-disconnectDone:
msg.Ack()
ackCalled <- true
case <-time.After(time.Second):
ackCalled <- false
}
}()
}
sops.SetDefaultPublishHandler(f)
s := NewClient(sops)

sToken := s.Connect()
if sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
}

s.Subscribe("/test/ack-after-disconnect", 2, nil)

pToken := p.Connect()
if pToken.Wait() && pToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", pToken.Error())
}

p.Publish("/test/ack-after-disconnect", 2, false, "Publish qos0")

select {
case <-msgReceived:
case <-time.After(time.Second):
t.Error("Timed out waiting for message to be received")
}

p.Disconnect(0)
s.Disconnect(0)

// Ensure disconnection complete before proceeding
for s.(*client).status.ConnectionStatus() != disconnected {
time.Sleep(10 * time.Millisecond)
}
close(disconnectDone)

select {
case c := <-ackCalled:
if !c {
t.Error("Did not receive message, so no attempt made to send Ack")
}
case <-time.After(time.Second):
t.Error("Timed out waiting for ackCalled")
}
}
13 changes: 6 additions & 7 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,24 +444,23 @@ func startComms(conn net.Conn, // Network connection (must be active)
}

// ackFunc acknowledges a packet
// WARNING the function returned must not be called if the comms routine is shutting down or not running
// (it needs outgoing comms in order to send the acknowledgement). Currently this is only called from
// matchAndDispatch which will be shutdown before the comms are
func ackFunc(oboundP chan *PacketAndToken, persist Store, packet *packets.PublishPacket) func() {
// WARNING sendAck may be called at any time (even after the connection is dead). At the time of writing ACK sent after
// connection loss will be dropped (this is not ideal)
func ackFunc(sendAck func(*PacketAndToken), persist Store, packet *packets.PublishPacket) func() {
return func() {
switch packet.Qos {
case 2:
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
pr.MessageID = packet.MessageID
DEBUG.Println(NET, "putting pubrec msg on obound")
oboundP <- &PacketAndToken{p: pr, t: nil}
sendAck(&PacketAndToken{p: pr, t: nil})
DEBUG.Println(NET, "done putting pubrec msg on obound")
case 1:
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
pa.MessageID = packet.MessageID
DEBUG.Println(NET, "putting puback msg on obound")
persistOutbound(persist, pa)
oboundP <- &PacketAndToken{p: pa, t: nil}
persistOutbound(persist, pa) // May fail if store has been closed
sendAck(&PacketAndToken{p: pa, t: nil})
DEBUG.Println(NET, "done putting puback msg on obound")
case 0:
// do nothing, since there is no need to send an ack packet back
Expand Down
70 changes: 21 additions & 49 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,38 +136,21 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
var wg sync.WaitGroup
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel

stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
if order {
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
} else {
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
ackInChan = make(chan *PacketAndToken)
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
for {
select {
case a := <-ackInChan:
ackOutChan <- a
case <-stopAckCopy:
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
for {
select {
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
case <-goRoutinesDone:
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
return
}
}
}
}
}()
ackChan := make(chan *PacketAndToken) // Channel returned to caller; closed when goroutine terminates

// In some cases message acknowledgments may come through after shutdown (connection is down etc). Where this is the
// case we need to accept any such requests and then ignore them. Note that this is not a perfect solution, if we
// have reconnected, and the session is still live, then the Ack really should be sent (see Issus #726)
var ackMutex sync.RWMutex
sendAckChan := ackChan // This will be set to nil before ackChan is closed
sendAck := func(ack *PacketAndToken) {
ackMutex.RLock()
defer ackMutex.RUnlock()
if sendAckChan != nil {
sendAckChan <- ack
} else {
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
}
}

go func() { // Main go routine handling inbound messages
Expand All @@ -176,20 +159,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
m := messageFromPublish(message, ackFunc(sendAck, client.persist, message))
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
wg.Add(1)
go func() {
hd(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
sent = true
Expand All @@ -200,13 +181,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
if order {
handlers = append(handlers, r.defaultHandler)
} else {
wg.Add(1)
go func() {
r.defaultHandler(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
} else {
Expand All @@ -225,18 +204,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
if order {
close(ackOutChan)
} else { // Ensure that nothing further will be written to ackOutChan before closing it
close(stopAckCopy)
<-ackCopyStopped
close(ackOutChan)
go func() {
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
close(goRoutinesDone)
}()
}
ackMutex.Lock()
sendAckChan = nil
ackMutex.Unlock()
close(ackChan) // as sendAckChan is now nil nothing further will be sent on this
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackOutChan
return ackChan
}