我的工作假设是,当与System.Collections.Concurrent集合(包括ConcurrentDictionary)一起使用时,LINQ是线程安全的。
(其他Overflow帖子似乎同意:link)
但是,对LINQ OrderBy扩展方法的实现的检查表明,对于实现ICollection的并发集合的子集(例如ConcurrentDictionary),它似乎不是线程安全的。
OrderedEnumerable GetEnumerator(source here)构造一个Buffer结构实例(source here),它试图将集合转换为ICollection(ConcurrentDictionary实现),然后执行一个collection.CopyTo,其数组初始化为集合的大小。
因此,如果在OrderBy操作期间ConcurrentDictionary(在这种情况下为具体的ICollection)的大小增加,在初始化数组和复制到它之间,此操作将抛出。
以下测试代码显示此异常:
(注意:我很欣赏在一个线程安全的集合上执行一个OrderBy,这个集合在你下面发生变化并没有那么有意义,但我不相信它应该抛出)
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Program
{
class Program
{
static void Main(string[] args)
{
try
{
int loop = 0;
while (true) //Run many loops until exception thrown
{
Console.WriteLine($"Loop: {++loop}");
_DoConcurrentDictionaryWork().Wait();
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
private static async Task _DoConcurrentDictionaryWork()
{
var concurrentDictionary = new ConcurrentDictionary<int, object>();
var keyGenerator = new Random();
var tokenSource = new CancellationTokenSource();
var orderByTaskLoop = Task.Run(() =>
{
var token = tokenSource.Token;
while (token.IsCancellationRequested == false)
{
//Keep ordering concurrent dictionary on a loop
var orderedPairs = concurrentDictionary.OrderBy(x => x.Key).ToArray(); //THROWS EXCEPTION HERE
//...do some more work with ordered snapshot...
}
});
var updateDictTaskLoop = Task.Run(() =>
{
var token = tokenSource.Token;
while (token.IsCancellationRequested == false)
{
//keep mutating dictionary on a loop
var key = keyGenerator.Next(0, 1000);
concurrentDictionary[key] = new object();
}
});
//Wait for 1 second
await Task.Delay(TimeSpan.FromSeconds(1));
//Cancel and dispose token
tokenSource.Cancel();
tokenSource.Dispose();
//Wait for orderBy and update loops to finish (now token cancelled)
await Task.WhenAll(orderByTaskLoop, updateDictTaskLoop);
}
}
}
OrderBy抛出异常会导致以下几个可能的结论之一:
1)我对LINQ作为并发集合的线程安全的假设是不正确的,并且只能在LINQ查询期间对集合执行LINQ(无论它们是并发还是非并发)是安全的
2)LINQ OrderBy的实现存在一个错误,并且实现尝试将源集合转换为ICollection并尝试执行集合副本是不正确的(并且它应该简单地执行到迭代IEnumerable的默认行为) )。
3)我误解了这里发生了什么......
非常感谢!
在任何地方都没有说OrderBy
(或其他LINQ方法)应该始终使用源GetEnumerator
的IEnumerable
,或者它应该在并发集合上是线程安全的。所有承诺的都是这种方法
根据键按升序对序列的元素进行排序。
ConcurrentDictionary
在某种全球意义上也不是线程安全的。对于在其上执行的其他操作,它是线程安全的。甚至更多,文档说
ConcurrentDictionary的所有公共成员和受保护成员都是线程安全的,可以从多个线程同时使用。但是,通过ConcurrentDictionary实现的其中一个接口访问的成员(包括扩展方法)不能保证是线程安全的,并且可能需要由调用者同步。
所以,你的理解是正确的(OrderBy
将看到你传递给它的IEnumerable
真的是ICollection
,然后将获得该集合的长度,分配该大小的缓冲区,然后将调用ICollection.CopyTo
,这当然不是任何类型的线程安全收藏),但它不是OrderBy
的一个错误,因为OrderBy
和ConcurrentDictionary
都没有承诺你所承担的。
如果你想在OrderBy
上以线程安全的方式执行ConcurrentDictionary
,你需要依赖于承诺线程安全的方法。例如:
// note: this is NOT IEnumerable.ToArray()
// but public ToArray() method of ConcurrentDictionary itself
// it is guaranteed to be thread safe with respect to other operations
// on this dictionary
var snapshot = concurrentDictionary.ToArray();
// we are working on snapshot so no one other thread can modify it
// of course at this point real contents of dictionary might not be
// the same as our snapshot
var sorted = snapshot.OrderBy(c => c.Key);
如果你不想分配额外的数组(使用ToArray
),你可以使用Select(c => c)
,它可以在这种情况下工作,但是我们再次处于没有实际意义的领域并依赖某些东西在没有得到承诺的情况下可以安全使用to(Select
也不会总是枚举你的集合。如果集合是数组或列表 - 它将快捷方式并使用索引器代替)。所以你可以像这样创建扩展方法:
public static class Extensions {
public static IEnumerable<T> ForceEnumerate<T>(this ICollection<T> collection) {
foreach (var item in collection)
yield return item;
}
}
如果你想要安全并且不想分配数组,请使用它:
concurrentDictionary.ForceEnumerate().OrderBy(c => c.Key).ToArray();
在这种情况下,我们强制枚举ConcurrentDictionary
(我们知道这是安全的文档),然后将其传递给OrderBy
,知道它不会对纯粹的IEnumerable
造成任何伤害。请注意,正如mjwills在评论中正确指出的那样,这与ToArray
不完全相同,因为ToArray
生成快照(锁定集合防止在构建数组时进行修改)并且Select
\ yield
不会获取任何锁定(因此可能会添加项目当枚举正在进行时。虽然我怀疑在执行相关描述的事情时很重要 - 在OrderBy
完成后的两种情况下 - 你都不知道你的有序结果是否反映了当前的收集状态。