C++ ThreadPool 在多个线程中挂起,但在单线程中工作

问题描述 投票:0回答:1

我试图使用线程池在多个线程之间分配工作,然后每个线程可以通过 stdout 将数据传递给子程序(在本例中

cat
来验证它的操作。

每当我将线程数指定为 1 以上时,程序就会挂起(使其所有子程序保持运行),但当线程数等于 1 时,程序总是按预期退出。

该问题的最小可重现示例如下:

#include <atomic>
#include <csignal>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>
#include <condition_variable>
#include <queue>

using namespace std;

atomic<bool> running(true);
mutex coutMutex;

void handle_signal(int signal) {
    if (signal == SIGINT) {
        running = false;
    }
}

class ThreadPool {
    vector<thread> threads;
    queue<function<void()>> taskQueue;
    mutex queueMutex;
    condition_variable cv;
    atomic<bool> stop;

public:
    ThreadPool(size_t threadCount) : stop(false) {
        for (size_t i = 0; i < threadCount; ++i) {
            threads.emplace_back([this] {
                while (true) {
                    function<void()> task;
                    {
                        unique_lock<mutex> lock(queueMutex);
                        cv.wait(lock, [this] { return stop || !taskQueue.empty(); });
                        if (stop && taskQueue.empty()) return;
                        task = std::move(taskQueue.front());
                        taskQueue.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            unique_lock<mutex> lock(queueMutex);
            stop = true;
        }
        cv.notify_all();
        for (thread& t : threads) {
            t.join();
        }
    }

    void enqueueTask(function<void()> task) {
        {
            unique_lock<mutex> lock(queueMutex);
            taskQueue.push(std::move(task));
        }
        cv.notify_one();
    }
};

void workerFunction(size_t start, size_t end, const vector<const char*>& args) {
    int pipefd[2];

    if (pipe(pipefd) < 0) {
        perror("pipe");
        return;
    }

    // Set the write end of the pipe to non-blocking
    int flags = fcntl(pipefd[1], F_GETFL, 0);
    fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK);

    pid_t pid = fork();
    if (pid < 0) {
        perror("fork");
        return;
    }

    if (pid == 0) {
        // In the child process
        close(pipefd[1]);  // Close unused write end
        dup2(pipefd[0], STDIN_FILENO);
        close(pipefd[0]);

        execvp("cat", const_cast<char* const*>(args.data()));
        perror("execvp");
        exit(EXIT_FAILURE);
    } else {
        // In the parent process
        close(pipefd[0]);  // Close unused read end

        string data = "Example data ";
        
        for (size_t i = start; i < end && running; ++i) {
            size_t written = 0;
            while (written < data.size() && running) {
                ssize_t bytes = write(pipefd[1], data.c_str() + written, data.size() - written);
                if (bytes == -1) {
                    perror("write");
                    break;  // Exit the loop on error
                }
                written += bytes;
            }
        }
        {
            lock_guard<mutex> lock(coutMutex);
            cout << "Reached end!" << endl;
        }
        
        close(pipefd[1]); // Ensure write end is closed
        int status;
        waitpid(pid, &status, 0);

        if (WIFEXITED(status)) {
            lock_guard<mutex> lock(coutMutex);
            cout << "Worker finished. Exit code: " << WEXITSTATUS(status) << endl;
        } else if (WIFSIGNALED(status)) {
            lock_guard<mutex> lock(coutMutex);
            cerr << "Worker terminated by signal: " << WTERMSIG(status) << endl;
        }
    }
}


int main(int argc, char* argv[]) {
    signal(SIGINT, handle_signal); // Handle exiting early gracefully for the child processes

    vector<const char*> args = {"cat", nullptr};

    size_t totalCombinations = 100;  // Example number
    size_t threadCount = 1; // exits fine with 1 thread, hangs >1
    ThreadPool threadPool(threadCount);

    size_t chunkSize = totalCombinations / threadCount;
    size_t remainder = totalCombinations % threadCount;

    size_t start = 0;
    for (size_t i = 0; i < threadCount; ++i) {
        size_t end = start + chunkSize + (i < remainder ? 1 : 0); // Chunk up our tasks
        threadPool.enqueueTask([=, &args]() {
            workerFunction(start, end, args);
        });
        start = end;
    }

    return 0;
}

它应该由最新版本的 g++ 编译。

c++ multithreading fork
1个回答
0
投票

你有一个竞争条件,如果两个线程同时分叉,那么这 3 个进程将打开管道的 4 端,你需要在它周围添加一个互斥锁。

#include <atomic>
#include <csignal>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <errno.h>
#include <condition_variable>
#include <queue>

using namespace std;

atomic<bool> running(true);
mutex coutMutex;

mutex fork_mutex;

void handle_signal(int signal) {
    if (signal == SIGINT) {
        running = false;
    }
}

class ThreadPool {
    vector<thread> threads;
    queue<function<void()>> taskQueue;
    mutex queueMutex;
    condition_variable cv;
    atomic<bool> stop;

public:
    ThreadPool(size_t threadCount) : stop(false) {
        for (size_t i = 0; i < threadCount; ++i) {
            threads.emplace_back([this] {
                while (true) {
                    function<void()> task;
                    {
                        unique_lock<mutex> lock(queueMutex);
                        cv.wait(lock, [this] { return stop || !taskQueue.empty(); });
                        if (stop && taskQueue.empty()) return;
                        task = std::move(taskQueue.front());
                        taskQueue.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool() {
        {
            unique_lock<mutex> lock(queueMutex);
            stop = true;
        }
        cv.notify_all();
        for (thread& t : threads) {
            t.join();
        }
    }

    void enqueueTask(function<void()> task) {
        {
            unique_lock<mutex> lock(queueMutex);
            taskQueue.push(std::move(task));
        }
        cv.notify_one();
    }
};

void workerFunction(size_t start, size_t end, const vector<const char*>& args) {
    
    int pipefd[2];
    pid_t pid;
    {
    std::lock_guard l(fork_mutex);
    if (pipe(pipefd) < 0) {
        perror("pipe");
            return;
    }

    // Set the write end of the pipe to non-blocking
    int flags = fcntl(pipefd[1], F_GETFL, 0);
    fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK);

    pid = fork();
    }
    if (pid < 0) {
        perror("fork");
        return;
    }

    if (pid == 0) {
        // In the child process
        close(pipefd[1]);  // Close unused write end
        dup2(pipefd[0], STDIN_FILENO);
        close(pipefd[0]);

        execvp("cat", const_cast<char* const*>(args.data()));
        perror("execvp");
        exit(EXIT_FAILURE);
    } else {
        // In the parent process
        close(pipefd[0]);  // Close unused read end

        string data = "Example data ";
        
        for (size_t i = start; i < end && running; ++i) {
            size_t written = 0;
            while (written < data.size() && running) {
                ssize_t bytes = write(pipefd[1], data.c_str() + written, data.size() - written);
                if (bytes == -1) {
                    perror("write");
                    break;  // Exit the loop on error
                }
                written += bytes;
            }
        }
        {
            lock_guard<mutex> lock(coutMutex);
            cout << "Reached end!" << endl;
        }
        
        close(pipefd[1]); // Ensure write end is closed
        int status;
        waitpid(pid, &status, 0);

        if (WIFEXITED(status)) {
            lock_guard<mutex> lock(coutMutex);
            cout << "Worker finished. Exit code: " << WEXITSTATUS(status) << endl;
        } else if (WIFSIGNALED(status)) {
            lock_guard<mutex> lock(coutMutex);
            cerr << "Worker terminated by signal: " << WTERMSIG(status) << endl;
        }
    }
}


int main(int argc, char* argv[]) {
    signal(SIGINT, handle_signal); // Handle exiting early gracefully for the child processes

    vector<const char*> args = {"cat", nullptr};

    size_t totalCombinations = 100;  // Example number
    size_t threadCount = 2; // fine with 1 thread, hangs >1
    ThreadPool threadPool(threadCount);

    size_t chunkSize = totalCombinations / threadCount;
    size_t remainder = totalCombinations % threadCount;

    size_t start = 0;
    for (size_t i = 0; i < threadCount; ++i) {
        size_t end = start + chunkSize + (i < remainder ? 1 : 0); // Chunk up our tasks
        threadPool.enqueueTask([=, &args]() {
            workerFunction(start, end, args);
        });
        start = end;
    }

    return 0;
}
© www.soinside.com 2019 - 2024. All rights reserved.