Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public ConcurrentBag(System.Collections.Generic.IEnumerable<T> collection) { }
bool System.Collections.ICollection.IsSynchronized { get { throw null; } }
object System.Collections.ICollection.SyncRoot { get { throw null; } }
public void Add(T item) { }
public void Clear() { }
public void CopyTo(T[] array, int index) { }
public System.Collections.Generic.IEnumerator<T> GetEnumerator() { throw null; }
bool System.Collections.Concurrent.IProducerConsumerCollection<T>.TryAdd(T item) { throw null; }
Expand Down Expand Up @@ -132,6 +133,7 @@ public ConcurrentQueue(System.Collections.Generic.IEnumerable<T> collection) { }
public bool IsEmpty { get { throw null; } }
bool System.Collections.ICollection.IsSynchronized { get { throw null; } }
object System.Collections.ICollection.SyncRoot { get { throw null; } }
public void Clear() { }
public void CopyTo(T[] array, int index) { }
public void Enqueue(T item) { }
public System.Collections.Generic.IEnumerator<T> GetEnumerator() { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,50 @@ public T[] ToArray()
return Array.Empty<T>();
}

/// <summary>
/// Removes all values from the <see cref="ConcurrentBag{T}"/>.
/// </summary>
public void Clear()
{
// If there are no queues in the bag, there's nothing to clear.
if (_workStealingQueues == null)
{
return;
}

// Clear the local queue.
WorkStealingQueue local = GetCurrentThreadWorkStealingQueue(forceCreate: false);
if (local != null)
{
local.LocalClear();
if (local._nextQueue == null && local == _workStealingQueues)
{
// If it's the only queue, nothing more to do.
return;
}
}

// Clear the other queues by stealing all remaining items. We freeze the bag to
// avoid having to contend with too many new items being added while we're trying
// to drain the bag. But we can't just freeze the bag and attempt to remove all
// items from every other queue, as even with freezing the bag it's dangerous to
// manipulate other queues' tail pointers and add/take counts.
bool lockTaken = false;
try
{
FreezeBag(ref lockTaken);
for (WorkStealingQueue queue = _workStealingQueues; queue != null; queue = queue._nextQueue)
{
T ignored;
while (queue.TrySteal(out ignored, take: true));
}
}
finally
{
UnfreezeBag(lockTaken);
}
}

/// <summary>
/// Returns an enumerator that iterates through the <see
/// cref="ConcurrentBag{T}"/>.
Expand Down Expand Up @@ -780,6 +824,22 @@ internal void LocalPush(T item)
}
}

/// <summary>Clears the contents of the local queue.</summary>
internal void LocalClear()
{
Debug.Assert(Environment.CurrentManagedThreadId == _ownerThreadId);
lock (this) // synchronize with steals
{
// If the queue isn't empty, reset the state to clear out all items.
if (_headIndex < _tailIndex)
{
_headIndex = _tailIndex = StartIndex;
_addTakeCount = _stealCount = 0;
Array.Clear(_array, 0, _array.Length);
}
}
}

/// <summary>Remove an item from the tail of the queue.</summary>
/// <param name="result">The removed item</param>
internal bool TryLocalPop(out T result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,26 @@ private bool TryPeek(out T result, bool resultUsed)
return false;
}

/// <summary>
/// Removes all objects from the <see cref="ConcurrentQueue{T}"/>.
/// </summary>
public void Clear()
{
lock (_crossSegmentLock)
{
// Simply substitute a new segment for the existing head/tail,
// as is done in the constructor. Operations currently in flight
// may still read from or write to an existing segment that's
// getting dropped, meaning that in flight operations may not be
// linear with regards to this clear operation. To help mitigate
// in-flight operations enqueuing onto the tail that's about to
// be dropped, we first freeze it; that'll force enqueuers to take
// this lock to synchronize and see the new tail.
_tail.EnsureFrozenForEnqueues();
_tail = _head = new Segment(InitialSegmentLength);
}
}

