我正在尝试编写一个有界无锁单生产者、单消费者队列,但是,插入队列的元素的顺序与从队列中删除元素的顺序不同。 main.cpp 中的测试代码在一个线程中将 100,000,000 个连续整数添加到队列中,并在另一个线程中弹出这些值,检查这些值是否是连续的。
目前,运行程序时,它会打印“Oh no!last value was: 219718 but this value is 220231”(程序运行之间的数值不同)。数字的范围似乎取决于
yield_frequency
中的 writer_thread
,较低的 yield_frequency
值会导致测试(经常但并非总是)在 100 左右的数字处失败。
(该代码是一个 wip,因此可能有一些代码与 C++ 最佳实践不匹配)
spsc_queue.hpp
#pragma once
#include <memory>
#include <utility>
#include <bit>
#include <atomic>
#include <optional>
template <typename T> class ChannelReader;
template <typename T> class ChannelWriter;
// instances of this class cannot be created directly, make_queue returns a pair of ChannelReader and ChannelWriter instances that wrap the queue to prevent reads (or writes) from multiple threads
template <typename T>
class SpscQueue {
friend ChannelReader<T>;
friend ChannelWriter<T>;
public:
static std::pair<ChannelReader<T>, ChannelWriter<T>> make_queue(size_t size) {
if (!std::__has_single_bit(size)) throw "";
std::shared_ptr<SpscQueue<T>> queue_ptr(new SpscQueue<T>(size));
return std::make_pair(ChannelReader<T>(queue_ptr), ChannelWriter<T>(queue_ptr));
}
~SpscQueue() {
delete[] data;
}
private:
SpscQueue (size_t _size): data(new T[_size]()), size(_size) {}
alignas(64) T* data;
const size_t size;
alignas(64) std::atomic<size_t> read_idx{0};
alignas(64) std::atomic<size_t> write_idx{0};
};
// wrapper class that allows ONE thread to read from a SPSC queue
template <typename T>
class ChannelReader {
friend SpscQueue<T>;
public:
// attempt to read from the queue, returning the value if successful, and std::nullopt otherwise
std::optional<T> try_get_next() {
size_t read_idx = queue->read_idx;
size_t write_idx = cached_write_idx;
if (write_idx <= read_idx) {
write_idx = queue->write_idx;
cached_write_idx = write_idx;
}
if (write_idx <= read_idx) {
return std::nullopt;
} else {
queue->read_idx++;
return std::move(queue->data[read_idx % queue->size]);
}
}
ChannelReader(const ChannelReader&) = delete;
ChannelReader& operator=(const ChannelReader&) = delete;
ChannelReader(ChannelReader&&) = default;
ChannelReader& operator=(ChannelReader&&) = default;
private:
ChannelReader(std::shared_ptr<SpscQueue<T>> ptr): queue(ptr) {}
std::shared_ptr<SpscQueue<T>> queue;
alignas(64) size_t cached_write_idx{0};
};
// wrapper class that allows ONE thread to write to a SPSC queue
template <typename T>
class ChannelWriter {
friend SpscQueue<T>;
public:
// attempt to write to the queue, returns true if the write was successful, and false otherwise
bool try_write_next(const T& obj) {
size_t read_idx = cached_read_idx;
size_t write_idx = queue->write_idx;
if (write_idx >= read_idx + queue->size) {
read_idx = queue->read_idx;
cached_read_idx = read_idx;
}
if (write_idx >= read_idx + queue->size) {
return false;
} else {
queue->data[write_idx % queue->size] = obj;
++queue->write_idx;
return true;
}
}
ChannelWriter(const ChannelWriter&) = delete;
ChannelWriter& operator=(const ChannelWriter&) = delete;
ChannelWriter(ChannelWriter&&) = default;
ChannelWriter& operator=(ChannelWriter&&) = default;
private:
ChannelWriter(std::shared_ptr<SpscQueue<T>> ptr): queue(ptr) {}
std::shared_ptr<SpscQueue<T>> queue;
alignas(64) size_t cached_read_idx{0};
};
主.cpp
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include "../include/spsc_queue.hpp"
namespace chrono = std::chrono;
int main() {
std::atomic<bool> latch{false};
auto [reader, writer] = SpscQueue<int>::make_queue(512);
std::thread reader_thread([_reader = std::move(reader), &latch]() mutable {
latch.wait(false);
std::cerr << "Starting reader...\n";
int last_val = -1;
while (last_val != 100'000'000 - 1) {
if (auto data = _reader.try_get_next()) {
if (*data != last_val + 1) {
std::cerr << "Oh no! last value was: " << last_val
<< " but this value is " << *data << '\n';
std::exit(1);
}
last_val = *data;
// std::cerr << last_val << " Read\n";
}
}
});
std::thread writer_thread([_writer = std::move(writer), &latch]() mutable {
latch.wait(false);
std::cerr << "Starting writer...\n";
for (int i = 0; i < 100'000'000; ++i) {
for (int j = 0; !_writer.try_write_next(i); ++j) {
constexpr int yield_frequency = 1 << 0;
if (j % yield_frequency)
std::this_thread::yield();
}
// std::cerr << "writer wrote value" << i << '\n';
// if (i == 10'000'000) std::exit(1);
}
});
std::cout << "Start" << std::endl;
{
auto start = chrono::steady_clock::now();
latch = true;
latch.notify_all();
reader_thread.join();
writer_thread.join();
auto finish = chrono::steady_clock::now();
auto elapsed_seconds =
chrono::duration_cast<chrono::duration<double>>(finish - start).count();
std::cout << elapsed_seconds << std::endl;
}
std::cout << "End" << std::endl;
}
220231 - 219718 = 513
,环绕有一个错误......您在读取值之前递增读取索引,写入器可能会覆盖读取器要读取的值。
解决办法是先读取值再递增,和写入时一样,先写入值,然后递增写入索引。