消费者生产商下降

问题描述 投票:0回答:1

我有一个线程(生产者)从每个t mSec的某些来源获取数据。一旦获取并准备好数据,其他线程(消费者)应该获取数据并对其进行一些处理。

但是,无法保证哪个线程更快(生产者可能比消费者更慢或更快)。

我做了什么:

//Producer
while(is_enabled_) {
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
    std::unique_lock<std::mutex> lk(mutex_);
    ready_ = false;
    //acquiring the data
    ready_ = true;
    lk.unlock();
    cv_.notify_all();
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_milliseconds < duration ? 0 : sleep_milliseconds - duration));
}

考虑到消费者都这样做:

//A consumer
while(is_enabled_){
    std::unique_lock<std::mutex> lk(mutex_);
    cv_.wait(lk, [this] {return this->ready_; });
    //Process the data
}

我没有排队。只应处理最后获取的数据,每个消费者只能处理一次。如果获取了一些数据且消费者没有时间处理它,则数据将被删除,另一个数据将被生产者覆盖。

另一方面,如果消费者比生产者更快,他们应该等到新数据准备好而不是处理旧数据。

我面临的问题是,如果生产者没有足够快地生成新数据,那么消费者正在使用生产者生成的相同旧数据。

我的实施缺乏什么?

c++ multithreading c++11
1个回答
0
投票

你可以通过这样的方式实现你的目标:

全局变量:std::vector<bool> newData;

对于生产者:

while(is_enabled_) {
    std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
    std::unique_lock<std::mutex> lk(mutex_);
    ready_ = false;
    //acquiring the data
    ready_ = true;
   std::fill(newData.begin(), newData.end(), true);
    lk.unlock();
    cv_.notify_all();
    std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_milliseconds < duration ? 0 : sleep_milliseconds - duration));
}

在while循环之前的消费者中:

int index;
std::unique_lock<std::mutex> lk(mutex_);
index = newData.size();
newData.push_back(false);
lk.unlock();

然后身体是这样的

while(is_enabled_){
    std::unique_lock<std::mutex> lk(mutex_);
    if(newData[index]) {
        cv_.wait(lk, [this] {return this->ready_; });
        newData[index] = false;
    //Process the data
    }else  {
          lk.unlock();
          std::this_thread::sleep_for(std::chrono::milliseconds(50);
    }
}

希望这可以帮助。

© www.soinside.com 2019 - 2024. All rights reserved.