Skip to content

Commit 6ac167b

Browse files
authored
Merge pull request #1237 from nats-io/fix_panic_on_redelivery
[FIXED] Possible panic on message redelivery
2 parents aacb792 + 31fd271 commit 6ac167b

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

server/server.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3693,8 +3693,13 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo
36933693
sub.Unlock()
36943694
return
36953695
}
3696-
// Sort our messages outstanding from acksPending, grab some state and unlock.
36973696
sub.Lock()
3697+
// Subscriber could have been closed
3698+
if sub.ackTimer == nil {
3699+
sub.Unlock()
3700+
return
3701+
}
3702+
// Sort our messages outstanding from acksPending, grab some state and unlock.
36983703
sortedPendingMsgs := sub.makeSortedPendingMsgs()
36993704
if len(sortedPendingMsgs) == 0 {
37003705
sub.clearAckTimer()

server/server_redelivery_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package server
1616
import (
1717
"encoding/json"
1818
"fmt"
19+
"math/rand"
1920
"net"
2021
"runtime"
2122
"sync"
@@ -1714,3 +1715,35 @@ func TestRedeliveryRaceWithAck(t *testing.T) {
17141715
}
17151716

17161717
}
1718+
1719+
func TestNoPanicOnSubCloseWhileOnRedelivery(t *testing.T) {
1720+
s := runServer(t, clusterName)
1721+
defer s.Shutdown()
1722+
1723+
sc := NewDefaultConnection(t)
1724+
defer sc.Close()
1725+
1726+
if err := sc.Publish("foo", []byte("msg")); err != nil {
1727+
t.Fatalf("Error on publish: %v", err)
1728+
}
1729+
1730+
for i := 0; i < 100; i++ {
1731+
sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {},
1732+
stan.AckWait(ackWaitInMs(5)),
1733+
stan.SetManualAckMode(),
1734+
stan.DeliverAllAvailable())
1735+
if err != nil {
1736+
t.Fatalf("Error on subscribe: %v", err)
1737+
}
1738+
// Artificially pretend that the client had failed hearbeat
1739+
srvSub := s.clients.getSubs(clientName)[0]
1740+
srvSub.Lock()
1741+
srvSub.hasFailedHB = true
1742+
srvSub.Unlock()
1743+
1744+
time.Sleep(time.Duration(rand.Intn(15)) * time.Millisecond)
1745+
sub.Close()
1746+
1747+
waitForNumSubs(t, s, clientName, 0)
1748+
}
1749+
}

0 commit comments

Comments
 (0)