Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.

Commit cdd1916

Browse files
authored
Merge pull request #15292 from stephentoub/concurrent_clears
Add ConcurrentBag/Queue.Clear
2 parents 323c546 + 852a581 commit cdd1916

9 files changed

Lines changed: 301 additions & 5 deletions

src/System.Collections.Concurrent/ref/System.Collections.Concurrent.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public ConcurrentBag(System.Collections.Generic.IEnumerable<T> collection) { }
6464
bool System.Collections.ICollection.IsSynchronized { get { throw null; } }
6565
object System.Collections.ICollection.SyncRoot { get { throw null; } }
6666
public void Add(T item) { }
67+
public void Clear() { }
6768
public void CopyTo(T[] array, int index) { }
6869
public System.Collections.Generic.IEnumerator<T> GetEnumerator() { throw null; }
6970
bool System.Collections.Concurrent.IProducerConsumerCollection<T>.TryAdd(T item) { throw null; }
@@ -132,6 +133,7 @@ public ConcurrentQueue(System.Collections.Generic.IEnumerable<T> collection) { }
132133
public bool IsEmpty { get { throw null; } }
133134
bool System.Collections.ICollection.IsSynchronized { get { throw null; } }
134135
object System.Collections.ICollection.SyncRoot { get { throw null; } }
136+
public void Clear() { }
135137
public void CopyTo(T[] array, int index) { }
136138
public void Enqueue(T item) { }
137139
public System.Collections.Generic.IEnumerator<T> GetEnumerator() { throw null; }

src/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentBag.cs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,50 @@ public T[] ToArray()
407407
return Array.Empty<T>();
408408
}
409409

410+
/// <summary>
411+
/// Removes all values from the <see cref="ConcurrentBag{T}"/>.
412+
/// </summary>
413+
public void Clear()
414+
{
415+
// If there are no queues in the bag, there's nothing to clear.
416+
if (_workStealingQueues == null)
417+
{
418+
return;
419+
}
420+
421+
// Clear the local queue.
422+
WorkStealingQueue local = GetCurrentThreadWorkStealingQueue(forceCreate: false);
423+
if (local != null)
424+
{
425+
local.LocalClear();
426+
if (local._nextQueue == null && local == _workStealingQueues)
427+
{
428+
// If it's the only queue, nothing more to do.
429+
return;
430+
}
431+
}
432+
433+
// Clear the other queues by stealing all remaining items. We freeze the bag to
434+
// avoid having to contend with too many new items being added while we're trying
435+
// to drain the bag. But we can't just freeze the bag and attempt to remove all
436+
// items from every other queue, as even with freezing the bag it's dangerous to
437+
// manipulate other queues' tail pointers and add/take counts.
438+
bool lockTaken = false;
439+
try
440+
{
441+
FreezeBag(ref lockTaken);
442+
for (WorkStealingQueue queue = _workStealingQueues; queue != null; queue = queue._nextQueue)
443+
{
444+
T ignored;
445+
while (queue.TrySteal(out ignored, take: true));
446+
}
447+
}
448+
finally
449+
{
450+
UnfreezeBag(lockTaken);
451+
}
452+
}
453+
410454
/// <summary>
411455
/// Returns an enumerator that iterates through the <see
412456
/// cref="ConcurrentBag{T}"/>.
@@ -780,6 +824,22 @@ internal void LocalPush(T item)
780824
}
781825
}
782826

827+
/// <summary>Clears the contents of the local queue.</summary>
828+
internal void LocalClear()
829+
{
830+
Debug.Assert(Environment.CurrentManagedThreadId == _ownerThreadId);
831+
lock (this) // synchronize with steals
832+
{
833+
// If the queue isn't empty, reset the state to clear out all items.
834+
if (_headIndex < _tailIndex)
835+
{
836+
_headIndex = _tailIndex = StartIndex;
837+
_addTakeCount = _stealCount = 0;
838+
Array.Clear(_array, 0, _array.Length);
839+
}
840+
}
841+
}
842+
783843
/// <summary>Remove an item from the tail of the queue.</summary>
784844
/// <param name="result">The removed item</param>
785845
internal bool TryLocalPop(out T result)

src/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentQueue.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,26 @@ private bool TryPeek(out T result, bool resultUsed)
821821
return false;
822822
}
823823

824+
/// <summary>
825+
/// Removes all objects from the <see cref="ConcurrentQueue{T}"/>.
826+
/// </summary>
827+
public void Clear()
828+
{
829+
lock (_crossSegmentLock)
830+
{
831+
// Simply substitute a new segment for the existing head/tail,
832+
// as is done in the constructor. Operations currently in flight
833+
// may still read from or write to an existing segment that's
834+
// getting dropped, meaning that in flight operations may not be
835+
// linear with regards to this clear operation. To help mitigate
836+
// in-flight operations enqueuing onto the tail that's about to
837+
// be dropped, we first freeze it; that'll force enqueuers to take
838+
// this lock to synchronize and see the new tail.
839+
_tail.EnsureFrozenForEnqueues();
840+
_tail = _head = new Segment(InitialSegmentLength);
841+
}
842+
}
843+
824844
/// <summary>
825845
/// Provides a multi-producer, multi-consumer thread-safe bounded segment. When the queue is full,
826846
/// enqueues fail and return false. When the queue is empty, dequeues fail and return null.

