Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Release History

## 1.9.1 (TBD)

### Bugs Fixed

- Receiver's, in ReceiveModeReceiveAndDelete, now allow ReceiveMessages() calls after Receiver.Close. These calls will only draw from any internally cached messages that
accumulated between the final call to ReceiveMessages() and Close. See an example of how to do this here: [ExampleReceiver_ReceiveMessages_receiveAndDelete](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus#example-Receiver.ReceiveMessages-ReceiveAndDelete)
for an example. (PR#24864)

## 1.9.0 (2025-05-06)

### Features Added
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions)

// AcceptSessionForQueue accepts a session from a queue with a specific session ID.
// NOTE: this receiver is initialized immediately, not lazily.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
// If the operation fails it can return an [*Error] type if the failure is actionable.
func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
Expand All @@ -266,7 +266,7 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName strin

// AcceptSessionForSubscription accepts a session from a subscription with a specific session ID.
// NOTE: this receiver is initialized immediately, not lazily.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
// If the operation fails it can return an [*Error] type if the failure is actionable.
func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error) {
id, cleanupOnClose := client.getCleanupForCloseable()
sessionReceiver, err := newSessionReceiver(
Expand Down
3 changes: 3 additions & 0 deletions sdk/messaging/azservicebus/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (

// CodeNotFound means the entity you're attempting to connect to doesn't exist.
CodeNotFound = exported.CodeNotFound

// CodeClosed means the link or connection for this sender/receiver has been closed.
CodeClosed Code = "closed"
)

// Error represents a Service Bus specific error.
Expand Down
48 changes: 48 additions & 0 deletions sdk/messaging/azservicebus/example_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,54 @@ func ExampleReceiver_ReceiveMessages() {
}
}

func ExampleReceiver_ReceiveMessages_receiveAndDelete() {
// ReceiveMessages respects the passed in context, and will gracefully stop
// receiving when 'ctx' is cancelled.
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

messages, err = receiver.ReceiveMessages(ctx, 10, nil)

if err != nil {
panic(err)
}

for _, message := range messages {
// The message body is a []byte. For this example we're just assuming that the body
// was a string, converted to bytes but any []byte payload is valid.
var body []byte = message.Body
fmt.Printf("Message received with body: %s\n", string(body))
fmt.Printf("Received and completed the message\n")
}

err := receiver.Close(ctx)

if err != nil {
panic(err)
}

// In ReceiveAndDelete mode, any messages stored in the internal cache are available after Close(). To avoid
// message loss you'll want to loop after closing to ensure the cache is emptied.
// NOTE: you don't need to do this when using PeekLock, which is the default.
for {
messages, err := receiver.ReceiveMessages(context.TODO(), 10, nil)

if sbErr := (*azservicebus.Error)(nil); errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeClosed {
// we've read all cached messages.
break
} else if err != nil {
panic(err)
} else {
// process messages
for _, message := range messages {
var body []byte = message.Body
fmt.Printf("Message received with body: %s\n", string(body))
fmt.Printf("Received and completed the message\n")
}
}
}
}

func ExampleReceiver_ReceiveMessages_amqpMessage() {
// AMQP is the underlying protocol for all interaction with Service Bus.
// You can, if needed, send and receive messages that have a 1:1 correspondence
Expand Down
67 changes: 57 additions & 10 deletions sdk/messaging/azservicebus/internal/amqpLinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
"github.com/Azure/go-amqp"
)

type LinksWithID struct {
Expand Down Expand Up @@ -114,6 +115,10 @@ type AMQPLinksImpl struct {

ns NamespaceForAMQPLinks

// prefetchedMessagesAfterClose is called after a Receiver is closed. We pass all messages
// we received using the Receiver.Prefetched() function.
prefetchedMessagesAfterClose func(messages []*amqp.Message)

utils.Logger
}

Expand All @@ -126,20 +131,25 @@ type NewAMQPLinksArgs struct {
EntityPath string
CreateLinkFunc CreateLinkFunc
GetRecoveryKindFunc func(err error) RecoveryKind

// PrefetchedMessagesAfterClose is called after a Receiver (in ReceiveAndDelete mode) is closed. It gets passed all
// the messages we could read from [amqp.Receiver.Prefetched].
PrefetchedMessagesAfterClose func(messages []*amqp.Message)
}

// NewAMQPLinks creates a session, starts the claim refresher and creates an associated
// management link for a specific entity path.
func NewAMQPLinks(args NewAMQPLinksArgs) AMQPLinks {
l := &AMQPLinksImpl{
entityPath: args.EntityPath,
managementPath: fmt.Sprintf("%s/$management", args.EntityPath),
audience: args.NS.GetEntityAudience(args.EntityPath),
createLink: args.CreateLinkFunc,
closedPermanently: false,
getRecoveryKindFunc: args.GetRecoveryKindFunc,
ns: args.NS,
Logger: utils.NewLogger(),
entityPath: args.EntityPath,
managementPath: fmt.Sprintf("%s/$management", args.EntityPath),
audience: args.NS.GetEntityAudience(args.EntityPath),
createLink: args.CreateLinkFunc,
closedPermanently: false,
getRecoveryKindFunc: args.GetRecoveryKindFunc,
ns: args.NS,
Logger: utils.NewLogger(),
prefetchedMessagesAfterClose: args.PrefetchedMessagesAfterClose,
}

return l
Expand All @@ -150,6 +160,9 @@ func (links *AMQPLinksImpl) ManagementPath() string {
return links.managementPath
}

// errClosed is used when you try to use a closed link.
var errClosed = NewErrNonRetriable("link was closed by user")

// recoverLink will recycle all associated links (mgmt, receiver, sender and session)
// and recreate them using the link.linkCreator function.
func (links *AMQPLinksImpl) recoverLink(ctx context.Context, theirLinkRevision LinkID) error {
Expand All @@ -161,7 +174,7 @@ func (links *AMQPLinksImpl) recoverLink(ctx context.Context, theirLinkRevision L
links.mu.RUnlock()

if closedPermanently {
return NewErrNonRetriable("link was closed by user")
return errClosed
}

// cheap check before we do the lock
Expand Down Expand Up @@ -288,7 +301,7 @@ func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error) {
l.mu.RUnlock()

if closedPermanently {
return nil, NewErrNonRetriable("link was closed by user")
return nil, errClosed
}

if sender != nil || receiver != nil {
Expand Down Expand Up @@ -570,6 +583,8 @@ func (links *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent b
}
}

pushPrefetchedMessagesWithoutLocking(links, permanent)

links.Sender, links.Receiver, links.session, links.RPCLink = nil, nil, nil, nil

if wasCancelled {
Expand All @@ -582,3 +597,35 @@ func (links *AMQPLinksImpl) closeWithoutLocking(ctx context.Context, permanent b

return nil
}

// pushPrefetchedMessagesWithoutLocking clears the prefetched messages from the link. This function assumes
// the caller has taken care of proper locking and has already closed the Receiver link, if it exists.
func pushPrefetchedMessagesWithoutLocking(links *AMQPLinksImpl, permanent bool) {
if !permanent { // only activate this when the Receiver is shut down. This avoids any possible concurrency issues.
return
}

if links.Receiver == nil || links.prefetchedMessagesAfterClose == nil {
return
}

var prefetched []*amqp.Message

for {
m := links.Receiver.Prefetched()

if m == nil {
break
}

prefetched = append(prefetched, m)
}

if len(prefetched) == 0 {
links.Writef(exported.EventConn, "No messages on receiver after closing.")
return
}

links.Writef(exported.EventConn, "Got %d messages on receiver after closing. These can be received using ReceiveMessages().", len(prefetched))
links.prefetchedMessagesAfterClose(prefetched)
}
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
package internal

// Version is the semantic version number
const Version = "v1.9.0"
const Version = "v1.9.1"
16 changes: 4 additions & 12 deletions sdk/messaging/azservicebus/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net"
"net/http"
"reflect"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
Expand Down Expand Up @@ -69,6 +68,10 @@ func TransformError(err error) error {
return exported.NewError(exported.CodeTimeout, err)
}

if errors.Is(err, errClosed) {
return exported.NewError(exported.CodeClosed, err)
}

// there are a few errors that all boil down to "bad creds or unauthorized"
var amqpErr *amqp.Error

Expand Down Expand Up @@ -141,11 +144,6 @@ func IsCancelError(err error) bool {
return false
}

func IsDrainingError(err error) bool {
// TODO: we should be able to identify these errors programatically
return strings.Contains(err.Error(), "link is currently draining")
}

const errorConditionLockLost = amqp.ErrCond("com.microsoft:message-lock-lost")

var amqpConditionsToRecoveryKind = map[amqp.ErrCond]RecoveryKind{
Expand Down Expand Up @@ -250,12 +248,6 @@ func GetRecoveryKind(err error) RecoveryKind {
return RecoveryKindConn
}

if IsDrainingError(err) {
// temporary, operation should just be retryable since drain will
// eventually complete.
return RecoveryKindNone
}

var rpcErr RPCError

if errors.As(err, &rpcErr) {
Expand Down
3 changes: 2 additions & 1 deletion sdk/messaging/azservicebus/internal/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func Test_ServiceBusError_NoRecoveryNeeded(t *testing.T) {
&amqp.Error{Condition: amqp.ErrCond("com.microsoft:server-busy")},
&amqp.Error{Condition: amqp.ErrCond("com.microsoft:timeout")},
&amqp.Error{Condition: amqp.ErrCond("com.microsoft:operation-cancelled")},
errors.New("link is currently draining"), // not yet exposed from go-amqp
// simple timeouts from the mgmt link
RPCError{Resp: &amqpwrap.RPCResponse{Code: 408}},
RPCError{Resp: &amqpwrap.RPCResponse{Code: 503}},
Expand Down Expand Up @@ -229,6 +228,8 @@ func Test_ServiceBusError_Fatal(t *testing.T) {
require.Equal(t, RecoveryKindFatal, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusNotFound}}))
require.Equal(t, RecoveryKindFatal, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: RPCResponseCodeLockLost}}))
require.Equal(t, RecoveryKindFatal, GetRecoveryKind(RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusUnauthorized}}))
require.Equal(t, RecoveryKindFatal, GetRecoveryKind(errClosed))

}

func Test_IsLockLostError(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions sdk/messaging/azservicebus/internal/exported/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (

// CodeNotFound means the entity you're attempting to connect to doesn't exist.
CodeNotFound Code = "notfound"

// CodeClosed means the link or connection for this sender/receiver has been closed.
CodeClosed Code = "closed"
)

// Error represents a Service Bus specific error.
Expand Down
13 changes: 10 additions & 3 deletions sdk/messaging/azservicebus/internal/exported/receive_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ package exported
type ReceiveMode int

const (
// PeekLock will lock messages as they are received and can be settled
// using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message
// functions.
// PeekLock will lock messages as they are received. These messages can then be settled using the
// Receiver's (Complete|Abandon|DeadLetter|Defer)Message functions.
PeekLock ReceiveMode = 0

// ReceiveAndDelete will delete messages as they are received.
//
// NOTE: In ReceiveAndDelete mode you should continue to call ReceiveMessages(), to receive any cached messages, even after the Receiver
// has been closed. See [receiver_and_delete_example] for an example of how to incorporate this into your code.
//
// This is not needed for Receivers in [PeekLock] mode, as cached messages are automatically released to the service.
//
// [receiver_and_delete_example]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus#example-Receiver.ReceiveMessages-ReceiveAndDelete
ReceiveAndDelete ReceiveMode = 1
)
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/internal/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func TestNamespaceNegotiateClaimRenewal(t *testing.T) {
require.NoError(t, err)
time.Sleep(3 * time.Second) // make sure, even with variability, we get at least one renewal

cancel()

require.EqualValues(t, 2, nextRefreshDurationChecks)
require.EqualValues(t, 2, cbsNegotiateClaimCalled)
require.Empty(t, errorsLogged)

cancel()
}

func TestNamespaceNegotiateClaimFailsToGetClient(t *testing.T) {
Expand Down
8 changes: 7 additions & 1 deletion sdk/messaging/azservicebus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,16 @@ func (m *Message) toAMQPMessage() *amqp.Message {
// NOTE: this converter assumes that the Body of this message will be the first
// serialized byte array in the Data section of the messsage.
func newReceivedMessage(amqpMsg *amqp.Message, receiver amqpwrap.AMQPReceiver) *ReceivedMessage {
linkName := ""

if receiver != nil { // nil when we're converting messages from [Receiver.Prefetched] after the Receiver has been closed.
linkName = receiver.LinkName()
}

msg := &ReceivedMessage{
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg),
State: MessageStateActive,
linkName: receiver.LinkName(),
linkName: linkName,
}

if len(msg.RawAMQPMessage.Body.Data) == 1 {
Expand Down
Loading