Skip to content

Commit dc0aa80

Browse files
TyrrrzCopilot
andauthored
Add polyfills for Parallel.ForAsync(...) and Parallel.ForEachAsync(...) (#63)
Co-authored-by: Copilot <[email protected]>
1 parent 28f3002 commit dc0aa80

9 files changed

Lines changed: 492 additions & 6 deletions

File tree

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using FluentAssertions;
7+
using Xunit;
8+
9+
namespace PolyShim.Tests.Net60;
10+
11+
public class ParallelTests
12+
{
13+
[Fact]
14+
public async Task ForEachAsync_Test()
15+
{
16+
// Arrange
17+
var items = new[] { 'a', 'b', 'c', 'd', 'e' };
18+
var results = new ConcurrentBag<char>();
19+
20+
// Act
21+
await Parallel.ForEachAsync(
22+
items,
23+
async (item, cancellationToken) =>
24+
{
25+
await Task.Delay(10, cancellationToken);
26+
results.Add(item);
27+
}
28+
);
29+
30+
// Assert
31+
results.Should().BeEquivalentTo(items);
32+
}
33+
34+
[Fact]
35+
public async Task ForEachAsync_Cancellation_Test()
36+
{
37+
// Arrange
38+
var items = new[] { 'a', 'b', 'c', 'd', 'e' };
39+
var cancellationToken = new CancellationToken(true);
40+
41+
// Act & assert
42+
var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
43+
await Parallel.ForEachAsync(
44+
items,
45+
cancellationToken,
46+
async (_, innerCancellationToken) => await Task.Delay(10, innerCancellationToken)
47+
)
48+
);
49+
50+
ex.CancellationToken.Should().Be(cancellationToken);
51+
}
52+
53+
[Fact]
54+
public async Task ForEachAsync_MaxDegreeOfParallelism_Test()
55+
{
56+
// Arrange
57+
var items = new[] { 1, 2, 3, 4, 5 };
58+
var currentParallelism = 0;
59+
var maxObservedParallelism = 0;
60+
61+
// Act
62+
await Parallel.ForEachAsync(
63+
items,
64+
new ParallelOptions { MaxDegreeOfParallelism = 2 },
65+
async (_, cancellationToken) =>
66+
{
67+
Interlocked.Increment(ref currentParallelism);
68+
69+
int initialValue,
70+
newValue;
71+
do
72+
{
73+
initialValue = maxObservedParallelism;
74+
newValue = Math.Max(initialValue, currentParallelism);
75+
} while (
76+
Interlocked.CompareExchange(ref maxObservedParallelism, newValue, initialValue)
77+
!= initialValue
78+
);
79+
80+
await Task.Delay(50, cancellationToken);
81+
Interlocked.Decrement(ref currentParallelism);
82+
}
83+
);
84+
85+
// Assert
86+
maxObservedParallelism.Should().BeLessThanOrEqualTo(2);
87+
}
88+
89+
[Fact]
90+
public async Task ForEachAsync_AsyncEnumerable_Test()
91+
{
92+
// Arrange
93+
async IAsyncEnumerable<int> GetItemsAsync()
94+
{
95+
for (var i = 1; i <= 5; i++)
96+
{
97+
await Task.Delay(10);
98+
yield return i;
99+
}
100+
}
101+
102+
var results = new ConcurrentBag<int>();
103+
104+
// Act
105+
await Parallel.ForEachAsync(
106+
GetItemsAsync(),
107+
async (item, cancellationToken) =>
108+
{
109+
await Task.Delay(10, cancellationToken);
110+
results.Add(item);
111+
}
112+
);
113+
114+
// Assert
115+
results.Should().BeEquivalentTo([1, 2, 3, 4, 5]);
116+
}
117+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using FluentAssertions;
5+
using Xunit;
6+
7+
namespace PolyShim.Tests.Net80;
8+
9+
public class ParallelTests
10+
{
11+
[Fact]
12+
public async Task ForAsync_Test()
13+
{
14+
// Act
15+
var sum = 0;
16+
await Parallel.ForAsync(
17+
1,
18+
6,
19+
async (i, cancellationToken) =>
20+
{
21+
await Task.Delay(10, cancellationToken);
22+
Interlocked.Add(ref sum, i);
23+
}
24+
);
25+
26+
// Assert
27+
sum.Should().Be(15);
28+
}
29+
30+
[Fact]
31+
public async Task ForAsync_Cancellation_Test()
32+
{
33+
// Arrange
34+
var cancellationToken = new CancellationToken(true);
35+
36+
// Act & assert
37+
var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
38+
await Parallel.ForAsync(
39+
1,
40+
6,
41+
cancellationToken,
42+
async (_, innerCancellationToken) =>
43+
{
44+
await Task.Delay(10, innerCancellationToken);
45+
}
46+
)
47+
);
48+
49+
ex.CancellationToken.Should().Be(cancellationToken);
50+
}
51+
52+
[Fact]
53+
public async Task ForAsync_MaxDegreeOfParallelism_Test()
54+
{
55+
// Arrange
56+
var currentParallelism = 0;
57+
var maxObservedParallelism = 0;
58+
59+
// Act
60+
await Parallel.ForAsync(
61+
1,
62+
21,
63+
new ParallelOptions { MaxDegreeOfParallelism = 4 },
64+
async (_, cancellationToken) =>
65+
{
66+
var parallelism = Interlocked.Increment(ref currentParallelism);
67+
try
68+
{
69+
maxObservedParallelism = Math.Max(maxObservedParallelism, parallelism);
70+
await Task.Delay(50, cancellationToken);
71+
}
72+
finally
73+
{
74+
Interlocked.Decrement(ref currentParallelism);
75+
}
76+
}
77+
);
78+
79+
// Assert
80+
maxObservedParallelism.Should().BeLessThanOrEqualTo(4);
81+
}
82+
}

