我试图使用线程池在多个线程之间分配工作,然后每个线程可以通过 stdout 将数据传递给子程序(在本例中
cat
来验证它的操作。
每当我将线程数指定为 1 以上时,程序就会挂起(使其所有子程序保持运行),但当线程数等于 1 时,程序总是按预期退出。
该问题的最小可重现示例如下:
#include <atomic>
#include <csignal>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>
#include <condition_variable>
#include <queue>
using namespace std;
atomic<bool> running(true);
mutex coutMutex;
void handle_signal(int signal) {
if (signal == SIGINT) {
running = false;
}
}
class ThreadPool {
vector<thread> threads;
queue<function<void()>> taskQueue;
mutex queueMutex;
condition_variable cv;
atomic<bool> stop;
public:
ThreadPool(size_t threadCount) : stop(false) {
for (size_t i = 0; i < threadCount; ++i) {
threads.emplace_back([this] {
while (true) {
function<void()> task;
{
unique_lock<mutex> lock(queueMutex);
cv.wait(lock, [this] { return stop || !taskQueue.empty(); });
if (stop && taskQueue.empty()) return;
task = std::move(taskQueue.front());
taskQueue.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
unique_lock<mutex> lock(queueMutex);
stop = true;
}
cv.notify_all();
for (thread& t : threads) {
t.join();
}
}
void enqueueTask(function<void()> task) {
{
unique_lock<mutex> lock(queueMutex);
taskQueue.push(std::move(task));
}
cv.notify_one();
}
};
void workerFunction(size_t start, size_t end, const vector<const char*>& args) {
int pipefd[2];
if (pipe(pipefd) < 0) {
perror("pipe");
return;
}
// Set the write end of the pipe to non-blocking
int flags = fcntl(pipefd[1], F_GETFL, 0);
fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK);
pid_t pid = fork();
if (pid < 0) {
perror("fork");
return;
}
if (pid == 0) {
// In the child process
close(pipefd[1]); // Close unused write end
dup2(pipefd[0], STDIN_FILENO);
close(pipefd[0]);
execvp("cat", const_cast<char* const*>(args.data()));
perror("execvp");
exit(EXIT_FAILURE);
} else {
// In the parent process
close(pipefd[0]); // Close unused read end
string data = "Example data ";
for (size_t i = start; i < end && running; ++i) {
size_t written = 0;
while (written < data.size() && running) {
ssize_t bytes = write(pipefd[1], data.c_str() + written, data.size() - written);
if (bytes == -1) {
perror("write");
break; // Exit the loop on error
}
written += bytes;
}
}
{
lock_guard<mutex> lock(coutMutex);
cout << "Reached end!" << endl;
}
close(pipefd[1]); // Ensure write end is closed
int status;
waitpid(pid, &status, 0);
if (WIFEXITED(status)) {
lock_guard<mutex> lock(coutMutex);
cout << "Worker finished. Exit code: " << WEXITSTATUS(status) << endl;
} else if (WIFSIGNALED(status)) {
lock_guard<mutex> lock(coutMutex);
cerr << "Worker terminated by signal: " << WTERMSIG(status) << endl;
}
}
}
int main(int argc, char* argv[]) {
signal(SIGINT, handle_signal); // Handle exiting early gracefully for the child processes
vector<const char*> args = {"cat", nullptr};
size_t totalCombinations = 100; // Example number
size_t threadCount = 1; // exits fine with 1 thread, hangs >1
ThreadPool threadPool(threadCount);
size_t chunkSize = totalCombinations / threadCount;
size_t remainder = totalCombinations % threadCount;
size_t start = 0;
for (size_t i = 0; i < threadCount; ++i) {
size_t end = start + chunkSize + (i < remainder ? 1 : 0); // Chunk up our tasks
threadPool.enqueueTask([=, &args]() {
workerFunction(start, end, args);
});
start = end;
}
return 0;
}
它应该由最新版本的 g++ 编译。
你有一个竞争条件,如果两个线程同时分叉,那么这 3 个进程将打开管道的 4 端,你需要在它周围添加一个互斥锁。
#include <atomic>
#include <csignal>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>
#include <condition_variable>
#include <queue>
using namespace std;
atomic<bool> running(true);
mutex coutMutex;
mutex fork_mutex;
void handle_signal(int signal) {
if (signal == SIGINT) {
running = false;
}
}
class ThreadPool {
vector<thread> threads;
queue<function<void()>> taskQueue;
mutex queueMutex;
condition_variable cv;
atomic<bool> stop;
public:
ThreadPool(size_t threadCount) : stop(false) {
for (size_t i = 0; i < threadCount; ++i) {
threads.emplace_back([this] {
while (true) {
function<void()> task;
{
unique_lock<mutex> lock(queueMutex);
cv.wait(lock, [this] { return stop || !taskQueue.empty(); });
if (stop && taskQueue.empty()) return;
task = std::move(taskQueue.front());
taskQueue.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
unique_lock<mutex> lock(queueMutex);
stop = true;
}
cv.notify_all();
for (thread& t : threads) {
t.join();
}
}
void enqueueTask(function<void()> task) {
{
unique_lock<mutex> lock(queueMutex);
taskQueue.push(std::move(task));
}
cv.notify_one();
}
};
void workerFunction(size_t start, size_t end, const vector<const char*>& args) {
int pipefd[2];
pid_t pid;
{
std::lock_guard l(fork_mutex);
if (pipe(pipefd) < 0) {
perror("pipe");
return;
}
// Set the write end of the pipe to non-blocking
int flags = fcntl(pipefd[1], F_GETFL, 0);
fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK);
pid = fork();
}
if (pid < 0) {
perror("fork");
return;
}
if (pid == 0) {
// In the child process
close(pipefd[1]); // Close unused write end
dup2(pipefd[0], STDIN_FILENO);
close(pipefd[0]);
execvp("cat", const_cast<char* const*>(args.data()));
perror("execvp");
exit(EXIT_FAILURE);
} else {
// In the parent process
close(pipefd[0]); // Close unused read end
string data = "Example data ";
for (size_t i = start; i < end && running; ++i) {
size_t written = 0;
while (written < data.size() && running) {
ssize_t bytes = write(pipefd[1], data.c_str() + written, data.size() - written);
if (bytes == -1) {
perror("write");
break; // Exit the loop on error
}
written += bytes;
}
}
{
lock_guard<mutex> lock(coutMutex);
cout << "Reached end!" << endl;
}
close(pipefd[1]); // Ensure write end is closed
int status;
waitpid(pid, &status, 0);
if (WIFEXITED(status)) {
lock_guard<mutex> lock(coutMutex);
cout << "Worker finished. Exit code: " << WEXITSTATUS(status) << endl;
} else if (WIFSIGNALED(status)) {
lock_guard<mutex> lock(coutMutex);
cerr << "Worker terminated by signal: " << WTERMSIG(status) << endl;
}
}
}
int main(int argc, char* argv[]) {
signal(SIGINT, handle_signal); // Handle exiting early gracefully for the child processes
vector<const char*> args = {"cat", nullptr};
size_t totalCombinations = 100; // Example number
size_t threadCount = 2; // fine with 1 thread, hangs >1
ThreadPool threadPool(threadCount);
size_t chunkSize = totalCombinations / threadCount;
size_t remainder = totalCombinations % threadCount;
size_t start = 0;
for (size_t i = 0; i < threadCount; ++i) {
size_t end = start + chunkSize + (i < remainder ? 1 : 0); // Chunk up our tasks
threadPool.enqueueTask([=, &args]() {
workerFunction(start, end, args);
});
start = end;
}
return 0;
}