Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions src/Dapr.AspNetCore/CloudEventsMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,30 @@ public CloudEventsMiddleware(RequestDelegate next, CloudEventsMiddlewareOptions
this.options = options;
}

public Task InvokeAsync(HttpContext httpContext)
public async Task InvokeAsync(HttpContext httpContext)
{
// This middleware unwraps any requests with a cloud events (JSON) content type
// and replaces the request body + request content type so that it can be read by a
// non-cloud-events-aware piece of code.
//
// This corresponds to cloud events in the *structured* format:
// https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#13-content-modes
//
// For *binary* format, we don't have to do anything
//
// We don't support batching.
//
// The philosophy here is that we don't report an error for things we don't support, because
// that would block someone from implementing their own support for it. We only report an error
// when something we do support isn't correct.
if (!MatchesContentType(httpContext, out var charSet))
{
return this.next(httpContext);
await this.next(httpContext);
return;
}

return this.ProcessBodyAsync(httpContext, charSet);
try
{
await this.ProcessBodyAsync(httpContext, charSet);
}
catch (OperationCanceledException) when (httpContext.RequestAborted.IsCancellationRequested && httpContext.Response.HasStarted)
{
// Swallow
}
}

private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
{
JsonElement json;
if (string.Equals(charSet, Encoding.UTF8.WebName, StringComparison.OrdinalIgnoreCase))
{
json = await JsonSerializer.DeserializeAsync<JsonElement>(httpContext.Request.Body);
json = await JsonSerializer.DeserializeAsync<JsonElement>(httpContext.Request.Body, cancellationToken: httpContext.RequestAborted);
}
else
{
Expand Down Expand Up @@ -142,7 +136,7 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
if (isJson || options.SuppressJsonDecodingOfTextPayloads)
{
// Rehydrate body from JSON payload
await JsonSerializer.SerializeAsync<JsonElement>(body, data);
await JsonSerializer.SerializeAsync<JsonElement>(body, data, cancellationToken: httpContext.RequestAborted);
}
else
{
Expand Down
82 changes: 82 additions & 0 deletions test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ namespace Dapr.AspNetCore.Test;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using System.Threading;
using System;
using Shouldly;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Xunit;

public class CloudEventsMiddlewareTest
Expand Down Expand Up @@ -478,4 +481,83 @@ private static string ReadBody(Stream stream, Encoding encoding = null)
var str = encoding.GetString(bytes);
return str;
}

[Fact]
public async Task InvokeAsync_SwallowsCancellation_WhenResponseHasStarted()
{
using var cts = new CancellationTokenSource();

var serviceCollection = new ServiceCollection();
var provider = serviceCollection.BuildServiceProvider();

var app = new ApplicationBuilder(provider);
app.UseCloudEvents();

// Register terminal middleware BEFORE building the pipeline
app.Run(httpContext =>
{
// Simulate that the response has already started by replacing the response feature
httpContext.Features.Set<IHttpResponseFeature>(new TestHttpResponseFeature { HasStarted = true });

// Simulate the client disconnecting after the response started
cts.Cancel();
throw new OperationCanceledException(cts.Token);
});

var pipeline = app.Build();

var context = new DefaultHttpContext();
context.Request.ContentType = "application/cloudevents+json";
context.Request.Body = MakeBody("{ \"data\": { \"name\":\"jimmy\" } }");
context.RequestAborted = cts.Token;

// The middleware should catch and swallow the OperationCanceledException
// because the response has already started and the request was aborted
await pipeline.Invoke(context);
}

[Fact]
public async Task InvokeAsync_PropagatesCancellation_WhenResponseHasNotStarted()
{
using var cts = new CancellationTokenSource();

var serviceCollection = new ServiceCollection();
var provider = serviceCollection.BuildServiceProvider();

var app = new ApplicationBuilder(provider);
app.UseCloudEvents();

// Register terminal middleware that cancels BEFORE the response starts
app.Run(httpContext =>
{
// Response has NOT started (default), client disconnects
cts.Cancel();
throw new OperationCanceledException(cts.Token);
});

var pipeline = app.Build();

var context = new DefaultHttpContext();
context.Request.ContentType = "application/cloudevents+json";
context.Request.Body = MakeBody("{ \"data\": { \"name\":\"jimmy\" } }");
context.RequestAborted = cts.Token;

// The middleware should NOT swallow the exception when the response has not started
await Should.ThrowAsync<OperationCanceledException>(() => pipeline.Invoke(context));
}

/// <summary>
/// A test implementation of <see cref="IHttpResponseFeature"/> that allows controlling
/// the <see cref="HasStarted"/> property for unit testing.
/// </summary>
private sealed class TestHttpResponseFeature : IHttpResponseFeature
{
public int StatusCode { get; set; } = 200;
public string ReasonPhrase { get; set; }
public IHeaderDictionary Headers { get; set; } = new HeaderDictionary();
public Stream Body { get; set; } = new MemoryStream();
public bool HasStarted { get; set; }
public void OnStarting(Func<object, Task> callback, object state) { }
public void OnCompleted(Func<object, Task> callback, object state) { }
}
}
Loading