Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion extensions/Worker.Extensions.ServiceBus/release_notes.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## Release notes
<!-- Please add your release notes in the following format:
- My change description (#PR/#issue)
-->
-->

- Support binding to ServiceBusReceivedMessage (#1313)
11 changes: 11 additions & 0 deletions extensions/Worker.Extensions.ServiceBus/src/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.


namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus
{
public static class Constants
{
internal const string BinaryContentType = "application/octet-stream";
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.10.0")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.WorkerExtension.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.7.0")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

[assembly: WorkerExtensionStartup(typeof(ServiceBusExtensionStartup))]

namespace Microsoft.Azure.Functions.Worker
{
public class ServiceBusExtensionStartup : WorkerExtensionStartup
{
public override void Configure(IFunctionsWorkerApplicationBuilder applicationBuilder)
{
if (applicationBuilder == null)
{
throw new ArgumentNullException(nameof(applicationBuilder));
}

applicationBuilder.Services.AddAzureClientsCore(); // Adds AzureComponentFactory

applicationBuilder.Services.Configure<WorkerOptions>((workerOption) =>
{
workerOption.InputConverters.RegisterAt<ServiceBusReceivedMessageConverter>(0);
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Threading.Tasks;
using Azure.Core.Amqp;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;

namespace Microsoft.Azure.Functions.Worker
{

[SupportsDeferredBinding]
[SupportedConverterType(typeof(ServiceBusReceivedMessage))]
[SupportedConverterType(typeof(ServiceBusReceivedMessage[]))]
internal class ServiceBusReceivedMessageConverter : IInputConverter
{
public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
ConversionResult result = context?.Source switch
{
ModelBindingData binding => ConversionResult.Success(ConvertToServiceBusReceivedMessage(binding)),
// Only array collections are currently supported, which matches the behavior of the in-proc extension.
CollectionModelBindingData collection => ConversionResult.Success(collection.ModelBindingDataArray
.Select(ConvertToServiceBusReceivedMessage).ToArray()),
_ => ConversionResult.Unhandled()
};
return new ValueTask<ConversionResult>(result);
}

private ServiceBusReceivedMessage ConvertToServiceBusReceivedMessage(ModelBindingData binding)
{
// The lock token is a 16 byte GUID
const int lockTokenLength = 16;

if (binding.ContentType != Constants.BinaryContentType)
{
throw new InvalidOperationException(
$"Unexpected content-type. Only '{Constants.BinaryContentType}' is supported.");
}

ReadOnlyMemory<byte> bytes = binding.Content.ToMemory();
ReadOnlyMemory<byte> lockTokenBytes = bytes.Slice(0, lockTokenLength);
ReadOnlyMemory<byte> messageBytes = bytes.Slice(lockTokenLength, bytes.Length - lockTokenLength);
return ServiceBusReceivedMessage.FromAmqpMessage(AmqpAnnotatedMessage.FromBytes(BinaryData.FromBytes(messageBytes)),
BinaryData.FromBytes(lockTokenBytes));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using System.Collections.Generic;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

namespace Microsoft.Azure.Functions.Worker
{
[AllowConverterFallback(true)]
[InputConverter(typeof(ServiceBusReceivedMessageConverter))]
public sealed class ServiceBusTriggerAttribute : TriggerBindingAttribute, ISupportCardinality
{
private bool _isBatched = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@
<Description>Azure Service Bus extensions for .NET isolated functions</Description>

<!--Version information-->
<VersionPrefix>5.7.0</VersionPrefix>
<VersionPrefix>5.10.0</VersionPrefix>
<VersionSuffix>-preview1</VersionSuffix>

<!--Temporarily opting out of documentation. Pending documentation-->
<GenerateDocumentationFile>false</GenerateDocumentationFile>
</PropertyGroup>

<Import Project="..\..\..\build\Extensions.props" />


<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.14.0" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.6.3" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\DotNetWorker.Core\DotNetWorker.Core.csproj" />
</ItemGroup>

<ItemGroup>
<SharedReference Include="..\..\Worker.Extensions.Shared/Worker.Extensions.Shared.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp
{
/// <summary>
/// Samples demonstrating binding to the <see cref="ServiceBusReceivedMessage"/> type.
/// </summary>
public class ServiceBusReceivedMessageBindingSamples
{
private readonly ILogger<ServiceBusReceivedMessageBindingSamples> _logger;

public ServiceBusReceivedMessageBindingSamples(ILogger<ServiceBusReceivedMessageBindingSamples> logger)
{
_logger = logger;
}

/// <summary>
/// This function demonstrates binding to a single <see cref="ServiceBusReceivedMessage"/>.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageFunction))]
public void ServiceBusReceivedMessageFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}

/// <summary>
/// This function demonstrates binding to an array of <see cref="ServiceBusReceivedMessage"/>.
/// Note that when doing so, you must also set the <see cref="ServiceBusTriggerAttribute.IsBatched"/> property
/// to <value>true</value>.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageBatchFunction))]
public void ServiceBusReceivedMessageBatchFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
foreach (ServiceBusReceivedMessage message in messages)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}
}

/// <summary>
/// This functions demonstrates that it is possible to bind to both the ServiceBusReceivedMessage and any of the supported binding contract
/// properties at the same time. If attempting this, the ServiceBusReceivedMessage must be the first parameter. There is not
/// much benefit to doing this as all of the binding contract properties are available as properties on the ServiceBusReceivedMessage.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageWithStringProperties))]
public void ServiceBusReceivedMessageWithStringProperties(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")]
ServiceBusReceivedMessage message, string messageId, int deliveryCount)
{
// The MessageId property and the messageId parameter are the same.
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message ID: {id}", messageId);

// Similarly the DeliveryCount property and the deliveryCount parameter are the same.
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}
}
}
1 change: 1 addition & 0 deletions samples/WorkerBindingSamples/WorkerBindingSamples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\extensions\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.ServiceBus\src\Worker.Extensions.ServiceBus.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.Storage\src\Worker.Extensions.Storage.csproj" />
<ProjectReference Include="..\..\src\DotNetWorker.ApplicationInsights\DotNetWorker.ApplicationInsights.csproj" />
<ProjectReference Include="..\..\src\DotNetWorker\DotNetWorker.csproj" />
Expand Down
1 change: 1 addition & 0 deletions test/E2ETests/E2EApps/E2EApp/E2EApp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\extensions\Worker.Extensions.ServiceBus\src\Worker.Extensions.ServiceBus.csproj" />
<ProjectReference Include="..\..\..\..\extensions\Worker.Extensions.Storage\src\Worker.Extensions.Storage.csproj" />
<ProjectReference Include="..\..\..\..\extensions\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\..\..\extensions\Worker.Extensions.CosmosDB\src\Worker.Extensions.CosmosDB.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Storage.Blobs;
using Microsoft.Azure.Functions.Tests;
using Microsoft.Azure.Functions.Worker;
Expand Down Expand Up @@ -848,6 +849,58 @@ public void FunctionWithRetryPolicyWithInvalidIntervals()
Assert.Throws<ArgumentOutOfRangeException>(() => new ExponentialBackoffRetryAttribute(5, "something_bad", "00:01:00"));
}

[Fact]
public void ServiceBus_SDKTypeBindings()
{
var generator = new FunctionMetadataGenerator();
var module = ModuleDefinition.ReadModule(_thisAssembly.Location);
var typeDef = TestUtility.GetTypeDefinition(typeof(SDKTypeBindings_ServiceBus));
var functions = generator.GenerateFunctionMetadata(typeDef);
var extensions = generator.Extensions;

Assert.Equal(2, functions.Count());

AssertDictionary(extensions, new Dictionary<string, string>
{
{ "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.10.0" },
});

var serviceBusTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction));

ValidateFunction(serviceBusTriggerFunction, nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction), GetEntryPoint(nameof(SDKTypeBindings_ServiceBus), nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction)),
ValidateServiceBusTrigger);

var serviceBusBatchTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_ServiceBus.ServiceBusBatchTriggerFunction));

ValidateFunction(serviceBusBatchTriggerFunction, nameof(SDKTypeBindings_ServiceBus.ServiceBusBatchTriggerFunction), GetEntryPoint(nameof(SDKTypeBindings_ServiceBus), nameof(SDKTypeBindings_ServiceBus.ServiceBusBatchTriggerFunction)),
ValidateServiceBusBatchTrigger);

void ValidateServiceBusTrigger(ExpandoObject b)
{
AssertExpandoObject(b, new Dictionary<string, object>
{
{ "Name", "message" },
{ "Type", "serviceBusTrigger" },
{ "Direction", "In" },
{ "queueName", "queue" },
{ "Properties", new Dictionary<String, Object>( ) { { "SupportsDeferredBinding" , "True"} } }
});
}

void ValidateServiceBusBatchTrigger(ExpandoObject b)
{
AssertExpandoObject(b, new Dictionary<string, object>
{
{ "Name", "messages" },
{ "Type", "serviceBusTrigger" },
{ "Direction", "In" },
{ "queueName", "queue" },
{ "Cardinality", "Many" },
{ "Properties", new Dictionary<String, Object>( ) { { "SupportsDeferredBinding" , "True"} } }
});
}
}

private class EventHubNotBatched
{
[Function("EventHubTrigger")]
Expand Down Expand Up @@ -1034,6 +1087,23 @@ public object BlobStringToBlobPocoArray(
}
}

private class SDKTypeBindings_ServiceBus
{
[Function(nameof(ServiceBusTriggerFunction))]
public static void ServiceBusTriggerFunction(
[ServiceBusTrigger("queue")] ServiceBusReceivedMessage message)
{
throw new NotImplementedException();
}

[Function(nameof(ServiceBusBatchTriggerFunction))]
public static void ServiceBusBatchTriggerFunction(
[ServiceBusTrigger("queue", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
throw new NotImplementedException();
}
}

private class ExternalType_Return
{
public const string FunctionName = "BasicHttpWithExternalTypeReturn";
Expand Down
1 change: 1 addition & 0 deletions test/FunctionMetadataGeneratorTests/SdkTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<ProjectReference Include="..\..\extensions\Worker.Extensions.Storage\src\Worker.Extensions.Storage.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.Timer\src\Worker.Extensions.Timer.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.EventHubs\src\Worker.Extensions.EventHubs.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.ServiceBus\src\Worker.Extensions.ServiceBus.csproj" />
<ProjectReference Include="..\..\sdk\FunctionMetadataLoaderExtension\FunctionMetadataLoaderExtension.csproj" />
<ProjectReference Include="..\..\sdk\Sdk\Sdk.csproj" />
<ProjectReference Include="..\..\src\DotNetWorker\DotNetWorker.csproj" />
Expand Down
Loading