我正在尝试实现一个工作对象(这是一个等待任务并仅在销毁时终止的线程),但是我在使用
std::condition_variable
:
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <iostream>
class Worker {
public:
Worker(Worker&&) = delete;
Worker(const Worker&) = delete;
Worker()
: stop(false)
, thread(&Worker::worker_thread, this)
{}
void push(std::function<void()> _f) {
std::unique_lock lock(thread_mutex);
task = std::move(_f);
new_task.notify_one();
}
~Worker() { /* not implemented yet */ }
private:
void worker_thread() {
std::unique_lock lock(thread_mutex);
while (true) {
new_task.wait(lock);
if (stop) return;
task();
}
}
std::atomic<bool> stop;
std::function<void()> task;
std::thread thread;
std::mutex thread_mutex;
std::condition_variable new_task;
};
我在这里想出了这个目前不适用于 gcc 的例子:
int main() {
Worker t;
t.push([] { std::cout << "Hello from worker" << std::endl; });
for (int i = 0; i < 10; ++i)
t.push([i] { std::cout << i << std::endl; });
return 0;
}
如果运行代码我得到这个输出:
terminate called without an active exception //(because destructor yet to be implemented)
9
9
9
9
9
.
.
. and so on
9
所以这是我的代码应该如何工作:
当构造一个
Worker
对象时,它会产生一个执行worker_thread
函数的线程。
此函数锁定thread_mutex
,并且应该仅在等待条件变量时将其解锁。
当一个任务被推送时,
push
函数会尝试锁定互斥量,它应该只在它可以的时候,也就是当 worker_thread
正在等待带有条件变量的任务时。
所以如果线程正在等待,
push
应该能够获取锁并在task
缓冲区中移动新任务,然后通知条件变量,唤醒线程。
一个提示是这段代码:
int main() {
Worker t;
t.push([] { std::cout << "Hello from worker" << std::endl; });
//for (int i = 0; i < 10; ++i)
// t.push([i] { std::cout << i << std::endl; });
return 0;
}
永远打招呼,有时它会崩溃,但应该只打印一次然后等待下一个任务。
这更奇怪,因为我最初的想法是一个接一个地执行多个推送会出现问题,也许这可能会导致锁出现一些问题,但在最后一个示例中,我只调用了一次
push
并且仍然我有问题。
有人能明白问题出在哪里吗?
thread_mutex
和 new_task
尚未初始化时,您运行一个线程。使用未初始化的成员运行 worker_thread
是未定义的行为。Worker
构造,推送任务,销毁可以在线程工作者启动之前发生,并且工作者永远等待条件变量。您应该首先使用相反方向的条件变量来向构造函数发出有关正在运行的工作人员的信号。这是适用于两个示例的解决方案:
class Worker {
public:
Worker(Worker&&) = delete;
Worker(const Worker&) = delete;
Worker()
: stop(false)
, task(nullptr)
, thread(&Worker::worker_thread, this)
{}
void push(std::function<void()> _f) {
std::unique_lock lock(thread_mutex);
cv.wait(lock, [this] { return !task; });
task = std::move(_f);
new_task.notify_one();
}
~Worker() {
std::unique_lock lock(thread_mutex);
cv.wait(lock, [this] { return !task; });
stop = true;
new_task.notify_one();
lock.unlock();
if (thread.joinable())
thread.join();
}
private:
void worker_thread() {
std::unique_lock lock(thread_mutex);
while (true) {
cv.wait(lock, [this] { return task || stop; }); //
if (stop) return;
task();
task = nullptr; // reset task for check
new_task.notify_one();
}
}
bool stop; // does not need to be atomic
std::function<void()> task;
std::mutex thread_mutex;
std::condition_variable cv;
std::thread thread; // moved to bottom
};
我遇到的主要问题是我不明白条件变量是如何工作的。
条件变量不等待信号,它等待条件。 所以偶尔条件变量会“唤醒”线程,检查条件是否满足是用户的责任。 使用条件变量时,检查条件很重要,否则它会时不时地唤醒并运行之后的操作。 所以这样做的一种方法是:
while (!condition)
cv.wait(lock);
或者这个,使用 lambdas:
cv.wait(lock, [] { return condition; });
所以
cv.notify_one()
只是一个条件可能已经改变的提示,而不是唤醒线程的命令。
此外,我必须小心初始化,因为在我之前的代码中,线程是在条件变量和互斥锁之前初始化的,在这种情况下是否有所不同尚不清楚,它可能确实如此。 成员变量按照声明的顺序进行初始化。
最后,我还需要检查
push
和析构函数中的另一个条件。我需要在两者中都看到任务无效,或者设置为 0
或 NULL
.
这是因为如果设置为NULL,就意味着
push
函数可以安全的修改worker_thread
未使用的任务。
析构函数做类似的事情,它需要在销毁之前查看线程是否执行完最后一个任务,将stop
标志设置为true
,
那是因为工作人员在执行任务之前检查是否设置了标志。
就这些,谢谢大家的热心帮助,希望这个问题对所有需要了解条件变量的程序员有所帮助