Skip to content

Commit 70695d9

Browse files
fix(streams): prevent race condition in ChannelSource on channel completion (#7941) (#7951)
* fix(streams): prevent race condition in ChannelSource on channel completion (#7940) Fixed a race condition in ChannelSourceLogic that caused intermittent NullReferenceException when completing a ChannelWriter while the stream was waiting for data. The issue occurred because two async callbacks could fire simultaneously when the channel writer completed: 1. The _reader.Completion continuation → OnReaderComplete → CompleteStage 2. The WaitToReadAsync continuation → OnValueRead(false) → CompleteStage Both paths could pass the IsStageCompleted check before either completed the stage, leading to concurrent access of stage internals. The fix adds an atomic flag (_completing) using Interlocked.Exchange to ensure only one completion path ever executes. This is applied to: - OnReaderComplete - channel completion callback - OnValueRead - when data is not available - OnValueReadFailure - when read fails - OnPull - synchronous completion path * refactor: use CompareExchange instead of Exchange for atomic flag CompareExchange is more semantically correct - it only sets the value if it's currently 0, rather than unconditionally setting it.
1 parent 2492bdb commit 70695d9

File tree

2 files changed

+85
-1
lines changed

2 files changed

+85
-1
lines changed

src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
//-----------------------------------------------------------------------
77

88
using System;
9+
using System.Collections.Concurrent;
10+
using System.Collections.Immutable;
11+
using System.Linq;
912
using System.Threading;
1013
using System.Threading.Channels;
1114
using System.Threading.Tasks;
@@ -92,5 +95,52 @@ public async Task ChannelSource_must_read_incoming_events()
9295
probe.ExpectNext(4);
9396
probe.ExpectNext(5);
9497
}
98+
99+
/// <summary>
100+
/// Reproduces GitHub issue #7940: NullReferenceException when completing
101+
/// a ChannelReader while the stream is waiting for data.
102+
/// </summary>
103+
[Fact(DisplayName = "ChannelSource should not throw NRE when completing channel while waiting for data")]
104+
public async Task ChannelSource_should_not_throw_NRE_when_completing_channel_while_waiting_for_data()
105+
{
106+
// This test reproduces the race condition from #7940
107+
// Run multiple iterations to increase chance of hitting the race
108+
for (var iteration = 0; iteration < 20; iteration++)
109+
{
110+
var channel = Channel.CreateUnbounded<string>();
111+
var processed = new ConcurrentBag<string>();
112+
113+
// Exactly matches the repro from the issue - using ImmutableArray.Create and Sink.Ignore<Done>
114+
var streamTask = ChannelSource.FromReader(channel.Reader)
115+
.Select(ImmutableArray.Create)
116+
.Select(s =>
117+
{
118+
foreach (var item in s) processed.Add(item);
119+
return Done.Instance;
120+
})
121+
.ToMaterialized(Sink.Ignore<Done>(), Keep.Right)
122+
.Run(_materializer);
123+
124+
// Write some items
125+
var testInput = Enumerable.Range(1, 5).Select(i => i.ToString()).ToList();
126+
foreach (var item in testInput)
127+
await channel.Writer.WriteAsync(item);
128+
129+
// Wait 1 second for stream to process items and then wait for more data
130+
// This is the key to reproducing the race - the stream needs to be
131+
// waiting in WaitToReadAsync when we complete the writer (channel is empty)
132+
await Task.Delay(1000);
133+
134+
// Complete the channel - this can cause NRE if there's a race
135+
// between OnReaderComplete and the async continuation of WaitToReadAsync
136+
channel.Writer.Complete();
137+
138+
// Stream should complete cleanly without exceptions
139+
await streamTask;
140+
141+
// Verify all items were processed
142+
processed.Count.Should().Be(5, $"iteration {iteration} failed");
143+
}
144+
}
95145
}
96146
}

src/core/Akka.Streams/Implementation/ChannelSources.cs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//-----------------------------------------------------------------------
77

88
using System;
9+
using System.Threading;
910
using System.Threading.Channels;
1011
using System.Threading.Tasks;
1112
using Akka.Streams.Stage;
@@ -21,6 +22,10 @@ sealed class ChannelSourceLogic<T> : OutGraphStageLogic
2122
private readonly Action<Exception> _onReaderComplete;
2223
private readonly Action<Task<bool>> _onReadReady;
2324

25+
// Flag to prevent race condition between OnReaderComplete and OnValueRead
26+
// when channel completion and WaitToReadAsync fire simultaneously (issue #7940)
27+
private int _completing;
28+
2429
public ChannelSourceLogic(SourceShape<T> source, Outlet<T> outlet,
2530
ChannelReader<T> reader) : base(source)
2631
{
@@ -44,20 +49,41 @@ public ChannelSourceLogic(SourceShape<T> source, Outlet<T> outlet,
4449

4550
private void OnReaderComplete(Exception reason)
4651
{
52+
// Use atomic compare-exchange to ensure only one completion path runs
53+
// This prevents race with OnValueRead when both fire simultaneously
54+
if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0)
55+
return; // Already completing from another path
56+
4757
if (reason is null)
4858
CompleteStage();
4959
else
5060
FailStage(reason);
5161
}
5262

53-
private void OnValueReadFailure(Exception reason) => FailStage(reason);
63+
private void OnValueReadFailure(Exception reason)
64+
{
65+
// Use atomic compare-exchange to ensure only one completion path runs
66+
if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0)
67+
return; // Already completing from another path
68+
69+
FailStage(reason);
70+
}
5471

5572
private void OnValueRead(bool dataAvailable)
5673
{
5774
if (dataAvailable && _reader.TryRead(out var element))
75+
{
5876
Push(_outlet, element);
77+
}
5978
else
79+
{
80+
// Use atomic compare-exchange to ensure only one completion path runs
81+
// This prevents race with OnReaderComplete when both fire simultaneously
82+
if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0)
83+
return; // Already completing from another path
84+
6085
CompleteStage();
86+
}
6187
}
6288

6389
public override void OnPull()
@@ -73,9 +99,17 @@ public override void OnPull()
7399
{
74100
var dataAvailable = continuation.GetAwaiter().GetResult();
75101
if (dataAvailable && _reader.TryRead(out element))
102+
{
76103
Push(_outlet, element);
104+
}
77105
else
106+
{
107+
// Use atomic compare-exchange to ensure only one completion path runs
108+
if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0)
109+
return; // Already completing from another path
110+
78111
CompleteStage();
112+
}
79113
}
80114
else
81115
continuation.AsTask().ContinueWith(_onReadReady);

0 commit comments

Comments
 (0)