Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/DotNetWorker.Grpc/Definition/GrpcFunctionDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,22 @@ public GrpcFunctionDefinition(FunctionLoadRequest loadRequest, IMethodInfoLocato
{
EntryPoint = loadRequest.Metadata.EntryPoint;
Name = loadRequest.Metadata.Name;
PathToAssembly = Path.GetFullPath(loadRequest.Metadata.ScriptFile);
Id = loadRequest.FunctionId;

string? scriptRoot = Environment.GetEnvironmentVariable("AzureWebJobsScriptRoot");
if (string.IsNullOrWhiteSpace(scriptRoot))
{
throw new InvalidOperationException("The 'AzureWebJobsScriptRoot' environment variable value is not defined. This is a required environment variable that is automatically set by the Azure Functions runtime.");
}

if (string.IsNullOrWhiteSpace(loadRequest.Metadata.ScriptFile))
{
throw new InvalidOperationException($"Metadata for function '{loadRequest.Metadata.Name} ({loadRequest.Metadata.FunctionId})' does not specify a 'ScriptFile'.");
}

string scriptFile = Path.Combine(scriptRoot, loadRequest.Metadata.ScriptFile);
PathToAssembly = Path.GetFullPath(scriptFile);

var grpcBindingsGroup = loadRequest.Metadata.Bindings.GroupBy(kv => kv.Value.Direction);
var grpcInputBindings = grpcBindingsGroup.Where(kv => kv.Key == BindingInfo.Types.Direction.In).FirstOrDefault();
var grpcOutputBindings = grpcBindingsGroup.Where(kv => kv.Key != BindingInfo.Types.Direction.In).FirstOrDefault();
Expand Down
6 changes: 4 additions & 2 deletions src/DotNetWorker.Grpc/DotNetWorker.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
<AssemblyName>Microsoft.Azure.Functions.Worker.Grpc</AssemblyName>
<RootNamespace>Microsoft.Azure.Functions.Worker.Grpc</RootNamespace>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<MinorProductVersion>7</MinorProductVersion>
<VersionSuffix>-preview2</VersionSuffix>
<MinorProductVersion>8</MinorProductVersion>
<VersionSuffix>-preview1</VersionSuffix>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<Import Project="..\..\build\Common.props" />
Expand Down Expand Up @@ -39,6 +40,7 @@
Grpc.Net.Client implementation-->
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Grpc.Core" Version="2.45.0" />
<Compile Remove="NativeHostIntegration/**" />
</ItemGroup>

<ItemGroup>
Expand Down
50 changes: 12 additions & 38 deletions src/DotNetWorker.Grpc/GrpcServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,15 @@

using System;
using System.Threading.Channels;
using Grpc.Core;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Core.FunctionMetadata;
using Microsoft.Azure.Functions.Worker.Grpc.Messages;
using static Microsoft.Azure.Functions.Worker.Grpc.Messages.FunctionRpc;
using Microsoft.Azure.Functions.Worker.Logging;
using Microsoft.Azure.Functions.Worker.Grpc;
using Microsoft.Azure.Functions.Worker.Diagnostics;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

#if NET5_0_OR_GREATER
using Grpc.Net.Client;
#endif

namespace Microsoft.Extensions.DependencyInjection
{
Expand All @@ -35,7 +28,7 @@ internal static IServiceCollection RegisterOutputChannel(this IServiceCollection
AllowSynchronousContinuations = true
};

return new GrpcHostChannel(System.Threading.Channels.Channel.CreateUnbounded<StreamingMessage>(outputOptions));
return new GrpcHostChannel(Channel.CreateUnbounded<StreamingMessage>(outputOptions));
});
}

Expand All @@ -56,40 +49,21 @@ public static IServiceCollection AddGrpc(this IServiceCollection services)

