Skip to content

Commit 1750fed

Browse files
authored
Fix behaviour around large buffers and early reader completion (#36)
* Fix behaviour around large buffers and early reader completion * Cleanup pipe reader on behalf of the target config * Tweak test for early completion/pipe blocking logic
1 parent 0eaab5c commit 1750fed

2 files changed

Lines changed: 60 additions & 5 deletions

File tree

src/TurnerSoftware.Aqueduct/PipeBifurcation.cs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,26 @@ public BifurcationState(BifurcationTargetConfig<TResult> config)
3030

3131
public bool IsCompleted { get; private set; }
3232

33+
private async Task<TResult> RunReaderWithCleanup(CancellationToken cancellationToken)
34+
{
35+
try
36+
{
37+
var result = await Config.Reader(Pipe.Reader, cancellationToken);
38+
await Pipe.Reader.CompleteAsync();
39+
return result;
40+
}
41+
catch (Exception ex)
42+
{
43+
await Pipe.Reader.CompleteAsync(ex);
44+
throw;
45+
}
46+
}
47+
3348
public void StartReader(CancellationToken cancellationToken)
3449
{
3550
try
3651
{
37-
ReaderTask = Config.Reader(Pipe.Reader, cancellationToken);
52+
ReaderTask = RunReaderWithCleanup(cancellationToken);
3853
}
3954
catch (Exception ex)
4055
{
@@ -55,10 +70,19 @@ public async ValueTask<bool> WriteAsync(ReadOnlySequence<byte> buffer, Cancellat
5570
return false;
5671
}
5772

58-
//Await faulted readers to correctly bubble exceptions
59-
if (ReaderTask is not null && ReaderTask.IsFaulted)
73+
if (ReaderTask is not null)
6074
{
61-
await ReaderTask;
75+
//Await faulted readers to correctly bubble exceptions
76+
if (ReaderTask.IsFaulted)
77+
{
78+
await ReaderTask;
79+
}
80+
81+
//If the reader task finishes early for some other reason
82+
if (ReaderTask.IsCompleted)
83+
{
84+
return false;
85+
}
6286
}
6387

6488
var bytesToRead = (int)buffer.Length;
@@ -68,7 +92,7 @@ public async ValueTask<bool> WriteAsync(ReadOnlySequence<byte> buffer, Cancellat
6892
}
6993

7094
var destination = Pipe.Writer.GetMemory(bytesToRead);
71-
buffer.CopyTo(destination.Span);
95+
buffer.Slice(0, bytesToRead).CopyTo(destination.Span);
7296

7397
Pipe.Writer.Advance(bytesToRead);
7498

tests/TurnerSoftware.Aqueduct.Tests/PipeBifurcationTests.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,37 @@ await PipeBifurcation.BifurcatedReadAsync(
7272
targetReaderHasCompleted.Should().BeTrue();
7373
}
7474

75+
[TestMethod]
76+
public async Task SingleTarget_ReaderCompletesEarly_Success()
77+
{
78+
var source = new Pipe();
79+
var buffer = new byte[16];
80+
await source.Writer.WriteAsync(buffer);
81+
var targetReaderHasCompleted = false;
82+
83+
var bifurcationTask = PipeBifurcation.BifurcatedReadAsync(
84+
source.Reader,
85+
new BifurcationSourceConfig(minReadBufferSize: -1),
86+
new BifurcationTargetConfig(
87+
async (Stream reader, CancellationToken cancellationToken) =>
88+
{
89+
var buffer = new byte[1];
90+
await reader.ReadAsync(buffer, cancellationToken);
91+
targetReaderHasCompleted = true;
92+
},
93+
//These are set to exaggerate the problem where exiting early
94+
//still has the pipe being fed bytes till the point it blocks
95+
blockAfter: 16,
96+
resumeAfter: 8
97+
)
98+
);
99+
100+
await source.Writer.WriteAsync(buffer);
101+
await source.Writer.CompleteAsync();
102+
await bifurcationTask;
103+
targetReaderHasCompleted.Should().BeTrue();
104+
}
105+
75106
[TestMethod]
76107
public async Task MultiTarget_DefaultConfig_Success()
77108
{

0 commit comments

Comments
 (0)