Skip to content

Commit 7315a8e

Browse files
[azservicebus] Fixing bug where you could lose messages in ReceiveAndDelete (#24864)
This PR makes it so you can call ReceiveMessages after a Receiver (only in ReceiveAndDelete mode) is closed, giving you access to any cached messages that might have arrived after your final ReceiveMessages() call, but before Close(). Fixes #24078
1 parent a4fafd7 commit 7315a8e

File tree

16 files changed

+427
-69
lines changed

16 files changed

+427
-69
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Release History
22

3+
## 1.9.1 (TBD)
4+
5+
### Bugs Fixed
6+
7+
- Receiver's, in ReceiveModeReceiveAndDelete, now allow ReceiveMessages() calls after Receiver.Close. These calls will only draw from any internally cached messages that
8+
accumulated between the final call to ReceiveMessages() and Close. See an example of how to do this here: [ExampleReceiver_ReceiveMessages_receiveAndDelete](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus#example-Receiver.ReceiveMessages-ReceiveAndDelete)
9+
for an example. (PR#24864)
10+
311
## 1.9.0 (2025-05-06)
412

513
### Features Added

sdk/messaging/azservicebus/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions)
239239

240240
// AcceptSessionForQueue accepts a session from a queue with a specific session ID.
241241
// NOTE: this receiver is initialized immediately, not lazily.
242-
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
242+
// If the operation fails it can return an [*Error] type if the failure is actionable.
243243
func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
244244
id, cleanupOnClose := client.getCleanupForCloseable()
245245
sessionReceiver, err := newSessionReceiver(
@@ -266,7 +266,7 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName strin
266266

267267
// AcceptSessionForSubscription accepts a session from a subscription with a specific session ID.
268268
// NOTE: this receiver is initialized immediately, not lazily.
269-
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
269+
// If the operation fails it can return an [*Error] type if the failure is actionable.
270270
func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
271271
id, cleanupOnClose := client.getCleanupForCloseable()
272272
sessionReceiver, err := newSessionReceiver(

sdk/messaging/azservicebus/error.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ const (
3434

3535
// CodeNotFound means the entity you're attempting to connect to doesn't exist.
3636
CodeNotFound = exported.CodeNotFound
37+
38+
// CodeClosed means the link or connection for this sender/receiver has been closed.
39+
CodeClosed Code = "closed"
3740
)
3841

3942
// Error represents a Service Bus specific error.

sdk/messaging/azservicebus/example_receiver_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,54 @@ func ExampleReceiver_ReceiveMessages() {
118118
}
119119
}
120120

121+
func ExampleReceiver_ReceiveMessages_receiveAndDelete() {
122+
// ReceiveMessages respects the passed in context, and will gracefully stop
123+
// receiving when 'ctx' is cancelled.
124+
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
125+
defer cancel()
126+
127+
messages, err = receiver.ReceiveMessages(ctx, 10, nil)
128+
129+
if err != nil {
130+
panic(err)
131+
}
132+
133+
for _, message := range messages {
134+
// The message body is a []byte. For this example we're just assuming that the body
135+
// was a string, converted to bytes but any []byte payload is valid.
136+
var body []byte = message.Body
137+
fmt.Printf("Message received with body: %s\n", string(body))
138+
fmt.Printf("Received and completed the message\n")
139+
}
140+
141+
err := receiver.Close(ctx)
142+
143+
if err != nil {
144+
panic(err)
145+
}
146+
147+
// In ReceiveAndDelete mode, any messages stored in the internal cache are available after Close(). To avoid
148+
// message loss you'll want to loop after closing to ensure the cache is emptied.
149+
// NOTE: you don't need to do this when using PeekLock, which is the default.
150+
for {
151+
messages, err := receiver.ReceiveMessages(context.TODO(), 10, nil)
152+
153+
if sbErr := (*azservicebus.Error)(nil); errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeClosed {
154+
// we've read all cached messages.
155+
break
156+
} else if err != nil {
157+
panic(err)
158+
} else {
159+
// process messages
160+
for _, message := range messages {
161+
var body []byte = message.Body
162+
fmt.Printf("Message received with body: %s\n", string(body))
163+
fmt.Printf("Received and completed the message\n")
164+
}
165+
}
166+
}
167+
}
168+
121169
func ExampleReceiver_ReceiveMessages_amqpMessage() {
122170
// AMQP is the underlying protocol for all interaction with Service Bus.
123171
// You can, if needed, send and receive messages that have a 1:1 correspondence

sdk/messaging/azservicebus/internal/amqpLinks.go

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1717
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
1818
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
19+
"github.com/Azure/go-amqp"
1920
)
2021

2122
type LinksWithID struct {
@@ -114,6 +115,10 @@ type AMQPLinksImpl struct {
114115

115116
ns NamespaceForAMQPLinks
116117

118+
// prefetchedMessagesAfterClose is called after a Receiver is closed. We pass all messages
119+
// we received using the Receiver.Prefetched() function.
120+
prefetchedMessagesAfterClose func(messages []*amqp.Message)
121+
117122
utils.Logger
118123
}
119124

@@ -126,20 +131,25 @@ type NewAMQPLinksArgs struct {
126131
EntityPath string
127132
CreateLinkFunc CreateLinkFunc
128133
GetRecoveryKindFunc func(err error) RecoveryKind
134+
135+
// PrefetchedMessagesAfterClose is called after a Receiver (in ReceiveAndDelete mode) is closed. It gets passed all
136+
// the messages we could read from [amqp.Receiver.Prefetched].
137+
PrefetchedMessagesAfterClose func(messages []*amqp.Message)
129138
}
130139

131140
// NewAMQPLinks creates a session, starts the claim refresher and creates an associated
132141
// management link for a specific entity path.
133142
func NewAMQPLinks(args NewAMQPLinksArgs) AMQPLinks {
134143
l := &AMQPLinksImpl{
135-
entityPath: args.EntityPath,
136-
managementPath: fmt.Sprintf("%s/$management", args.EntityPath),
137-
audience: args.NS.GetEntityAudience(args.EntityPath),
138-
createLink: args.CreateLinkFunc,
139-
closedPermanently: false,
140-
getRecoveryKindFunc: args.GetRecoveryKindFunc,
141-
ns: args.NS,
142-
Logger: utils.NewLogger(),
144+
entityPath: args.EntityPath,
145+
managementPath: fmt.Sprintf("%s/$management", args.EntityPath),
146+
audience: args.NS.GetEntityAudience(args.EntityPath),
147+
createLink: args.CreateLinkFunc,
148+
closedPermanently: false,
149+
getRecoveryKindFunc: args.GetRecoveryKindFunc,
150+
ns: args.NS,
151+
Logger: utils.NewLogger(),
152+
prefetchedMessagesAfterClose: args.PrefetchedMessagesAfterClose,
143153
}
144154

145155
return l
@@ -150,6 +160,9 @@ func (links *AMQPLinksImpl) ManagementPath() string {
150160
return links.managementPath
151161
}
152162

163+
// errClosed is used when you try to use a closed link.
164+
var errClosed = NewErrNonRetriable("link was closed by user")
165+
153166
// recoverLink will recycle all associated links (mgmt, receiver, sender and session)
154167
// and recreate them using the link.linkCreator function.
155168
func (links *AMQPLinksImpl) recoverLink(ctx context.Context, theirLinkRevision LinkID) error {
@@ -161,7 +174,7 @@ func (links *AMQPLinksImpl) recoverLink(ctx context.Context, theirLinkRevision L
161174
links.mu.RUnlock()
162175

163176
if closedPermanently {
164-
return NewErrNonRetriable("link was closed by user")
177+
return errClosed
165178
}
166179

167180
// cheap check before we do the lock
@@ -288,7 +301,7 @@ func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error) {
288301
l.mu.RUnlock()
289302

290303
if closedPermanently {
291-
return nil, NewErrNonRetriable("link was closed by user")
304+
return nil, errClosed
292305
}
293306

294307
if sender != nil || receiver != nil {
@@ -570,6 +583,8 @@ func (links *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent b
570583
}
571584
}
572585

586+
pushPrefetchedMessagesWithoutLocking(links, permanent)
587+
573588
links.Sender, links.Receiver, links.session, links.RPCLink = nil, nil, nil, nil
574589

575590
if wasCancelled {
@@ -582,3 +597,35 @@ func (links *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent b
582597

583598
return nil
584599
}
600+
601+
// pushPrefetchedMessagesWithoutLocking clears the prefetched messages from the link. This function assumes
602+
// the caller has taken care of proper locking and has already closed the Receiver link, if it exists.
603+
func pushPrefetchedMessagesWithoutLocking(links *AMQPLinksImpl, permanent bool) {
604+
if !permanent { // only activate this when the Receiver is shut down. This avoids any possible concurrency issues.
605+
return
606+
}
607+
608+
if links.Receiver == nil || links.prefetchedMessagesAfterClose == nil {
609+
return
610+
}
611+
612+
var prefetched []*amqp.Message
613+
614+
for {
615+
m := links.Receiver.Prefetched()
616+
617+
if m == nil {
618+
break
619+
}
620+
621+
prefetched = append(prefetched, m)
622+
}
623+
624+
if len(prefetched) == 0 {
625+
links.Writef(exported.EventConn, "No messages on receiver after closing.")
626+
return
627+
}
628+
629+
links.Writef(exported.EventConn, "Got %d messages on receiver after closing. These can be received using ReceiveMessages().", len(prefetched))
630+
links.prefetchedMessagesAfterClose(prefetched)
631+
}

sdk/messaging/azservicebus/internal/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
package internal
55

66
// Version is the semantic version number
7-
const Version = "v1.9.0"
7+
const Version = "v1.9.1"

sdk/messaging/azservicebus/internal/errors.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"net"
1212
"net/http"
1313
"reflect"
14-
"strings"
1514

1615
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
1716
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
@@ -69,6 +68,10 @@ func TransformError(err error) error {
6968
return exported.NewError(exported.CodeTimeout, err)
7069
}
7170

71+
if errors.Is(err, errClosed) {
72+
return exported.NewError(exported.CodeClosed, err)
73+
}
74+
7275
// there are a few errors that all boil down to "bad creds or unauthorized"
7376
var amqpErr *amqp.Error
7477

@@ -141,11 +144,6 @@ func IsCancelError(err error) bool {
141144
return false
142145
}
143146

144-
func IsDrainingError(err error) bool {
145-
// TODO: we should be able to identify these errors programatically
146-
return strings.Contains(err.Error(), "link is currently draining")
147-
}
148-
149147
const errorConditionLockLost = amqp.ErrCond("com.microsoft:message-lock-lost")
150148

151149
var amqpConditionsToRecoveryKind = map[amqp.ErrCond]RecoveryKind{
@@ -250,12 +248,6 @@ func GetRecoveryKind(err error) RecoveryKind {
250248
return RecoveryKindConn
251249
}
252250

253-
if IsDrainingError(err) {
254-
// temporary, operation should just be retryable since drain will
255-
// eventually complete.
256-
return RecoveryKindNone
257-
}
258-
259251
var rpcErr RPCError
260252

261253
if errors.As(err, &rpcErr) {

sdk/messaging/azservicebus/internal/errors_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ func Test_ServiceBusError_NoRecoveryNeeded(t *testing.T) {
158158
&amqp.Error{Condition: amqp.ErrCond("com.microsoft:server-busy")},
159159
&amqp.Error{Condition: amqp.ErrCond("com.microsoft:timeout")},
160160
&amqp.Error{Condition: amqp.ErrCond("com.microsoft:operation-cancelled")},
161-
errors.New("link is currently draining"), // not yet exposed from go-amqp
162161
// simple timeouts from the mgmt link
163162
RPCError{Resp: &amqpwrap.RPCResponse{Code: 408}},
164163
RPCError{Resp: &amqpwrap.RPCResponse{Code: 503}},
@@ -229,6 +228,8 @@ func Test_ServiceBusError_Fatal(t *testing.T) {
229228
require.Equal(t, RecoveryKindFatal, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusNotFound}}))
230229
require.Equal(t, RecoveryKindFatal, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: RPCResponseCodeLockLost}}))
231230
require.Equal(t, RecoveryKindFatal, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusUnauthorized}}))
231+
require.Equal(t, RecoveryKindFatal, GetRecoveryKind(errClosed))
232+
232233
}
233234

234235
func Test_IsLockLostError(t *testing.T) {

sdk/messaging/azservicebus/internal/exported/error.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ const (
3232

3333
// CodeNotFound means the entity you're attempting to connect to doesn't exist.
3434
CodeNotFound Code = "notfound"
35+
36+
// CodeClosed means the link or connection for this sender/receiver has been closed.
37+
CodeClosed Code = "closed"
3538
)
3639

3740
// Error represents a Service Bus specific error.

sdk/messaging/azservicebus/internal/exported/receive_mode.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,17 @@ package exported
1010
type ReceiveMode int
1111

1212
const (
13-
// PeekLock will lock messages as they are received and can be settled
14-
// using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message
15-
// functions.
13+
// PeekLock will lock messages as they are received. These messages can then be settled using the
14+
// Receiver's (Complete|Abandon|DeadLetter|Defer)Message functions.
1615
PeekLock ReceiveMode = 0
16+
1717
// ReceiveAndDelete will delete messages as they are received.
18+
//
19+
// NOTE: In ReceiveAndDelete mode you should continue to call ReceiveMessages(), to receive any cached messages, even after the Receiver
20+
// has been closed. See [receiver_and_delete_example] for an example of how to incorporate this into your code.
21+
//
22+
// This is not needed for Receivers in [PeekLock] mode, as cached messages are automatically released to the service.
23+
//
24+
// [receiver_and_delete_example]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus#example-Receiver.ReceiveMessages-ReceiveAndDelete
1825
ReceiveAndDelete ReceiveMode = 1
1926
)

0 commit comments

Comments
 (0)