// gRPC Core services
services.AddSingleton<IWorker, GrpcWorker>();
services.AddSingleton<FunctionRpcClient>(p =>
{
IOptions<GrpcWorkerStartupOptions> argumentsOptions = p.GetService<IOptions<GrpcWorkerStartupOptions>>()
?? throw new InvalidOperationException("gRPC Services are not correctly registered.");

GrpcWorkerStartupOptions arguments = argumentsOptions.Value;

string uriString = $"http://{arguments.Host}:{arguments.Port}";
if (!Uri.TryCreate(uriString, UriKind.Absolute, out Uri? grpcUri))
{
throw new InvalidOperationException($"The gRPC channel URI '{uriString}' could not be parsed.");
}


#if NET5_0_OR_GREATER
GrpcChannel grpcChannel = GrpcChannel.ForAddress(grpcUri, new GrpcChannelOptions()
{
MaxReceiveMessageSize = arguments.GrpcMaxMessageLength,
MaxSendMessageSize = arguments.GrpcMaxMessageLength,
Credentials = ChannelCredentials.Insecure
});
// If we are running in the native host process, use the native client
// for communication (interop). Otherwise; use the gRPC client.
if (AppContext.GetData("AZURE_FUNCTIONS_NATIVE_HOST") is not null)
{
services.AddSingleton<IWorkerClientFactory, Azure.Functions.Worker.Grpc.NativeHostIntegration.NativeWorkerClientFactory>();
}
else
{
services.AddSingleton<IWorkerClientFactory, GrpcWorkerClientFactory>();
}
#else

var options = new ChannelOption[]
{
new ChannelOption(Grpc.Core.ChannelOptions.MaxReceiveMessageLength, arguments.GrpcMaxMessageLength),
new ChannelOption(Grpc.Core.ChannelOptions.MaxSendMessageLength, arguments.GrpcMaxMessageLength)
};

Grpc.Core.Channel grpcChannel = new Grpc.Core.Channel(arguments.Host, arguments.Port, ChannelCredentials.Insecure, options);

services.AddSingleton<IWorkerClientFactory, GrpcWorkerClientFactory>();
#endif
return new FunctionRpcClient(grpcChannel);
});

