diff --git a/fvt_client_test.go b/fvt_client_test.go
index 4e3793c1..ac8dcba5 100644
--- a/fvt_client_test.go
+++ b/fvt_client_test.go
@@ -926,6 +926,70 @@ func Test_PublishEmptyMessage(t *testing.T) {
 	s.Disconnect(250)
 }
 
+// Test_CallbackOverrun - When ordermatters=false the callbacks are called within a go routine. It is possible that
+// the connection will drop before the handler completes and this should result in the ACK being dropped silently
+// (this led to a panic in v1.3-v1.3.4). The test also checks that the client can reconnect afterwards.
+func Test_CallbackOverrun(t *testing.T) {
+	topic := "/test/callbackoverrun"
+	handlerCalled := make(chan bool)
+	handlerChoke := make(chan bool)
+	handlerError := make(chan error)
+
+	pops := NewClientOptions()
+	pops.AddBroker(FVTTCP)
+	pops.SetOrderMatters(false) // Not really needed for the publisher but consistent with the subscriber
+	pops.SetClientID("callbackoverrun-pub")
+	p := NewClient(pops)
+
+	sops := NewClientOptions()
+	sops.AddBroker(FVTTCP)
+	sops.SetOrderMatters(false)
+	sops.SetClientID("callbackoverrun-sub")
+	var f MessageHandler = func(client Client, msg Message) {
+		handlerCalled <- true
+		<-handlerChoke // Block here until the main test has disconnected the subscriber
+		if string(msg.Payload()) != "test message" {
+			handlerError <- fmt.Errorf("Message payload incorrect")
+		} else {
+			handlerError <- nil // Allow main test to proceed (the result is reported there, not in this go routine)
+		}
+	}
+
+	s := NewClient(sops).(*client)
+	if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
+		t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
+	}
+
+	if sToken := s.Subscribe(topic, 1, f); sToken.Wait() && sToken.Error() != nil {
+		t.Fatalf("Error on Client.Subscribe(): %v", sToken.Error())
+	}
+
+	if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil {
+		t.Fatalf("Error on Client.Connect(): %v", pToken.Error())
+	}
+
+	p.Publish(topic, 1, false, "test message")
+	wait(handlerCalled)  // Wait until the handler has been called
+	s.Disconnect(250)    // Ensure the connection is dropped
+	<-s.commsStopped     // Double check...
+	handlerChoke <- true // Allow handler to proceed
+
+	err := <-handlerError
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	time.Sleep(time.Microsecond) // Allow a little time in case the handler returning after connection dropped causes an issue (panic). NOTE(review): a microsecond is very short - confirm this is the intended unit
+	fmt.Println("reconnecting")
+	// Now attempt to reconnect (checking for blockages)
+	if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
+		t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
+	}
+
+	s.Disconnect(250)
+	p.Disconnect(250)
+}
+
 // func Test_Cleanstore(t *testing.T) {
 // store := "/tmp/fvt/cleanstore"
 // topic := "/test/cleanstore"
diff --git a/router.go b/router.go
index 42261ee9..4737d87b 100644
--- a/router.go
+++ b/router.go
@@ -132,13 +132,46 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
 // associated callback (or the defaultHandler, if one exists and no other route matched). If
 // anything is sent down the stop channel the function will end.
 func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
-	ackChan := make(chan *PacketAndToken)
-	go func() {
+	var wg sync.WaitGroup
+	ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
+	var ackInChan chan *PacketAndToken       // ACKs generated by ackFunc get put onto this channel
+
+	stopAckCopy := make(chan struct{})    // Closure requests stop of go routine copying ackInChan to ackOutChan
+	ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
+	goRoutinesDone := make(chan struct{}) // Closed once wg.Wait() returns (i.e. every handler go routine has called wg.Done())
+	if order {
+		ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
+	} else {
+		// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
+		ackInChan = make(chan *PacketAndToken)
+		go func() { // go routine to copy from ackInChan to ackOutChan until stopped
+			for {
+				select {
+				case a := <-ackInChan:
+					ackOutChan <- a
+				case <-stopAckCopy:
+					close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
+					for {
+						select {
+						case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
+							DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
+						case <-goRoutinesDone:
+							close(ackInChan) // Nothing further should be sent; closing makes any late send panic rather than fail silently
+							DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
+							return
+						}
+					}
+				}
+			}
+		}()
+	}
+
+	go func() { // Main go routine handling inbound messages
 		for message := range messages {
 			// DEBUG.Println(ROU, "matchAndDispatch received message")
 			sent := false
 			r.RLock()
-			m := messageFromPublish(message, ackFunc(ackChan, client.persist, message))
+			m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
 			var handlers []MessageHandler
 			for e := r.routes.Front(); e != nil; e = e.Next() {
 				if e.Value.(*route).match(message.TopicName) {
@@ -146,9 +179,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
 						handlers = append(handlers, e.Value.(*route).callback)
 					} else {
 						hd := e.Value.(*route).callback
+						wg.Add(1)
 						go func() {
 							hd(client, m)
 							m.Ack()
+							wg.Done()
 						}()
 					}
 					sent = true
@@ -159,9 +194,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
 					if order {
 						handlers = append(handlers, r.defaultHandler)
 					} else {
+						wg.Add(1)
 						go func() {
 							r.defaultHandler(client, m)
 							m.Ack()
+							wg.Done()
 						}()
 					}
 				} else {
@@ -175,8 +212,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
 			}
 			// DEBUG.Println(ROU, "matchAndDispatch handled message")
 		}
-		close(ackChan)
+		if order {
+			close(ackOutChan)
+		} else { // Ensure that nothing further will be written to ackOutChan before closing it
+			close(stopAckCopy)
+			<-ackCopyStopped
+			close(ackOutChan)
+			go func() {
+				wg.Wait() // Note: If this remains running then the user has handlers that are not returning
+				close(goRoutinesDone)
+			}()
+		}
 		DEBUG.Println(ROU, "matchAndDispatch exiting")
 	}()
-	return ackChan
+	return ackOutChan
 }