C++ 的并发集?

问题描述 投票:0回答:2

我正在寻找 C++ 中的无锁数据结构来替换以下内容:

pthread_mutex_lock(plock);
set.insert(element);
pthread_mutex_unlock(plock);

该集合应支持

.insert()
.size()
,复杂度最多为 O(logN),具有迭代器,并且应该能够使用自定义比较器保持其顺序。基本上与 Java 中的
ConcurrentSkipListSet
功能相同。理想情况下,它应该是平台无关的。

我正在查看CDS:http://libcds.sourceforge.net/doc/cds-api/modules.html但不确定哪种数据结构可以实现目标。该文档对于某些数据结构来说并没有真正的复杂性。

任何建议都会很棒,谢谢!

c++ concurrency lock-free concurrentskiplistmap libcds
2个回答
1
投票

使用 C++11,编写自己的代码非常容易:

template <typename T, typename Compare = std::less<T>>
class concurrent_set
{
private:
    set::set<T, Compare> set_;
    std::mutex mutex_;

public:
    typedef typename std::set<T, Compare>::iterator iterator;
    // etc.

    std::pair<iterator, bool>
    insert(const T& val) {
        std::unique_lock<std::mutex> lock(mutex_);
        return set_.insert(val);
    }

    size_type size() const {
        std::unique_lock<std::mutex> lock(mutex_);
        return set_.size();
    }
    // same idea with other functions
};

没有 C++11,也有

boost::mutex


0
投票

这里的实现不是无锁的,但具有足够的并行性来满足大多数实际需求。这个想法是,您不必锁定整个集合 - 您可以将项目分配到存储桶中,以便线程锁定不同的存储桶并且不太可能发生冲突:

#include <atomic>
#include <vector>
#include <unordered_set>
#include <immintrin.h>
#include <thread>

struct SpinLock {
  std::atomic_flag *pSync_ = nullptr;

  SpinLock() = default;

  SpinLock(std::atomic_flag& sync) : pSync_(&sync) {
    while(pSync_->test_and_set(std::memory_order_acq_rel)) {
      __builtin_ia32_pause ();
    }
  }

  ~SpinLock() {
    if(pSync_ != nullptr) {
      pSync_->clear(std::memory_order_release);
    }
  }
};

struct SyncArray {
  std::atomic_flag *pSyncs_ = nullptr;

  SyncArray() = default;

  explicit SyncArray(const size_t nItems) {
    pSyncs_ = static_cast<std::atomic_flag*>(malloc(nItems * sizeof(std::atomic_flag)));
    #pragma omp parallel for schedule(static, kRamPageBytes / sizeof(std::atomic_flag))
    for (size_t i = 0; i < nItems; i++) {
      new (pSyncs_ + i) std::atomic_flag ATOMIC_FLAG_INIT;
    }
  }

  SyncArray& operator=(SyncArray&& src) {
    if(this != &src) {
      std::swap(pSyncs_, src.pSyncs_);
    }
    return *this;
  }

  ~SyncArray() {
    free(pSyncs_);
  }
};

// It's unordered because it's too hard to maintain order with concurrency
template<typename TItem> struct ConcurrentSet
{
    std::vector<std::unordered_set<TItem>> items_;
    SyncArray saItems_;

    static size_t GetConcurrency()
    {
        return ((std::thread::hardware_concurrency()+1) * 3 + 2);
    }

    ConcurrentSet() : items_(GetConcurrency()), saItems_(GetConcurrency()) { }

    void Include(const TItem& item)
    {
        const SingVCI iBucket = std::hash<TItem>{}(item) % items_.size();
        SpinLock lock(saItems_.pSyncs_[iBucket]);
        items_[iBucket].insert(item);
    }

    void Exclude(const TItem& item)
    {
        const SingVCI iBucket = std::hash<TItem>{}(item) % items_.size();
        SpinLock lock(saItems_.pSyncs_[iBucket]);
        items_[iBucket].erase(item);
    }

    bool Contains(const TItem& item)
    {
        const SingVCI iBucket = std::hash<TItem>{}(item) % items_.size();
        SpinLock lock(saItems_.pSyncs_[iBucket]);
        return items_[iBucket].find(item) != items_[iBucket].end();
    }
};
© www.soinside.com 2019 - 2024. All rights reserved.