/// <summary>
/// Provides a multi-producer, multi-consumer thread-safe bounded segment. When the queue is full,
/// enqueues fail and return false. When the queue is empty, dequeues fail and return null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace System.Collections.Concurrent.Tests
{
public class ConcurrentBagTests : ProducerConsumerCollectionTests
public partial class ConcurrentBagTests : ProducerConsumerCollectionTests
{
protected override IProducerConsumerCollection<T> CreateProducerConsumerCollection<T>() => new ConcurrentBag<T>();
protected override IProducerConsumerCollection<int> CreateProducerConsumerCollection(IEnumerable<int> collection) => new ConcurrentBag<int>(collection);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace System.Collections.Concurrent.Tests
{
public partial class ConcurrentBagTests
{
[Theory]
[InlineData(false, 0)]
[InlineData(false, 1)]
[InlineData(false, 20)]
[InlineData(true, 0)]
[InlineData(true, 1)]
[InlineData(true, 20)]
public static void Clear_AddItemsToThisAndOtherThreads_EmptyAfterClear(bool addToLocalThread, int otherThreads)
{
var bag = new ConcurrentBag<int>();

const int ItemsPerThread = 100;

for (int repeat = 0; repeat < 2; repeat++)
{
// If desired, add items on other threads
if (addToLocalThread)
{
for (int i = 0; i < ItemsPerThread; i++) bag.Add(i);
}

// If desired, add items on other threads
int origThreadId = Environment.CurrentManagedThreadId;
Task.WaitAll((from _ in Enumerable.Range(0, otherThreads)
select Task.Factory.StartNew(() =>
{
Assert.NotEqual(origThreadId, Environment.CurrentManagedThreadId);
for (int i = 0; i < ItemsPerThread; i++) bag.Add(i);
}, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());

// Make sure we got the expected number of items, then clear, and make sure it's empty
Assert.Equal((ItemsPerThread * otherThreads) + (addToLocalThread ? ItemsPerThread : 0), bag.Count);
bag.Clear();
Assert.Equal(0, bag.Count);
}
}

[Fact]
public static void Clear_DuringEnumeration_DoesntAffectEnumeration()
{
const int ExpectedCount = 100;
var bag = new ConcurrentBag<int>(Enumerable.Range(0, ExpectedCount));
using (IEnumerator<int> e = bag.GetEnumerator())
{
bag.Clear();
int count = 0;
while (e.MoveNext()) count++;
Assert.Equal(ExpectedCount, count);
}
}

[Theory]
[InlineData(1, 10)]
[InlineData(3, 100)]
[InlineData(8, 1000)]
public static void Clear_ConcurrentUsage_NoExceptions(int threadsCount, int itemsPerThread)
{
var bag = new ConcurrentBag<int>();
Task.WaitAll((from i in Enumerable.Range(0, threadsCount) select Task.Run(() =>
{
var random = new Random();
for (int j = 0; j < itemsPerThread; j++)
{
int item;
switch (random.Next(5))
{
case 0: bag.Add(j); break;
case 1: bag.TryPeek(out item); break;
case 2: bag.TryTake(out item); break;
case 3: bag.Clear(); break;
case 4: bag.ToArray(); break;
}
}
})).ToArray());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace System.Collections.Concurrent.Tests
{
public class ConcurrentQueueTests : ProducerConsumerCollectionTests
public partial class ConcurrentQueueTests : ProducerConsumerCollectionTests
{
protected override IProducerConsumerCollection<T> CreateProducerConsumerCollection<T>() => new ConcurrentQueue<T>();
protected override IProducerConsumerCollection<int> CreateProducerConsumerCollection(IEnumerable<int> collection) => new ConcurrentQueue<int>(collection);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace System.Collections.Concurrent.Tests
{
public partial class ConcurrentQueueTests
{
[Theory]
[InlineData(0)]
[InlineData(1)]
[InlineData(1000)]
public void Clear_CountMatch(int count)
{
var q = new ConcurrentQueue<int>(Enumerable.Range(1, count));
Assert.Equal(count, q.Count);

// Clear initial items
q.Clear();
Assert.Equal(0, q.Count);
Assert.True(q.IsEmpty);
Assert.Equal(Enumerable.Empty<int>(), q);

// Clear again has no effect
q.Clear();
Assert.True(q.IsEmpty);

// Add more items then clear and verify
for (int i = 0; i < count; i++)
{
q.Enqueue(i);
}
Assert.Equal(Enumerable.Range(0, count), q);
q.Clear();
Assert.Equal(0, q.Count);
Assert.True(q.IsEmpty);
Assert.Equal(Enumerable.Empty<int>(), q);

// Add items and clear after each item
for (int i = 0; i < count; i++)
{
q.Enqueue(i);
Assert.Equal(1, q.Count);
q.Clear();
Assert.Equal(0, q.Count);
}
}

[Fact]
public static void Clear_DuringEnumeration_DoesntAffectEnumeration()
{
const int ExpectedCount = 100;
var q = new ConcurrentQueue<int>(Enumerable.Range(0, ExpectedCount));
using (IEnumerator<int> beforeClear = q.GetEnumerator())
{
q.Clear();
using (IEnumerator<int> afterClear = q.GetEnumerator())
{
int count = 0;
while (beforeClear.MoveNext()) count++;
Assert.Equal(ExpectedCount, count);

count = 0;
while (afterClear.MoveNext()) count++;
Assert.Equal(0, count);
}
}
}

[Theory]
[InlineData(1, 10)]
[InlineData(3, 100)]
[InlineData(8, 1000)]
public void Concurrent_Clear_NoExceptions(int threadsCount, int itemsPerThread)
{
var q = new ConcurrentQueue<int>();
Task.WaitAll((from i in Enumerable.Range(0, threadsCount) select Task.Run(() =>
{
var random = new Random();
for (int j = 0; j < itemsPerThread; j++)
{
switch (random.Next(7))
{
case 0:
int c = q.Count;
break;
case 1:
bool e = q.IsEmpty;
break;
case 2:
q.Enqueue(random.Next(int.MaxValue));
break;
case 3:
q.ToArray();
break;
case 4:
int d;
q.TryDequeue(out d);
break;
case 5:
int p;
q.TryPeek(out p);
break;
case 6:
q.Clear();
break;
}
}
})).ToArray());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<BuildConfigurations>
netstandard1.3;
netstandard;
netcoreapp;
</BuildConfigurations>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.props))\dir.props" />
<PropertyGroup>
Expand Down Expand Up @@ -86,12 +86,16 @@
<ItemGroup Condition="'$(TargetGroup)' != 'netcoreapp'">
<Compile Include="ConcurrentDictionary\ConcurrentDictionaryExtensions.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)'!='netstandard'">
<ItemGroup Condition="'$(TargetGroup)' == 'netcoreapp'">
<Compile Include="ConcurrentBagTests.netcoreapp.cs" />
<Compile Include="ConcurrentQueueTests.netcoreapp.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)'!='netstandard' And '$(TargetGroup)'!='netcoreapp'">
<Compile Include="$(CommonPath)\System\SerializableAttribute.cs">
<Link>Common\System\SerializableAttribute.cs</Link>
</Compile>
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)'=='netstandard'">
<ItemGroup Condition="'$(TargetGroup)'=='netstandard' Or '$(TargetGroup)'=='netcoreapp'">
<Compile Include="$(CommonTestPath)\System\Collections\IEnumerable.NonGeneric.Serialization.Tests.cs">
<Link>Common\System\Collections\IEnumerable.NonGeneric.Serialization.Tests.cs</Link>
</Compile>
Expand Down