src/System.Collections.Concurrent/tests/ConcurrentBagTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
namespace System.Collections.Concurrent.Tests
1212
{
13-
public class ConcurrentBagTests : ProducerConsumerCollectionTests
13+
public partial class ConcurrentBagTests : ProducerConsumerCollectionTests
1414
{
1515
protected override IProducerConsumerCollection<T> CreateProducerConsumerCollection<T>() => new ConcurrentBag<T>();
1616
protected override IProducerConsumerCollection<int> CreateProducerConsumerCollection(IEnumerable<int> collection) => new ConcurrentBag<int>(collection);
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Linq;
6+
using System.Collections.Generic;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Xunit;
10+
11+
namespace System.Collections.Concurrent.Tests
12+
{
13+
public partial class ConcurrentBagTests
14+
{
15+
[Theory]
16+
[InlineData(false, 0)]
17+
[InlineData(false, 1)]
18+
[InlineData(false, 20)]
19+
[InlineData(true, 0)]
20+
[InlineData(true, 1)]
21+
[InlineData(true, 20)]
22+
public static void Clear_AddItemsToThisAndOtherThreads_EmptyAfterClear(bool addToLocalThread, int otherThreads)
23+
{
24+
var bag = new ConcurrentBag<int>();
25+
26+
const int ItemsPerThread = 100;
27+
28+
for (int repeat = 0; repeat < 2; repeat++)
29+
{
30+
// If desired, add items on other threads
31+
if (addToLocalThread)
32+
{
33+
for (int i = 0; i < ItemsPerThread; i++) bag.Add(i);
34+
}
35+
36+
// If desired, add items on other threads
37+
int origThreadId = Environment.CurrentManagedThreadId;
38+
Task.WaitAll((from _ in Enumerable.Range(0, otherThreads)
39+
select Task.Factory.StartNew(() =>
40+
{
41+
Assert.NotEqual(origThreadId, Environment.CurrentManagedThreadId);
42+
for (int i = 0; i < ItemsPerThread; i++) bag.Add(i);
43+
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());
44+
45+
// Make sure we got the expected number of items, then clear, and make sure it's empty
46+
Assert.Equal((ItemsPerThread * otherThreads) + (addToLocalThread ? ItemsPerThread : 0), bag.Count);
47+
bag.Clear();
48+
Assert.Equal(0, bag.Count);
49+
}
50+
}
51+
52+
[Fact]
53+
public static void Clear_DuringEnumeration_DoesntAffectEnumeration()
54+
{
55+
const int ExpectedCount = 100;
56+
var bag = new ConcurrentBag<int>(Enumerable.Range(0, ExpectedCount));
57+
using (IEnumerator<int> e = bag.GetEnumerator())
58+
{
59+
bag.Clear();
60+
int count = 0;
61+
while (e.MoveNext()) count++;
62+
Assert.Equal(ExpectedCount, count);
63+
}
64+
}
65+
66+
[Theory]
67+
[InlineData(1, 10)]
68+
[InlineData(3, 100)]
69+
[InlineData(8, 1000)]
70+
public static void Clear_ConcurrentUsage_NoExceptions(int threadsCount, int itemsPerThread)
71+
{
72+
var bag = new ConcurrentBag<int>();
73+
Task.WaitAll((from i in Enumerable.Range(0, threadsCount) select Task.Run(() =>
74+
{
75+
var random = new Random();
76+
for (int j = 0; j < itemsPerThread; j++)
77+
{
78+
int item;
79+
switch (random.Next(5))
80+
{
81+
case 0: bag.Add(j); break;
82+
case 1: bag.TryPeek(out item); break;
83+
case 2: bag.TryTake(out item); break;
84+
case 3: bag.Clear(); break;
85+
case 4: bag.ToArray(); break;
86+
}
87+
}
88+
})).ToArray());
89+
}
90+
}
91+
}

src/System.Collections.Concurrent/tests/ConcurrentQueueTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
namespace System.Collections.Concurrent.Tests
1212
{
13-
public class ConcurrentQueueTests : ProducerConsumerCollectionTests
13+
public partial class ConcurrentQueueTests : ProducerConsumerCollectionTests
1414
{
1515
protected override IProducerConsumerCollection<T> CreateProducerConsumerCollection<T>() => new ConcurrentQueue<T>();
1616
protected override IProducerConsumerCollection<int> CreateProducerConsumerCollection(IEnumerable<int> collection) => new ConcurrentQueue<int>(collection);
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Xunit;
10+
11+
namespace System.Collections.Concurrent.Tests
12+
{
13+
public partial class ConcurrentQueueTests
14+
{
15+
[Theory]
16+
[InlineData(0)]
17+
[InlineData(1)]
18+
[InlineData(1000)]
19+
public void Clear_CountMatch(int count)
20+
{
21+
var q = new ConcurrentQueue<int>(Enumerable.Range(1, count));
22+
Assert.Equal(count, q.Count);
23+
24+
// Clear initial items
25+
q.Clear();
26+
Assert.Equal(0, q.Count);
27+
Assert.True(q.IsEmpty);
28+
Assert.Equal(Enumerable.Empty<int>(), q);
29+
30+
// Clear again has no effect
31+
q.Clear();
32+
Assert.True(q.IsEmpty);
33+
34+
// Add more items then clear and verify
35+
for (int i = 0; i < count; i++)
36+
{
37+
q.Enqueue(i);
38+
}
39+
Assert.Equal(Enumerable.Range(0, count), q);
40+
q.Clear();
41+
Assert.Equal(0, q.Count);
42+
Assert.True(q.IsEmpty);
43+
Assert.Equal(Enumerable.Empty<int>(), q);
44+
45+
// Add items and clear after each item
46+
for (int i = 0; i < count; i++)
47+
{
48+
q.Enqueue(i);
49+
Assert.Equal(1, q.Count);
50+
q.Clear();
51+
Assert.Equal(0, q.Count);
52+
}
53+
}
54+
55+
[Fact]
56+
public static void Clear_DuringEnumeration_DoesntAffectEnumeration()
57+
{
58+
const int ExpectedCount = 100;
59+
var q = new ConcurrentQueue<int>(Enumerable.Range(0, ExpectedCount));
60+
using (IEnumerator<int> beforeClear = q.GetEnumerator())
61+
{
62+
q.Clear();
63+
using (IEnumerator<int> afterClear = q.GetEnumerator())
64+
{
65+
int count = 0;
66+
while (beforeClear.MoveNext()) count++;
67+
Assert.Equal(ExpectedCount, count);
68+
69+
count = 0;
70+
while (afterClear.MoveNext()) count++;
71+
Assert.Equal(0, count);
72+
}
73+
}
74+
}
75+
76+
[Theory]
77+
[InlineData(1, 10)]
78+
[InlineData(3, 100)]
79+
[InlineData(8, 1000)]
80+
public void Concurrent_Clear_NoExceptions(int threadsCount, int itemsPerThread)
81+
{
82+
var q = new ConcurrentQueue<int>();
83+
Task.WaitAll((from i in Enumerable.Range(0, threadsCount) select Task.Run(() =>
84+
{
85+
var random = new Random();
86+
for (int j = 0; j < itemsPerThread; j++)
87+
{
88+
switch (random.Next(7))
89+
{
90+
case 0:
91+
int c = q.Count;
92+
break;
93+
case 1:
94+
bool e = q.IsEmpty;
95+
break;
96+
case 2:
97+
q.Enqueue(random.Next(int.MaxValue));
98+
break;
99+
case 3:
100+
q.ToArray();
101+
break;
102+
case 4:
103+
int d;
104+
q.TryDequeue(out d);
105+
break;
106+
case 5:
107+
int p;
108+
q.TryPeek(out p);
109+
break;
110+
case 6:
111+
q.Clear();
112+
break;
113+
}
114+
}
115+
})).ToArray());
116+
}
117+
}
118+
}

src/System.Collections.Concurrent/tests/Configurations.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
<BuildConfigurations>
55
netstandard1.3;
66
netstandard;
7+
netcoreapp;
78
</BuildConfigurations>
89
</PropertyGroup>
910
</Project>

src/System.Collections.Concurrent/tests/System.Collections.Concurrent.Tests.csproj

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<?xml version="1.0" encoding="utf-8"?>
1+
<?xml version="1.0" encoding="utf-8"?>
22
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
33
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.props))\dir.props" />
44
<PropertyGroup>
@@ -86,12 +86,16 @@
8686
<ItemGroup Condition="'$(TargetGroup)' != 'netcoreapp'">
8787
<Compile Include="ConcurrentDictionary\ConcurrentDictionaryExtensions.cs" />
8888
</ItemGroup>
89-
<ItemGroup Condition="'$(TargetGroup)'!='netstandard'">
89+
<ItemGroup Condition="'$(TargetGroup)' == 'netcoreapp'">
90+
<Compile Include="ConcurrentBagTests.netcoreapp.cs" />
91+
<Compile Include="ConcurrentQueueTests.netcoreapp.cs" />
92+
</ItemGroup>
93+
<ItemGroup Condition="'$(TargetGroup)'!='netstandard' And '$(TargetGroup)'!='netcoreapp'">
9094
<Compile Include="$(CommonPath)\System\SerializableAttribute.cs">
9195
<Link>Common\System\SerializableAttribute.cs</Link>
9296
</Compile>
9397
</ItemGroup>
94-
<ItemGroup Condition="'$(TargetGroup)'=='netstandard'">
98+
<ItemGroup Condition="'$(TargetGroup)'=='netstandard' Or '$(TargetGroup)'=='netcoreapp'">
9599
<Compile Include="$(CommonTestPath)\System\Collections\IEnumerable.NonGeneric.Serialization.Tests.cs">
96100
<Link>Common\System\Collections\IEnumerable.NonGeneric.Serialization.Tests.cs</Link>
97101
</Compile>

0 commit comments

Comments
 (0)