c++11 2 线程无锁队列

问题描述 投票:0回答:2

除了主线程之外,我还有一个线程接收数据并将其写入文件中。

std::queue<std::vector<int>> dataQueue;
std::mutex mutex;

void setData(const std::vector<int>& data) {
    std::lock_guard<std::mutex> lock(mutex);
    dataQueue.push(data);
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store) {
        std::lock_guard<std::mutex> lock(mutex);

        while (!dataQueue.empty()) {
            std::vector<int>& data= dataQueue.front();

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

setData
为主线程使用,
write
实际上是写入线程。我使用
std::lock_quard
来避免内存冲突,但是当锁定写入线程时,它会减慢主线程的速度,因为它必须等待队列解锁。但我想我可以避免这种情况,因为线程永远不会同时作用于队列的同一元素。

所以我想做到无锁,但我真的不明白应该如何实现它。我的意思是,我怎样才能在不锁定任何东西的情况下做到这一点?此外,如果写入线程比主线程快,则队列可能大部分时间为空,因此它应该以某种方式等待新数据,而不是无限循环以检查非空队列。

编辑:我将简单的

std::lock_guard
更改为
std::cond_variable
,以便它可以在队列为空时等待。但是主线程仍然可以被阻塞,当
cvQeue.wait(.)
被解析时,它会重新获取锁。此外,如果主线程执行了
cvQueue.notify_one()
但写入线程没有等待怎么办?

std::queue<std::vector<int>> dataQueue;
std::mutex mutex;
std::condition_variable cvQueue;

void setData(const std::vector<int>& data) {
    std::unique_lock<std::mutex> lock(mutex);
    dataQueue.push(data);
    cvQueue.notify_one();
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store) {
        std::lock_guard<std::mutex> lock(mutex);

        while (!dataQueue.empty()) {
            std::unique_lock<std::mutex> lock(mutex);
            cvQueue.wait(lock);

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}
multithreading c++11 locking lock-free
2个回答
2
投票

如果只有两个线程,则可以使用无锁单生产者单消费者 (SPSC) 队列。
可以在这里找到有界版本:https://github.com/rigtorp/SPSCQueue
Dmitry Vyukov 在这里提出了一个无界版本:http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue(不过您应该注意,该代码应该适合使用原子) .)

关于阻塞弹出操作 - 这是无锁数据结构所不提供的,因为这样的操作显然不是无锁的。然而,以这样的方式调整链接的实现应该是相对直接的,如果队列在推送之前为空,则推送操作会通知条件变量。


-1
投票

我想我有一些东西可以满足我的需求。我做了一个使用

LockFreeQueue
std::atomic
。因此,我可以自动管理队列头/尾的状态。

template<typename T>
class LockFreeQueue {
public:
    void push(const T& newElement) {
        fifo.push(newElement);
        tail.fetch_add(1);
        cvQueue.notify_one();
    }

    void pop() {
        size_t oldTail = tail.load();
        size_t oldHead = head.load();

        if (oldTail == oldHead) {
            return;
        }

        fifo.pop();
        head.store(++oldHead);
    }

    bool isEmpty() {
        return head.load() == tail.load();
    }

    T& getFront() {
        return fifo.front();
    }

    void waitForNewElements() {
        if (tail.load() == head.load()) {
            std::mutex m;
            std::unique_lock<std::mutex> lock(m);
            cvQueue.wait_for(lock, std::chrono::milliseconds(TIMEOUT_VALUE));
        }
    }

private:
    std::queue<T> fifo;
    std::atomic<size_t> head = { 0 };
    std::atomic<size_t> tail = { 0 };
    std::condition_variable cvQueue;
};

LockFreeQueue<std::vector<int>> dataQueue;
std::atomic<bool> store(true);

void setData(const std::vector<int>& data) {
    dataQueue.push(data);
    // do other things
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store.load()) {

        dataQueue.waitForNewElements();

        while (!dataQueue.isEmpty()) {
            std::vector<int>& data= dataQueue.getFront();

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

我仍然有一把锁

waitForNewElements
,但它并没有锁定整个过程,因为它正在等待事情做。但最大的改进是,生产者可以推动,而消费者可以流行。仅当
LockFreQueue::tail
LockFreeQueue::head
相同时才禁止。意思是队列为空,进入等待状态。

我不太满意的是

cvQueue.wait_for(lock, TIMEOUT_VALUE)
。我想做一个简单的
cvQueue.wait(lock)
,但问题是当结束线程时,我在主线程中做了
store.store(false)
。因此,如果写入线程正在等待,那么它永远不会在没有超时的情况下结束。因此,我设置了一个足够大的超时,以便大多数时候
condition_variable
由锁解决,而当线程结束时,由超时解决。

如果您觉得有什么地方不对或者需要改进,请随时发表评论。

© www.soinside.com 2019 - 2024. All rights reserved.