PolyShim/Net60/Parallel.cs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
#if FEATURE_TASK
2+
#if (NETCOREAPP && !NET6_0_OR_GREATER) || (NETFRAMEWORK) || (NETSTANDARD)
3+
#nullable enable
4+
// ReSharper disable RedundantUsingDirective
5+
// ReSharper disable CheckNamespace
6+
// ReSharper disable InconsistentNaming
7+
// ReSharper disable PartialTypeWithSinglePart
8+
9+
using System;
10+
using System.Collections.Generic;
11+
using System.Linq;
12+
using System.Threading;
13+
using System.Threading.Tasks;
14+
15+
internal static partial class PolyfillExtensions
16+
{
17+
extension(Parallel)
18+
{
19+
// Task instead of ValueTask for maximum compatibility
20+
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-ienumerable((-0))-system-threading-tasks-paralleloptions-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
21+
public static async Task ForEachAsync<T>(
22+
IEnumerable<T> source,
23+
ParallelOptions parallelOptions,
24+
Func<T, CancellationToken, Task> body
25+
)
26+
{
27+
using var semaphore = new SemaphoreSlim(
28+
parallelOptions.MaxDegreeOfParallelism switch
29+
{
30+
> 0 => parallelOptions.MaxDegreeOfParallelism,
31+
_ => Environment.ProcessorCount,
32+
}
33+
);
34+
35+
var tasks = new List<Task>();
36+
foreach (var item in source)
37+
{
38+
tasks.Add(
39+
Task.Factory.StartNew(
40+
async () =>
41+
{
42+
#if !NETFRAMEWORK || NET45_OR_GREATER
43+
await semaphore
44+
.WaitAsync(parallelOptions.CancellationToken)
45+
.ConfigureAwait(false);
46+
#else
47+
semaphore.Wait(parallelOptions.CancellationToken);
48+
#endif
49+
try
50+
{
51+
await body(item, parallelOptions.CancellationToken)
52+
.ConfigureAwait(false);
53+
}
54+
finally
55+
{
56+
semaphore.Release();
57+
}
58+
},
59+
parallelOptions.CancellationToken,
60+
TaskCreationOptions.None,
61+
parallelOptions.TaskScheduler ?? TaskScheduler.Default
62+
)
63+
.Unwrap()
64+
);
65+
}
66+
67+
await Task.WhenAll(tasks).ConfigureAwait(false);
68+
}
69+
70+
// Task instead of ValueTask for maximum compatibility
71+
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-ienumerable((-0))-system-threading-cancellationtoken-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
72+
public static async Task ForEachAsync<T>(
73+
IEnumerable<T> source,
74+
CancellationToken cancellationToken,
75+
Func<T, CancellationToken, Task> body
76+
) =>
77+
await ForEachAsync(
78+
source,
79+
new ParallelOptions { CancellationToken = cancellationToken },
80+
body
81+
)
82+
.ConfigureAwait(false);
83+
84+
// Task instead of ValueTask for maximum compatibility
85+
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-ienumerable((-0))-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
86+
public static async Task ForEachAsync<T>(
87+
IEnumerable<T> source,
88+
Func<T, CancellationToken, Task> body
89+
) => await ForEachAsync(source, CancellationToken.None, body).ConfigureAwait(false);
90+
91+
#if FEATURE_ASYNCINTERFACES
92+
// Task instead of ValueTask for maximum compatibility
93+
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-iasyncenumerable((-0))-system-threading-tasks-paralleloptions-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
94+
public static async Task ForEachAsync<T>(
95+
IAsyncEnumerable<T> source,
96+
ParallelOptions parallelOptions,
97+
Func<T, CancellationToken, Task> body
98+
)
99+
{
100+
using var semaphore = new SemaphoreSlim(
101+
parallelOptions.MaxDegreeOfParallelism switch
102+
{
103+
> 0 => parallelOptions.MaxDegreeOfParallelism,
104+
_ => Environment.ProcessorCount,
105+
}
106+
);
107+
108+
var tasks = new List<Task>();
109+
110+
await foreach (var item in source.WithCancellation(parallelOptions.CancellationToken))
111+
{
112+
tasks.Add(Task.Factory.StartNew(
113+
async () =>
114+
{
115+
await semaphore.WaitAsync(parallelOptions.CancellationToken).ConfigureAwait(false);
116+
117+
try
118+
{
119+
await body(item, parallelOptions.CancellationToken).ConfigureAwait(false);
120+
}
121+
finally
122+
{
123+
semaphore.Release();
124+
}
125+
},
126+
parallelOptions.CancellationToken,
127+
TaskCreationOptions.None,
128+
parallelOptions.TaskScheduler ?? TaskScheduler.Default
129+
).Unwrap());
130+
}
131+
132+
await Task.WhenAll(tasks).ConfigureAwait(false);
133+
}
134+
135+
// Task instead of ValueTask for maximum compatibility
136+
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-iasyncenumerable((-0))-system-threading-cancellationtoken-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
137+
public static async Task ForEachAsync<T>(
138+
IAsyncEnumerable<T> source,
139+
CancellationToken cancellationToken,
140+
Func<T, CancellationToken, Task> body
141+
) =>
142+
await ForEachAsync(
143+
source,
144+
new ParallelOptions { CancellationToken = cancellationToken },
145+
body
146+
).ConfigureAwait(false);
147+
148+
// Task instead of ValueTask for maximum compatibility
149+
// https://learn.microsoft.com/dotnet/api/system.threading.tasks.parallel.foreachasync#system-threading-tasks-parallel-foreachasync-1(system-collections-generic-iasyncenumerable((-0))-system-func((-0-system-threading-cancellationtoken-system-threading-tasks-valuetask)))
150+
public static async Task ForEachAsync<T>(
151+
IAsyncEnumerable<T> source,
152+
Func<T, CancellationToken, Task> body
153+
) => await ForEachAsync(source, CancellationToken.None, body).ConfigureAwait(false);
154+
#endif
155+
}
156+
}
157+
#endif
158+
#endif

0 commit comments

Comments
 (0)