并发收集支持随机(先进先出)和特定Remove

问题描述 投票:0回答:1

我正在编写一个应用程序,该应用程序管理一个集合,该集合需要在多线程环境中频繁地将项目入队和出队。对于单线程,一个简单的列表可能就足够了,但是环境的并发性质会带来一些问题。

总结如下:

该结构需要有一个 bool TryAdd(T) 方法,最好是 Add(TKey, TValue);

该结构需要有一个 T TryRemove() 方法,该方法采用随机的或最好是第一个添加的项目(本质上实现了一个 FIFO 队列);

结构体需要有一个bool TryRemove(T)方法,最好是Remove(TKey);

到目前为止,我有三个想法,都有各自的问题:

  1. 实现一个包含 ConcurrentDictionary 和 ConcurrentQueue 的类,如下所示:
     internal class ConcurrentQueuedDictionary<TKey, TValue> where TKey : notnull
     {
        ConcurrentDictionary<TKey, TValue> _dictionary;
        ConcurrentQueue<TKey> _queue;
        object _locker;

        public bool TryAdd(TKey key, TValue value)
        {
            if (!_dictionary.TryAdd(key, value))
                return false;
            lock (_locker)
                _queue.Enqueue(key);
            return true;
        }

        public TValue TryRemove()
        {
            TKey key;
            lock (_locker) {
                if (_queue.IsEmpty)
                    return default(TValue);
                _queue.TryDequeue(out key);
            }
            TValue value;
            if (!_dictionary.Remove(key, out value))
                throw new Exception();
            return value;
        }

        public bool TryRemove(TKey key)
        {
            lock (_locker)
            {
                var copiedList = _queue.ToList();
                if (copiedList.Remove(key))
                    return false;
                _queue = new(copiedList);
            }
            return _dictionary.TryRemove(key, out _);
        }
    }

但这将需要在Remove(T)上加锁,因为它需要初始队列的完整深拷贝,而不需要删除的项目,同时不允许从其他线程读取,这意味着至少Remove()也会有这个锁,并且这个意味着经常执行的操作;

  1. 实现一个包含 ConcurrentDictionary 和 ConcurrentDictionary 的类,其中 order 在 TryAdd 上定义,具有两个属性 _addOrder 和 _removeOrder,如下所示:
       internal class ConcurrentQueuedDictionary<TKey, TValue> where TKey : notnull
       {
            ConcurrentDictionary<TKey, TValue> _dictionary;
            ConcurrentDictionary<int, TKey> _order;
            int _addOrder = 0;
            int _removeOrder = 0;
    
            public bool TryAdd(TKey key, TValue value)
            {
                if (!_dictionary.TryAdd(key, value))
                    return false;
                if (!_order.TryAdd(unchecked(Interlocked.Increment(ref _addOrder)), key))
                    throw new Exception(); //Operation faulted, mismatch of data in _order
                return true;
            }
    
            public TValue TryRemove()
            {
                TKey key;
                if (!(_order.Count > 0 && _order.Remove(unchecked(Interlocked.Increment(ref _removeOrder)), out key)))
                    return default(TValue);
                return _dictionary[key];
            }
    
            public bool TryRemove(TKey key)
            {
                if (!_order.Remove(_order.Where(item => item.Value.Equals(key)).First().Key, out _))
                    return false;
                if (!_dictionary.Remove(key, out _))
                    throw new Exception();
                return true;
            }
       }

但我很确定只是说出这个实现就已经把我列入了某个地方的精神病观察名单,因为正常工作将是一场受虐狂的噩梦;

  1. 直接锁定列表,因为无论如何,选项 1 都需要锁。

有什么想法吗?我对这个问题有点困惑,因为我对并发集合没有最好的掌握。我是否需要自定义 IProducerConsumerCollection?是否可以对并发集合元素进行随机(或排队)和特定访问?你们以前遇到过这个问题吗,也许我看问题的方式不对?

编辑:拼写错误、格式

c# multithreading collections concurrency
1个回答
2
投票

通过组合内置的并发集合来创建这样的并发结构几乎是不可能的,当然前提是正确性是最重要的并且严格禁止竞争条件。好消息是,只要受保护区域内的操作是轻量级的(其持续时间以纳秒为单位),每秒几千次获取

lock
远未达到争用开始成为问题的极限。

实现 O(1) 操作复杂度的一种方法是组合

LinkedList<T>
Dictionary<K,V>
:

/// <summary>
/// Represents a thread-safe first-in-first-out (FIFO) collection of key/value pairs,
/// where the key is unique.
/// </summary>
public class ConcurrentKeyedQueue<TKey, TValue>
{
    private readonly LinkedList<KeyValuePair<TKey, TValue>> _queue;
    private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>>
        _dictionary;

    public ConcurrentKeyedQueue(IEqualityComparer<TKey> comparer = default)
    {
        _queue = new();
        _dictionary = new(comparer);
    }

    public int Count { get { lock (_queue) return _queue.Count; } }

    public bool TryEnqueue(TKey key, TValue value)
    {
        lock (_queue)
        {
            ref var node = ref CollectionsMarshal
                .GetValueRefOrAddDefault(_dictionary, key, out bool exists);
            if (exists) return false;
            node = new(new(key, value));
            _queue.AddLast(node);
            Debug.Assert(_queue.Count == _dictionary.Count);
            return true;
        }
    }

    public bool TryDequeue(out TKey key, out TValue value)
    {
        lock (_queue)
        {
            if (_queue.Count == 0) { key = default; value = default; return false; }
            var node = _queue.First;
            (key, value) = node.Value;
            _queue.RemoveFirst();
            bool removed = _dictionary.Remove(key);
            Debug.Assert(removed);
            Debug.Assert(_queue.Count == _dictionary.Count);
            return true;
        }
    }

    public bool TryTake(TKey key, out TValue value)
    {
        lock (_queue)
        {
            bool removed = _dictionary.Remove(key, out var node);
            if (!removed) { value = default; return false; }
            _queue.Remove(node);
            (_, value) = node.Value;
            Debug.Assert(_queue.Count == _dictionary.Count);
            return true;
        }
    }

    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        lock (_queue) return _queue.ToArray();
    }
}

此组合还用于创建 LRU 缓存

您可以使用

lock
 属性测量您自己的负载环境中的 
Monitor.LockContentionCount
争用: “获取尝试获取监视器锁定时发生争用的次数。” 如果您看到每秒的增量是一位数,没有什么可担心的。

对于不使用

CollectionsMarshal.GetValueRefOrAddDefault
方法的版本,因此它可以在早于 .NET 6 的 .NET 版本上使用,请参阅此答案的 first revision

© www.soinside.com 2019 - 2024. All rights reserved.