Data is not ordered correctly in a lock-free SPSC queue

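I am trying to write a bounded, lock-free, single-producer, single-consumer queue, but elements come out of the queue in a different order from the one in which they were inserted. The test code in main.cpp pushes 100,000,000 consecutive integers onto the queue from one thread and pops them on another thread, checking that the popped values are consecutive.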

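Currently, when I run the program it prints "Oh no! last value was: 219718 but this value is 220231" (the numbers differ from run to run). The range of the numbers seems to depend on yield_frequency in writer_thread: lower yield_frequency values cause the test to fail (often, but not always) at numbers around 100.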

(The code is a work in progress, so some of it may not follow C++ best practices.)

spsc_queue.hpp

#pragma once

#include <memory>
#include <utility>
#include <bit>
#include <atomic>
#include <optional>

template <typename T> class ChannelReader;
template <typename T> class ChannelWriter;

// instances of this class cannot be created directly, make_queue returns a pair of ChannelReader and ChannelWriter instances that wrap the queue to prevent reads (or writes) from multiple threads
template <typename T>
class SpscQueue {
    friend ChannelReader<T>;
    friend ChannelWriter<T>;
    public:
    static std::pair<ChannelReader<T>, ChannelWriter<T>> make_queue(size_t size) {
        if (!std::__has_single_bit(size)) throw "";
        std::shared_ptr<SpscQueue<T>> queue_ptr(new SpscQueue<T>(size));
        return std::make_pair(ChannelReader<T>(queue_ptr), ChannelWriter<T>(queue_ptr));
    }
    ~SpscQueue() {
        delete[] data;
    }
    private:
    SpscQueue (size_t _size): data(new T[_size]()), size(_size) {}
    alignas(64) T* data;
    const size_t size;
    alignas(64) std::atomic<size_t> read_idx{0};
    alignas(64) std::atomic<size_t> write_idx{0};
};

// wrapper class that allows ONE thread to read from a SPSC queue
template <typename T>
class ChannelReader {
    friend SpscQueue<T>;
    public:
    // attempt to read from the queue, returning the value if successful, and std::nullopt otherwise
    std::optional<T> try_get_next() {
        size_t read_idx = queue->read_idx;
        size_t write_idx = cached_write_idx;
        if (write_idx <= read_idx) {
            write_idx = queue->write_idx;
            cached_write_idx = write_idx;
        }
        if (write_idx <= read_idx) {
            return std::nullopt;
        } else {
            queue->read_idx++;
            return std::move(queue->data[read_idx % queue->size]);
        }
    }
    ChannelReader(const ChannelReader&) = delete;
    ChannelReader& operator=(const ChannelReader&) = delete;
    ChannelReader(ChannelReader&&) = default;
    ChannelReader& operator=(ChannelReader&&) = default;
    private:
    ChannelReader(std::shared_ptr<SpscQueue<T>> ptr): queue(ptr) {}
    std::shared_ptr<SpscQueue<T>> queue;
    alignas(64) size_t cached_write_idx{0};
};

// wrapper class that allows ONE thread to write to a SPSC queue
template <typename T>
class ChannelWriter {
    friend SpscQueue<T>;
    public:
    // attempt to write to the queue, returns true if the write was successful, and false otherwise
    bool try_write_next(const T& obj) {
        size_t read_idx = cached_read_idx;
        size_t write_idx = queue->write_idx;
        if (write_idx >= read_idx + queue->size) {
            read_idx = queue->read_idx;
            cached_read_idx = read_idx;          
        }
        if (write_idx >= read_idx + queue->size) {
            return false;
        } else {
            queue->data[write_idx % queue->size] = obj;
            ++queue->write_idx;
            return true;
        }
    }
    ChannelWriter(const ChannelWriter&) = delete;
    ChannelWriter& operator=(const ChannelWriter&) = delete;
    ChannelWriter(ChannelWriter&&) = default;
    ChannelWriter& operator=(ChannelWriter&&) = default;
    private:
    ChannelWriter(std::shared_ptr<SpscQueue<T>> ptr): queue(ptr) {}
    std::shared_ptr<SpscQueue<T>> queue;
    alignas(64) size_t cached_read_idx{0};
};

main.cpp

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

#include "../include/spsc_queue.hpp"

namespace chrono = std::chrono;

int main() {
  std::atomic<bool> latch{false};
  auto [reader, writer] = SpscQueue<int>::make_queue(512);
  std::thread reader_thread([_reader = std::move(reader), &latch]() mutable {
    latch.wait(false);
    std::cerr << "Starting reader...\n";
    int last_val = -1;
    while (last_val != 100'000'000 - 1) {
      if (auto data = _reader.try_get_next()) {
        if (*data != last_val + 1) {
          std::cerr << "Oh no! last value was: " << last_val
                    << " but this value is " << *data << '\n';
          std::exit(1);
        }
        last_val = *data;
        // std::cerr << last_val << " Read\n";
      }
    }
  });
  std::thread writer_thread([_writer = std::move(writer), &latch]() mutable {
    latch.wait(false);
    std::cerr << "Starting writer...\n";
    for (int i = 0; i < 100'000'000; ++i) {
      for (int j = 0; !_writer.try_write_next(i); ++j) {
        constexpr int yield_frequency = 1 << 0;
        if (j % yield_frequency)
          std::this_thread::yield();
      }
      // std::cerr << "writer wrote value" << i << '\n';
      // if (i == 10'000'000) std::exit(1);
    }
  });
  std::cout << "Start" << std::endl;
  {
    auto start = chrono::steady_clock::now();
    latch = true;
    latch.notify_all();
    reader_thread.join();
    writer_thread.join();
    auto finish = chrono::steady_clock::now();
    auto elapsed_seconds =
        chrono::duration_cast<chrono::duration<double>>(finish - start).count();
    std::cout << elapsed_seconds << std::endl;
  }
  std::cout << "End" << std::endl;
}
c++ multithreading lock-free lockless
1 Answer

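220231 - 219718 = 513, which is the queue size (512) plus one, so there is an off-by-one error at wraparound. You increment the read index before reading the value, so the writer can overwrite the value the reader is about to read.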

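The fix is to read the value first and then increment the read index, just as the writer first writes the value and then increments the write index.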
