我正在寻找 C++ 中的无锁数据结构来替换以下内容:
pthread_mutex_lock(plock);
set.insert(element);
pthread_mutex_unlock(plock);
该集合应支持
.insert()
和 .size()
,复杂度最多为 O(logN),具有迭代器,并且应该能够使用自定义比较器保持其顺序。基本上与 Java 中的 ConcurrentSkipListSet
功能相同。理想情况下,它应该是平台无关的。
我正在查看CDS:http://libcds.sourceforge.net/doc/cds-api/modules.html但不确定哪种数据结构可以实现目标。该文档对于某些数据结构来说并没有真正的复杂性。
任何建议都会很棒,谢谢!
使用 C++11,编写自己的代码非常容易:
template <typename T, typename Compare = std::less<T>>
class concurrent_set
{
private:
set::set<T, Compare> set_;
std::mutex mutex_;
public:
typedef typename std::set<T, Compare>::iterator iterator;
// etc.
std::pair<iterator, bool>
insert(const T& val) {
std::unique_lock<std::mutex> lock(mutex_);
return set_.insert(val);
}
size_type size() const {
std::unique_lock<std::mutex> lock(mutex_);
return set_.size();
}
// same idea with other functions
};
没有 C++11,也有
boost::mutex
。
这里的实现不是无锁的,但具有足够的并行性来满足大多数实际需求。这个想法是,您不必锁定整个集合 - 您可以将项目分配到存储桶中,以便线程锁定不同的存储桶并且不太可能发生冲突:
#include <atomic>
#include <vector>
#include <unordered_set>
#include <immintrin.h>
#include <thread>
struct SpinLock {
std::atomic_flag *pSync_ = nullptr;
SpinLock() = default;
SpinLock(std::atomic_flag& sync) : pSync_(&sync) {
while(pSync_->test_and_set(std::memory_order_acq_rel)) {
__builtin_ia32_pause ();
}
}
~SpinLock() {
if(pSync_ != nullptr) {
pSync_->clear(std::memory_order_release);
}
}
};
struct SyncArray {
std::atomic_flag *pSyncs_ = nullptr;
SyncArray() = default;
explicit SyncArray(const size_t nItems) {
pSyncs_ = static_cast<std::atomic_flag*>(malloc(nItems * sizeof(std::atomic_flag)));
#pragma omp parallel for schedule(static, kRamPageBytes / sizeof(std::atomic_flag))
for (size_t i = 0; i < nItems; i++) {
new (pSyncs_ + i) std::atomic_flag ATOMIC_FLAG_INIT;
}
}
SyncArray& operator=(SyncArray&& src) {
if(this != &src) {
std::swap(pSyncs_, src.pSyncs_);
}
return *this;
}
~SyncArray() {
free(pSyncs_);
}
};
// It's unordered because it's too hard to maintain order with concurrency
template<typename TItem> struct ConcurrentSet
{
std::vector<std::unordered_set<TItem>> items_;
SyncArray saItems_;
static size_t GetConcurrency()
{
return ((std::thread::hardware_concurrency()+1) * 3 + 2);
}
ConcurrentSet() : items_(GetConcurrency()), saItems_(GetConcurrency()) { }
void Include(const TItem& item)
{
const SingVCI iBucket = std::hash<TItem>{}(item) % items_.size();
SpinLock lock(saItems_.pSyncs_[iBucket]);
items_[iBucket].insert(item);
}
void Exclude(const TItem& item)
{
const SingVCI iBucket = std::hash<TItem>{}(item) % items_.size();
SpinLock lock(saItems_.pSyncs_[iBucket]);
items_[iBucket].erase(item);
}
bool Contains(const TItem& item)
{
const SingVCI iBucket = std::hash<TItem>{}(item) % items_.size();
SpinLock lock(saItems_.pSyncs_[iBucket]);
return items_[iBucket].find(item) != items_[iBucket].end();
}
};