services.AddOptions<GrpcWorkerStartupOptions>()
.Configure<IConfiguration>((arguments, config) =>
Expand Down
93 changes: 22 additions & 71 deletions src/DotNetWorker.Grpc/GrpcWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Azure.Core.Serialization;
using Grpc.Core;
using Microsoft.Azure.Functions.Worker.Context.Features;
using Microsoft.Azure.Functions.Worker.Core.FunctionMetadata;
using Microsoft.Azure.Functions.Worker.Grpc;
Expand All @@ -21,52 +19,43 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using static Microsoft.Azure.Functions.Worker.Grpc.Messages.FunctionRpc;
using MsgType = Microsoft.Azure.Functions.Worker.Grpc.Messages.StreamingMessage.ContentOneofCase;

namespace Microsoft.Azure.Functions.Worker
{
internal class GrpcWorker : IWorker
internal class GrpcWorker : IWorker, IMessageProcessor
{
private readonly ChannelReader<StreamingMessage> _outputReader;
private readonly ChannelWriter<StreamingMessage> _outputWriter;

private readonly IFunctionsApplication _application;
private readonly FunctionRpcClient _rpcClient;
private readonly IInvocationFeaturesFactory _invocationFeaturesFactory;
private readonly IOutputBindingsInfoProvider _outputBindingsInfoProvider;
private readonly IInputConversionFeatureProvider _inputConversionFeatureProvider;
private readonly IMethodInfoLocator _methodInfoLocator;
private readonly GrpcWorkerStartupOptions _startupOptions;
private readonly WorkerOptions _workerOptions;
private readonly ObjectSerializer _serializer;
private readonly IHostApplicationLifetime _hostApplicationLifetime;
private readonly IWorkerClientFactory _workerClientFactory;
private readonly IInvocationHandler _invocationHandler;
private readonly IFunctionMetadataProvider _functionMetadataProvider;

public GrpcWorker(IFunctionsApplication application, FunctionRpcClient rpcClient, GrpcHostChannel outputChannel, IInvocationFeaturesFactory invocationFeaturesFactory,
IOutputBindingsInfoProvider outputBindingsInfoProvider, IMethodInfoLocator methodInfoLocator,
IOptions<GrpcWorkerStartupOptions> startupOptions, IOptions<WorkerOptions> workerOptions,
IInputConversionFeatureProvider inputConversionFeatureProvider,
IFunctionMetadataProvider functionMetadataProvider,
IHostApplicationLifetime hostApplicationLifetime,
ILogger<GrpcWorker> logger)
private IWorkerClient? _workerClient;

public GrpcWorker(IFunctionsApplication application,
IWorkerClientFactory workerClientFactory,
IInvocationFeaturesFactory invocationFeaturesFactory,
IOutputBindingsInfoProvider outputBindingsInfoProvider,
IMethodInfoLocator methodInfoLocator,
IOptions<WorkerOptions> workerOptions,
IInputConversionFeatureProvider inputConversionFeatureProvider,
IFunctionMetadataProvider functionMetadataProvider,
IHostApplicationLifetime hostApplicationLifetime,
ILogger<GrpcWorker> logger)
{
if (outputChannel == null)
{
throw new ArgumentNullException(nameof(outputChannel));
}

_outputReader = outputChannel.Channel.Reader;
_outputWriter = outputChannel.Channel.Writer;

_hostApplicationLifetime = hostApplicationLifetime ?? throw new ArgumentNullException(nameof(hostApplicationLifetime));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
_application = application ?? throw new ArgumentNullException(nameof(application));
_rpcClient = rpcClient ?? throw new ArgumentNullException(nameof(rpcClient));
_invocationFeaturesFactory = invocationFeaturesFactory ?? throw new ArgumentNullException(nameof(invocationFeaturesFactory));
_outputBindingsInfoProvider = outputBindingsInfoProvider ?? throw new ArgumentNullException(nameof(outputBindingsInfoProvider));
_methodInfoLocator = methodInfoLocator ?? throw new ArgumentNullException(nameof(methodInfoLocator));
_startupOptions = startupOptions?.Value ?? throw new ArgumentNullException(nameof(startupOptions));

_workerOptions = workerOptions?.Value ?? throw new ArgumentNullException(nameof(workerOptions));
_serializer = workerOptions.Value.Serializer ?? throw new InvalidOperationException(nameof(workerOptions.Value.Serializer));
_inputConversionFeatureProvider = inputConversionFeatureProvider ?? throw new ArgumentNullException(nameof(inputConversionFeatureProvider));
Expand All @@ -78,54 +67,16 @@ public GrpcWorker(IFunctionsApplication application, FunctionRpcClient rpcClient

public async Task StartAsync(CancellationToken token)
{
var eventStream = _rpcClient.EventStream(cancellationToken: token);

await SendStartStreamMessageAsync(eventStream.RequestStream);

_ = StartWriterAsync(eventStream.RequestStream);
_ = StartReaderAsync(eventStream.ResponseStream);
_workerClient = await _workerClientFactory.StartClientAsync(this, token);
}

public Task StopAsync(CancellationToken token)
{
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken token) => Task.CompletedTask;

private async Task SendStartStreamMessageAsync(IClientStreamWriter<StreamingMessage> requestStream)
{
StartStream str = new StartStream()
{
WorkerId = _startupOptions.WorkerId
};

StreamingMessage startStream = new StreamingMessage()
{
StartStream = str
};

await requestStream.WriteAsync(startStream);
}

private async Task StartWriterAsync(IClientStreamWriter<StreamingMessage> requestStream)
{
await foreach (StreamingMessage rpcWriteMsg in _outputReader.ReadAllAsync())
{
await requestStream.WriteAsync(rpcWriteMsg);
}
}

private async Task StartReaderAsync(IAsyncStreamReader<StreamingMessage> responseStream)
{
while (await responseStream.MoveNext())
{
await ProcessRequestAsync(responseStream.Current);
}
}

private Task ProcessRequestAsync(StreamingMessage request)
Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
{
// Dispatch and return.
Task.Run(() => ProcessRequestCoreAsync(request));
_ = ProcessRequestCoreAsync(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also update env reload case (line 116) to return capabilities and worker metadata. Can be a follow up PR I can take care of.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has the host been updated to support that? yes, we should, but either way, I'm inclined to say this should be a separate change


return Task.CompletedTask;
}

Expand Down Expand Up @@ -179,7 +130,7 @@ private async Task ProcessRequestCoreAsync(StreamingMessage request)
return;
}

await _outputWriter.WriteAsync(responseMessage);
await _workerClient!.SendMessageAsync(responseMessage);
}

internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
Expand Down
Loading