Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions fvt_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,70 @@ func Test_PublishEmptyMessage(t *testing.T) {
s.Disconnect(250)
}

// Test_CallbackOverrun - When ordermatters=false the callbacks are called within a go routine. It is possible that
// the connection will drop before the handler completes and this should result in the ACK being dropped silently
// (leads to a panic in v1.3-v1.3.4)
func Test_CallbackOverrun(t *testing.T) {
topic := "/test/callbackoverrun"
handlerCalled := make(chan bool)
handlerChoke := make(chan bool)
handlerError := make(chan error)

pops := NewClientOptions()
pops.AddBroker(FVTTCP)
pops.SetOrderMatters(false) // Not really needed but consistent...
pops.SetClientID("callbackoverrun-pub")
p := NewClient(pops)

sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetOrderMatters(false)
sops.SetClientID("callbackoverrun-sub")
var f MessageHandler = func(client Client, msg Message) {
handlerCalled <- true
<-handlerChoke // Wait until connection has been closed
if string(msg.Payload()) != "test message" {
handlerError <- fmt.Errorf("Message payload incorrect")
} else {
handlerError <- nil // Allow main test to proceed (should not raise error in go routine)
}
}

s := NewClient(sops).(*client)
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
}

if sToken := s.Subscribe(topic, 1, f); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Subscribe(): %v", sToken.Error())
}

if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", pToken.Error())
}

p.Publish(topic, 1, false, "test message")
wait(handlerCalled) // Wait until the handler has been called
s.Disconnect(250) // Ensure the connection is dropped
<-s.commsStopped // Double check...
handlerChoke <- true // Allow handler to proceed

err := <-handlerError
if err != nil {
t.Fatalf(err.Error())
}

time.Sleep(time.Microsecond) // Allow a little time in case the handler returning after connection dropped causes an issue (panic)
fmt.Println("reconnecting")
// Now attempt to reconnect (checking for blockages)
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on Client.Connect(): %v", sToken.Error())
}

s.Disconnect(250)
p.Disconnect(250)
}

// func Test_Cleanstore(t *testing.T) {
// store := "/tmp/fvt/cleanstore"
// topic := "/test/cleanstore"
Expand Down
57 changes: 52 additions & 5 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,58 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
ackChan := make(chan *PacketAndToken)
go func() {
var wg sync.WaitGroup
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel

stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
if order {
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
} else {
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
ackInChan = make(chan *PacketAndToken)
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
for {
select {
case a := <-ackInChan:
ackOutChan <- a
case <-stopAckCopy:
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
for {
select {
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
case <-goRoutinesDone:
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
return
}
}
}
}
}()
}

go func() { // Main go routine handling inbound messages
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
sent := false
r.RLock()
m := messageFromPublish(message, ackFunc(ackChan, client.persist, message))
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
var handlers []MessageHandler
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
hd := e.Value.(*route).callback
wg.Add(1)
go func() {
hd(client, m)
m.Ack()
wg.Done()
}()
}
sent = true
Expand All @@ -159,9 +194,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
if order {
handlers = append(handlers, r.defaultHandler)
} else {
wg.Add(1)
go func() {
r.defaultHandler(client, m)
m.Ack()
wg.Done()
}()
}
} else {
Expand All @@ -175,8 +212,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
close(ackChan)
if order {
close(ackOutChan)
} else { // Ensure that nothing further will be written to ackOutChan before closing it
close(stopAckCopy)
<-ackCopyStopped
close(ackOutChan)
go func() {
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
close(goRoutinesDone)
}()
}
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackChan
return ackOutChan
}