Skip to content

Commit 035c98f

Browse files
committed
[FIXED] Race condition that could cause error on subscribe
If the server times out a client while an application with the same client ID just (re)connected and is about to resume a durable consumer, a race could cause the subscription to fail with an "unknown client ID" error if the replace_durable option was set.

Resolves #1239

Signed-off-by: Ivan Kozlovic <[email protected]>
1 parent 82edf4b commit 035c98f

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

server/server.go

Lines changed: 17 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -3245,19 +3245,19 @@ func (s *StanServer) checkClientHealth(clientID string) {
32453245
client.fhb++
32463246
// If we have reached the max number of failures
32473247
if client.fhb > s.opts.ClientHBFailCount {
3248-
s.log.Errorf("[Client:%s] Timed out on heartbeats", clientID)
3249-
// close the client (connection). This locks the
3250-
// client object internally so unlock here.
32513248
client.Unlock()
3252-
// If clustered, thread operations through Raft.
3253-
if s.isClustered {
3254-
if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}, false); err != nil {
3255-
s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v",
3256-
clientID, err)
3249+
s.barrier(func() {
3250+
s.log.Errorf("[Client:%s] Timed out on heartbeats", clientID)
3251+
// If clustered, thread operations through Raft.
3252+
if s.isClustered {
3253+
if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}, false); err != nil {
3254+
s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v",
3255+
clientID, err)
3256+
}
3257+
} else {
3258+
s.closeClient(clientID)
32573259
}
3258-
} else {
3259-
s.closeClient(clientID)
3260-
}
3260+
})
32613261
return
32623262
}
32633263
} else {
@@ -3298,14 +3298,16 @@ func (s *StanServer) closeClient(clientID string) error {
32983298
return ErrUnknownClient
32993299
}
33003300

3301-
// Remove all non-durable subscribers.
3302-
s.removeAllNonDurableSubscribers(client)
3303-
3304-
// Remove from our clientStore.
3301+
// Remove from our clientStore before removing subs.
3302+
// This prevents a race when the same client ID is just
3303+
// reconnecting and registering a durable.
33053304
if _, err := s.clients.unregister(clientID); err != nil {
33063305
s.log.Errorf("Error unregistering client %q: %v", clientID, err)
33073306
}
33083307

3308+
// Remove all non-durable subscribers.
3309+
s.removeAllNonDurableSubscribers(client)
3310+
33093311
if s.debug {
33103312
client.RLock()
33113313
hbInbox := client.info.HbInbox

0 commit comments

Comments
 (0)