多线程编程中使用的同步原语,用于等待条件为真。
std::condition_variable_any 的 libc++ 实现
条件变量对于notify()和unlock_sleep()应该有一个单一的顺序(在wait()中使用的一个虚构的函数调用,其中互斥体被解锁并且线程作为一个睡眠)
使用`std::condition_variable`退出程序时出错
我遇到了一些与以下代码具有相同逻辑的代码的问题: #包括 #包括 #包括 #包括 int main() { ...
我想完成一个程序,它可以在循环中接受用户输入并使用多线程进行一些计算,当用户想要退出时,循环退出。就绪意味着线程已准备好......
使用Python线程使model_averaging在联邦学习中独占
我使用以下代码创建 num_of_clients 线程: 套接字线程 = [] 客户端数量 = 1 所有数据 = b"" 而真实: 尝试: 对于我在范围内(no_of_client):
以下代码使用条件变量和监视器标志来同步主线程和线程2之间的操作: int main() { std::互斥体 m; std::condition_variable 简历; 标准::
Condition_variable wait_for超时问题
我正在尝试学习如何让条件变量与某些线程一起使用,但遇到了一些麻烦。目标是让主线程在设定的时间内生成一些“任务”,
我不确定我是否应该使用 std::semaphore 或 std::condition 变量来处理线程池系统中的线程等待。 我认为使用信号量并增加它的风险很低
想象一下,我们有一个条件变量 cv_ 与互斥量 mtx_ 相关联 线程 t1 现在有一个与 mtx_ 关联的 unique_lock ul 并且它现在有 cv_,而其他线程是 cv_.wait(unique_lock(mtx_)...
为什么这段代码偶尔会挂在 std::condition_variable::wait() 上?
我已经在 C++ 中实现了一个非常标准的单消费者-多生产者模式,此外还限制了队列中的任务数量。 一个 Worker 在一个
pthread_cond_wait()如何与mutex解耦?
我一直在思考条件变量背后的逻辑,并且对与之相关的最常见问题感到满意。 基本上,如果我们做类似的事情: mutex.lock() whi...
我正在尝试实现一个工作对象(这是一个等待任务并仅在销毁时终止的线程),但我在使用 std::condition_variable 时遇到了问题: #包括 我正在尝试实现一个工作对象(这是一个等待任务并仅在销毁时终止的线程),但是我在使用std::condition_variable: #include <thread> #include <mutex> #include <condition_variable> #include <functional> #include <atomic> #include <iostream> class Worker { public: Worker(Worker&&) = delete; Worker(const Worker&) = delete; Worker() : stop(false) , thread(&Worker::worker_thread, this) {} void push(std::function<void()> _f) { std::unique_lock lock(thread_mutex); task = std::move(_f); new_task.notify_one(); } ~Worker() { /* not implemented yet */ } private: void worker_thread() { std::unique_lock lock(thread_mutex); while (true) { new_task.wait(lock); if (stop) return; task(); } } std::atomic<bool> stop; std::function<void()> task; std::thread thread; std::mutex thread_mutex; std::condition_variable new_task; }; 我在这里想出了这个目前不适用于 gcc 的例子: int main() { Worker t; t.push([] { std::cout << "Hello from worker" << std::endl; }); for (int i = 0; i < 10; ++i) t.push([i] { std::cout << i << std::endl; }); return 0; } 如果运行代码我得到这个输出: terminate called without an active exception //(because destructor yet to be implemented) 9 9 9 9 9 . . . and so on 9 所以这是我的代码应该如何工作: 当构造一个Worker对象时,它会产生一个执行worker_thread函数的线程。 此函数锁定thread_mutex,并且应该仅在等待条件变量时将其解锁。 当一个任务被推送时,push 函数会尝试锁定互斥量,它应该只在它可以的时候,也就是当 worker_thread 正在等待带有条件变量的任务时。 所以如果线程正在等待,push应该能够获取锁并在task缓冲区中移动新任务,然后通知条件变量,唤醒线程。 一个提示是这段代码: int main() { Worker t; t.push([] { std::cout << "Hello from worker" << std::endl; }); //for (int i = 0; i < 10; ++i) // t.push([i] { std::cout << i << std::endl; }); return 0; } 永远打招呼,有时它会崩溃,但应该只打印一次然后等待下一个任务。 这更奇怪,因为我最初的想法是一个接一个地执行多个推送会出现问题,也许这可能会导致锁出现一些问题,但在最后一个示例中,我只调用了一次push 并且仍然我有问题。 有人能明白问题出在哪里吗? 阅读有关初始化顺序的信息。当 thread_mutex 和 new_task 尚未初始化时,您运行一个线程。使用未初始化的成员运行 worker_thread 是未定义的行为。 Worker 构造,推送任务,销毁可以在线程工作者启动之前发生,并且工作者永远等待条件变量。您应该首先使用相反方向的条件变量来向构造函数发出有关正在运行的工作人员的信号。 这是适用于两个示例的解决方案: class Worker { public: Worker(Worker&&) = delete; Worker(const Worker&) = delete; Worker() : stop(false) , task(nullptr) , thread(&Worker::worker_thread, this) {} void push(std::function<void()> _f) { std::unique_lock lock(thread_mutex); cv.wait(lock, [this] { return !task; }); task = std::move(_f); new_task.notify_one(); } ~Worker() { std::unique_lock lock(thread_mutex); cv.wait(lock, [this] { return !task; }); stop = true; new_task.notify_one(); lock.unlock(); if (thread.joinable()) thread.join(); } private: void worker_thread() { std::unique_lock lock(thread_mutex); while (true) { cv.wait(lock, [this] { return task || stop; }); // if (stop) return; task(); task = nullptr; // reset task for check new_task.notify_one(); } } bool stop; // does not need to be atomic std::function<void()> task; std::mutex thread_mutex; std::condition_variable cv; std::thread thread; // moved to bottom }; 我遇到的主要问题是我不明白条件变量是如何工作的。 条件变量不等待信号,它等待条件。 所以偶尔条件变量会“唤醒”线程,检查条件是否满足是用户的责任。 使用条件变量时,检查条件很重要,否则它会时不时地唤醒并运行之后的操作。 所以这样做的一种方法是: while (!condition) cv.wait(lock); 或者这个,使用 lambdas: cv.wait(lock, [] { return condition; }); 所以cv.notify_one()只是一个条件可能已经改变的提示,而不是唤醒线程的命令。 此外,我必须小心初始化,因为在我之前的代码中,线程是在条件变量和互斥锁之前初始化的,在这种情况下是否有所不同尚不清楚,它可能确实如此。 成员变量按照声明的顺序进行初始化。 最后,我还需要检查push 和析构函数中的另一个条件。我需要在两者中都看到任务无效,或者设置为 0 或 NULL. 这是因为如果设置为NULL,就意味着push函数可以安全的修改worker_thread未使用的任务。 析构函数做类似的事情,它需要在销毁之前查看线程是否执行完最后一个任务,将stop标志设置为true, 那是因为工作人员在执行任务之前检查是否设置了标志。 就这些,谢谢大家的热心帮助,希望这个问题对所有需要了解条件变量的程序员有所帮助
我正在尝试实现一个工作对象,它是一个等待任务并仅在销毁时终止的线程: #包括 #包括 #包括 #
以非阻塞方式将数据从事件线程传递到持续运行的线程的最高效/优雅/稳健的解决方案 (C++20)
我正在进行一个研究项目,本质上是一个以 30 fps 显示一系列图像的 Windows 窗口。我最初一直在使用 GLFW。但是,API 的问题(尽管超级...
C++中将数据从事件线程传递到持续运行的线程的最有效方式(20)
我正在进行一个研究项目,本质上是一个以 30 fps 显示一系列图像的 Windows 窗口。我最初一直在使用 GLFW。但是,API 的问题(尽管超级...
嗨我想瘫痪阅读和分析多个文本文件的过程。所以我有一个生产者 Producer 和两个消费者 TextAnalyzer A 和 TextAnalyzer B。 生产者有一个 RequstQueue
我需要一些帮助来理解如何在 C 中使用条件变量来解决练习。这是一个小例子: #包括 #包括 #包括 我需要一些帮助来理解如何在 C 中使用条件变量来解决练习。这是一个小例子: #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <string.h> #include <errno.h> #include <sys/types.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #define OKTOWRITE "/oktowrite" #define MESSAGE "/message" #define MUTEX "/lock" int main(int argc, char** argv) { pthread_cond_t* condition; pthread_mutex_t *mutex; char* message; int des_cond, des_msg, des_mutex; int mode = S_IRWXU | S_IRWXG; des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR | O_TRUNC, mode); if (des_mutex < 0) { perror("failure on shm_open on des_mutex"); exit(1); } if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) { perror("Error on ftruncate to sizeof pthread_cond_t\n"); exit(-1); } mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0); if (mutex == MAP_FAILED ) { perror("Error on mmap on mutex\n"); exit(1); } pthread_mutex_init(mutex, NULL ); des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR | O_TRUNC, mode); if (des_cond < 0) { perror("failure on shm_open on des_cond"); exit(1); } if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) { perror("Error on ftruncate to sizeof pthread_cond_t\n"); exit(-1); } condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t), PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0); if (condition == MAP_FAILED ) { perror("Error on mmap on condition\n"); exit(1); } pthread_cond_init(condition, NULL ); if (!fork()) { sleep(3); pthread_mutex_lock(mutex); pthread_cond_signal(condition); pthread_mutex_unlock(mutex); printf("son signaled\n"); exit(0); } else { printf("wait on condition\n"); pthread_mutex_lock(mutex); pthread_cond_wait(condition, mutex); pthread_mutex_unlock(mutex); printf("Signaled by son process, wake up\n"); pthread_mutex_destroy(mutex); pthread_cond_destroy(condition); shm_unlink(OKTOWRITE); shm_unlink(MESSAGE); shm_unlink(MUTEX); return 0; } } 问题是进程的父亲继续被锁定,即使在儿子发出信号之后也是如此。一切都在共享内存中(使用shm_open和mmap)所以两个进程的条件应该相同。 在调用 wait 或 signal 之前锁定互斥锁可能是我犯了一个错误吗? 编辑: 感谢所有帮助过我的人。这是标有关键部分的正确代码: #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <string.h> #include <errno.h> #include <sys/types.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #define OKTOWRITE "/condwrite" #define MESSAGE "/msg" #define MUTEX "/mutex_lock" int main(int argc, char** argv) { pthread_cond_t* condition; pthread_mutex_t* mutex; char* message; int des_cond, des_msg, des_mutex; int mode = S_IRWXU | S_IRWXG; des_mutex = shm_open(MUTEX, O_CREAT | O_RDWR | O_TRUNC, mode); if (des_mutex < 0) { perror("failure on shm_open on des_mutex"); exit(1); } if (ftruncate(des_mutex, sizeof(pthread_mutex_t)) == -1) { perror("Error on ftruncate to sizeof pthread_cond_t\n"); exit(-1); } mutex = (pthread_mutex_t*) mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, des_mutex, 0); if (mutex == MAP_FAILED ) { perror("Error on mmap on mutex\n"); exit(1); } des_cond = shm_open(OKTOWRITE, O_CREAT | O_RDWR | O_TRUNC, mode); if (des_cond < 0) { perror("failure on shm_open on des_cond"); exit(1); } if (ftruncate(des_cond, sizeof(pthread_cond_t)) == -1) { perror("Error on ftruncate to sizeof pthread_cond_t\n"); exit(-1); } condition = (pthread_cond_t*) mmap(NULL, sizeof(pthread_cond_t), PROT_READ | PROT_WRITE, MAP_SHARED, des_cond, 0); if (condition == MAP_FAILED ) { perror("Error on mmap on condition\n"); exit(1); } /* HERE WE GO */ /**************************************/ /* set mutex shared between processes */ pthread_mutexattr_t mutexAttr; pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(mutex, &mutexAttr); /* set condition shared between processes */ pthread_condattr_t condAttr; pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED); pthread_cond_init(condition, &condAttr); /*************************************/ if (!fork()) { sleep(10); pthread_mutex_lock(mutex); pthread_cond_signal(condition); printf("son signaled\n"); pthread_mutex_unlock(mutex); exit(0); } else { printf("father waits on condition\n"); pthread_mutex_lock(mutex); pthread_cond_wait(condition, mutex); pthread_mutex_unlock(mutex); printf("Signaled by son process, wake up!!!!!!!!\n"); pthread_condattr_destroy(&condAttr); pthread_mutexattr_destroy(&mutexAttr); pthread_mutex_destroy(mutex); pthread_cond_destroy(condition); shm_unlink(OKTOWRITE); shm_unlink(MESSAGE); shm_unlink(MUTEX); } return 0; } 要在进程之间共享,需要通过正确初始化的属性相应地初始化互斥量:http://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutexattr_setpshared.html #include <pthread.h> ... pthread_mutex_t * pmutex = NULL; pthread_mutexattr_t attrmutex; /* Initialise attribute to mutex. */ pthread_mutexattr_init(&attrmutex); pthread_mutexattr_setpshared(&attrmutex, PTHREAD_PROCESS_SHARED); /* Allocate memory to pmutex here. */ /* Initialise mutex. */ pthread_mutex_init(pmutex, &attrmutex); /* Use the mutex. */ /* Clean up. */ pthread_mutex_destroy(pmutex); pthread_mutexattr_destroy(&attrmutex); (为了本示例的可读性,省略了错误检查) 这同样适用于应该在进程之间共享的条件变量:http://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_condattr_setpshared.html #include <pthread.h> ... pthread_cond_t * pcond = NULL; pthread_condattr_t attrcond; /* Initialise attribute to condition. */ pthread_condattr_init(&attrcond); pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED); /* Allocate memory to pcond here. */ /* Initialise condition. */ pthread_cond_init(pcond, &attrcond); /* Use the condition. */ /* Clean up. */ pthread_cond_destroy(pcond); pthread_condattr_destroy(&attrcond); (为了本示例的可读性,省略了错误检查) 另请参阅此答案:https://stackoverflow.com/a/2390670/694576 等待一个条件之前应该有一个 while 语句,像这样: pthread_mutex_lock(mutex); while(!conditionSatisfied) pthread_cond_wait(condition, mutex); pthread_mutex_unlock(mutex); while 信号应该通过以下方式完成: pthread_mutex_lock(mutex); conditionSatisfied = true; pthread_cond_signal(condition); pthread_mutex_unlock(mutex); 是的,必须在pthread_cond_wait之前锁定互斥量,但不必为pthread_cond_signal锁定。如果你回头看看你的代码,你会发现互斥量将被解锁两次,这是错误的标志......孩子也有可能在已被父母破坏的互斥量上调用解锁...... 顺便说一下,睡眠并不能保证父进程会先执行。为了确保这一点,您将需要……一个条件变量……
为什么我的 pthread_cond_signal 没有立即唤醒阻塞的线程?
我正在编写一个多线程程序,其中辅助线程在满足条件后运行,即数据结构中存在一定数量的元素。 空白* gc_thread_handler(void *arg) {
在同一个 std::condition_variable_any::wait_for 和 std::std::stop_token 上等待多个线程时死锁
在我正在编写的应用程序中,我有一个线程模型,它被简化如下: 生成器 jthread (m_WorkerGenerator) 正在启动异步任务。 多个异步任务工作直到...
C++ 11 - condition_variable - wait_until不能如期工作
我已经实现了一个示例程序来理解在C++11中wait_for和wait_until是如何工作的。 代码 - #include #include #include #include ...
我有一个有自己线程的类 player: player.cpp int soh::player::instance_count{ 0 }; int soh::player::current_player{ -1 }; std::mutex soh::player::mutex; std::condition_variable soh::player::...