Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion extensions/Worker.Extensions.CosmosDB/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
<!-- Please add your release notes in the following format:
- My change description (#PR/#issue)
-->
- Fixed incorrect type of CosmosDBTriggerAttribute's `StartFromTime` property.

- Add support for SDK-type bindings via deferred binding feature (#1406)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using Azure.Core;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Functions.Worker.Extensions.CosmosDB;

namespace Microsoft.Azure.Functions.Worker
{
internal class CosmosDBBindingOptions
{
public string? ConnectionString { get; set; }

public string? AccountEndpoint { get; set; }

public TokenCredential? Credential { get; set; }

internal string BuildCacheKey(string connectionString, string region) => $"{connectionString}|{region}";
internal ConcurrentDictionary<string, CosmosClient> ClientCache { get; } = new ConcurrentDictionary<string, CosmosClient>();

internal virtual CosmosClient GetClient(string connection, string preferredLocations = "")
{
if (string.IsNullOrEmpty(connection))
{
throw new ArgumentNullException(nameof(connection));
}

string cacheKey = BuildCacheKey(connection, preferredLocations);

CosmosClientOptions cosmosClientOptions = new ()
{
ConnectionMode = ConnectionMode.Gateway
};

if (!string.IsNullOrEmpty(preferredLocations))
{
cosmosClientOptions.ApplicationPreferredRegions = Utilities.ParsePreferredLocations(preferredLocations);
}

return ClientCache.GetOrAdd(cacheKey, (c) => CreateService(cosmosClientOptions));
}

private CosmosClient CreateService(CosmosClientOptions cosmosClientOptions)
{
return string.IsNullOrEmpty(ConnectionString)
? new CosmosClient(AccountEndpoint, Credential, cosmosClientOptions) // AAD auth
: new CosmosClient(ConnectionString, cosmosClientOptions); // Connection string based auth
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.Functions.Worker.Extensions;
using Microsoft.Azure.Functions.Worker.Extensions.CosmosDB;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.Functions.Worker
{
internal class CosmosDBBindingOptionsSetup : IConfigureNamedOptions<CosmosDBBindingOptions>
{
private readonly IConfiguration _configuration;
private readonly AzureComponentFactory _componentFactory;

public CosmosDBBindingOptionsSetup(IConfiguration configuration, AzureComponentFactory componentFactory)
{
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
_componentFactory = componentFactory ?? throw new ArgumentNullException(nameof(componentFactory));
}

public void Configure(CosmosDBBindingOptions options)
{
Configure(Options.DefaultName, options);
}

public void Configure(string name, CosmosDBBindingOptions options)
{
IConfigurationSection connectionSection = _configuration.GetWebJobsConnectionStringSection(name);

if (!connectionSection.Exists())
{
throw new InvalidOperationException($"Cosmos DB connection configuration '{name}' does not exist. " +
"Make sure that it is a defined App Setting.");
}

if (!string.IsNullOrWhiteSpace(connectionSection.Value))
{
options.ConnectionString = connectionSection.Value;
}
else
{
options.AccountEndpoint = connectionSection[Constants.AccountEndpoint];
if (string.IsNullOrWhiteSpace(options.AccountEndpoint))
{
throw new InvalidOperationException($"Connection should have an '{Constants.AccountEndpoint}' property or be a " +
$"string representing a connection string.");
}

options.Credential = _componentFactory.CreateTokenCredential(connectionSection);
}
}
}
}
15 changes: 15 additions & 0 deletions extensions/Worker.Extensions.CosmosDB/src/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.Functions.Worker.Extensions.CosmosDB
{
internal static class Constants
{
internal const string CosmosExtensionName = "CosmosDB";
internal const string ConfigurationSectionName = "AzureWebJobs";
internal const string ConnectionStringsSectionName = "ConnectionStrings";
internal const string AccountEndpoint = "accountEndpoint";
internal const string JsonContentType = "application/json";
}
}
206 changes: 206 additions & 0 deletions extensions/Worker.Extensions.CosmosDB/src/CosmosDBConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.CosmosDB;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind Cosmos DB type parameters.
/// </summary>
internal class CosmosDBConverter : IInputConverter
{
private readonly IOptionsSnapshot<CosmosDBBindingOptions> _cosmosOptions;
private readonly ILogger<CosmosDBConverter> _logger;

public CosmosDBConverter(IOptionsSnapshot<CosmosDBBindingOptions> cosmosOptions, ILogger<CosmosDBConverter> logger)
{
_cosmosOptions = cosmosOptions ?? throw new ArgumentNullException(nameof(cosmosOptions));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
return context?.Source switch
{
ModelBindingData binding => await ConvertFromBindingDataAsync(context, binding),
_ => ConversionResult.Unhandled(),
};
}

private async ValueTask<ConversionResult> ConvertFromBindingDataAsync(ConverterContext context, ModelBindingData modelBindingData)
{
if (!IsCosmosExtension(modelBindingData))
{
return ConversionResult.Unhandled();
}

try
{
var cosmosAttribute = GetBindingDataContent(modelBindingData);
object result = await ToTargetTypeAsync(context.TargetType, cosmosAttribute);

if (result is not null)
{
return ConversionResult.Success(result);
}
}
catch (Exception ex)
{
return ConversionResult.Failed(ex);
}

return ConversionResult.Unhandled();
}

private bool IsCosmosExtension(ModelBindingData bindingData)
{
if (bindingData?.Source is not Constants.CosmosExtensionName)
{
_logger.LogTrace("Source '{source}' is not supported by {converter}", bindingData?.Source, nameof(CosmosDBConverter));
return false;
}

return true;
}

private CosmosDBInputAttribute GetBindingDataContent(ModelBindingData bindingData)
{
return bindingData?.ContentType switch
{
Constants.JsonContentType => bindingData.Content.ToObjectFromJson<CosmosDBInputAttribute>(),
_ => throw new NotSupportedException($"Unexpected content-type. Currently only '{Constants.JsonContentType}' is supported.")
};
}

private async Task<object> ToTargetTypeAsync(Type targetType, CosmosDBInputAttribute cosmosAttribute) => targetType switch
{
Type _ when targetType == typeof(CosmosClient) => CreateCosmosClient<CosmosClient>(cosmosAttribute),
Type _ when targetType == typeof(Database) => CreateCosmosClient<Database>(cosmosAttribute),
Type _ when targetType == typeof(Container) => CreateCosmosClient<Container>(cosmosAttribute),
_ => await CreateTargetObjectAsync(targetType, cosmosAttribute)
};

private async Task<object> CreateTargetObjectAsync(Type targetType, CosmosDBInputAttribute cosmosAttribute)
{
MethodInfo createPOCOMethod;

if (targetType.GenericTypeArguments.Any())
{
targetType = targetType.GenericTypeArguments.FirstOrDefault();

createPOCOMethod = GetType()
.GetMethod(nameof(CreatePOCOCollectionAsync), BindingFlags.Instance | BindingFlags.NonPublic)
.MakeGenericMethod(targetType);
}
else
{
createPOCOMethod = GetType()
.GetMethod(nameof(CreatePOCOAsync), BindingFlags.Instance | BindingFlags.NonPublic)
.MakeGenericMethod(targetType);
}


var container = CreateCosmosClient<Container>(cosmosAttribute) as Container;

if (container is null)
{
throw new InvalidOperationException($"Unable to create Cosmos container client for '{cosmosAttribute.ContainerName}'.");
}

return await (Task<object>)createPOCOMethod.Invoke(this, new object[] { container, cosmosAttribute });
}

private async Task<object> CreatePOCOAsync<T>(Container container, CosmosDBInputAttribute cosmosAttribute)
{
if (String.IsNullOrEmpty(cosmosAttribute.Id) || String.IsNullOrEmpty(cosmosAttribute.PartitionKey))
{
throw new InvalidOperationException("The 'Id' and 'PartitionKey' properties of a CosmosDB single-item input binding cannot be null or empty.");
}

ItemResponse<T> item = await container.ReadItemAsync<T>(cosmosAttribute.Id, new PartitionKey(cosmosAttribute.PartitionKey));

if (item is null || item?.StatusCode is not System.Net.HttpStatusCode.OK || item.Resource is null)
{
throw new InvalidOperationException($"Unable to retrieve document with ID '{cosmosAttribute.Id}' and PartitionKey '{cosmosAttribute.PartitionKey}'");
}

return item.Resource;
}

private async Task<object> CreatePOCOCollectionAsync<T>(Container container, CosmosDBInputAttribute cosmosAttribute)
{
QueryDefinition queryDefinition = null!;
if (!String.IsNullOrEmpty(cosmosAttribute.SqlQuery))
{
queryDefinition = new QueryDefinition(cosmosAttribute.SqlQuery);
if (cosmosAttribute.SqlQueryParameters?.Count() > 0)
{
foreach (var parameter in cosmosAttribute.SqlQueryParameters)
{
queryDefinition.WithParameter(parameter.Key, parameter.Value.ToString());
}
}
}

PartitionKey partitionKey = String.IsNullOrEmpty(cosmosAttribute.PartitionKey)
? PartitionKey.None
: new PartitionKey(cosmosAttribute.PartitionKey);

// Workaround until bug in Cosmos SDK is fixed
// Currently pending release: https://github.com/Azure/azure-cosmos-dotnet-v3/commit/d6e04a92f8778565eb1d1452738d37c7faf3c47a
QueryRequestOptions queryRequestOptions = new();
if (partitionKey != PartitionKey.None)
{
queryRequestOptions = new() { PartitionKey = partitionKey };
}

using (var iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, requestOptions: queryRequestOptions))
{
if (iterator is null)
{
throw new InvalidOperationException($"Unable to retrieve documents for container '{container.Id}'.");
}

return await ExtractCosmosDocumentsAsync(iterator);
}
}

private async Task<IList<T>> ExtractCosmosDocumentsAsync<T>(FeedIterator<T> iterator)
{
var documentList = new List<T>();
while (iterator.HasMoreResults)
{
FeedResponse<T> response = await iterator.ReadNextAsync();
documentList.AddRange(response.Resource);
}
return documentList;
}

private T CreateCosmosClient<T>(CosmosDBInputAttribute cosmosAttribute)
{
var cosmosDBOptions = _cosmosOptions.Get(cosmosAttribute?.Connection);
CosmosClient cosmosClient = cosmosDBOptions.GetClient(cosmosAttribute?.Connection!, cosmosAttribute?.PreferredLocations!);

Type targetType = typeof(T);
object cosmosReference = targetType switch
{
Type _ when targetType == typeof(Database) => cosmosClient.GetDatabase(cosmosAttribute?.DatabaseName),
Type _ when targetType == typeof(Container) => cosmosClient.GetContainer(cosmosAttribute?.DatabaseName, cosmosAttribute?.ContainerName),
_ => cosmosClient
};

return (T)cosmosReference;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

namespace Microsoft.Azure.Functions.Worker
{
[SupportsDeferredBinding]
public sealed class CosmosDBInputAttribute : InputBindingAttribute
{
/// <summary>
Expand Down Expand Up @@ -44,7 +46,7 @@ public CosmosDBInputAttribute(string databaseName, string containerName)

/// <summary>
/// Optional.
/// When specified on an output binding and <see cref="CreateIfNotExists"/> is true, defines the partition key
/// When specified on an output binding and <see cref="CreateIfNotExists"/> is true, defines the partition key
/// path for the created container.
/// When specified on an input binding, specifies the partition key value for the lookup.
/// May include binding parameters.
Expand All @@ -67,5 +69,11 @@ public CosmosDBInputAttribute(string databaseName, string containerName)
/// PreferredLocations = "East US,South Central US,North Europe"
/// </example>
public string? PreferredLocations { get; set; }

/// <summary>
/// Optional.
/// Defines the parameters to be used with the SqlQuery
/// </summary>
public IDictionary<string, object>? SqlQueryParameters { get; set; }
}
}
Loading