在 C++ 中使用线程中向量的引用来解除分配运行时问题

问题描述 投票:0回答:1

我已经实现了使用向量引用的线程,并且不会根据我添加的通知系统同时更改相同的值,因此不应该存在任何原子性问题。我认为。可能存在

isAnyOne
布尔值,但它们会设置为相同的值,因此我希望不应该存在竞争条件。 我找不到解决我遇到的运行时问题的方法。这个问题就像使用调试器运行时的断点一样,因为它会导致内存泄漏。 问题是,在运行 coe 结束时,调试器将我指向此处: enter image description here 该程序的完整代码如下所示:

#include <iostream>
#include <vector>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <algorithm>

using namespace std;
using namespace std::chrono;

const uint8_t BASE = 10;
mutex mtx;
condition_variable cv;

void OutputList(vector<uint8_t> &inputNumbers) {
    for (int i = 0; i < inputNumbers.size(); i++) {
        if (i == inputNumbers.size() - 1) {
            cout << unsigned(inputNumbers[i]) << " ]";
        }
        else if (i == 0) {
            cout << "[ " << unsigned(inputNumbers[i]) << ", ";
        }
        else {
            cout << unsigned(inputNumbers[i]) << ", ";
        }
    }
    cout << endl;
}

// Set the doneThreads item at threadIndex to true when operation done.
void ThreadHandler(vector<int> &doneThreads, int threadIndex, bool &finishedIterations, vector<uint8_t> &input, vector<int> &startIndexes, vector<int> &endIndexes, vector<uint8_t> &output, vector<uint8_t> &carries, int &iterationCount, bool &isAnyOne) {
    // Continue processing until all iterations are finished
    while (!finishedIterations) {
        unique_lock<mutex> lock(mtx); // Acquire a unique lock on the mutex to ensure thread safety
        cv.wait(lock, [&doneThreads, threadIndex] {
            // If all threads aren't done then start
            return doneThreads[threadIndex] == 0;
        });

        // Do Work Here
        cout << "Doing Work on thread!" << endl;
        for (int i = startIndexes[threadIndex]; i < endIndexes[threadIndex]; i++) {
            uint8_t intermediate = input[i] + input[input.size() - (i + 1)];
            uint8_t localCarry = (intermediate >= 10) ? 1 : 0;
            output[i] = localCarry ? intermediate - 10 : intermediate;
            carries[i+1] = localCarry;
            if (localCarry == 1) {
                isAnyOne = true;
            }
        }
        // Mark this thread's work as done for the current iteration
        doneThreads[threadIndex] += 1;
        lock.unlock();
        cv.notify_all();
    }
}

void ControllerThread(int &iterationCount, bool &finishedIterations, vector<int> &doneThreads, vector<int> &startIndexes, vector<int> &endIndexes, vector<uint8_t> &output, vector<uint8_t> &carries, bool &isAnyOne, int numThreads, vector<uint8_t> &input) {
    iterationCount = 0;
    while(iterationCount < 10) {
        unique_lock<mutex> lock(mtx);
        cv.wait(lock, [&doneThreads] {
            // If all threads are done then start
            return all_of(doneThreads.begin(), doneThreads.end(), [](int done) {return done >= 1;});
        });

        cout << "Running main code!" << endl;

        // Set conditions for threads
        int itemsPerThread = input.size() / numThreads;
        // If the input size and number of threads aren't divisible
        int lastThreadExcess = input.size() % numThreads;
        // Initializing all the threads
        for (int i = 0; i < numThreads; i++) {
            int startIndex = i * itemsPerThread;
            int endIndex = (i+1) * itemsPerThread;
            if (i == numThreads - 1) {
                endIndex += lastThreadExcess;
            }
            startIndexes[i] = startIndex;
            endIndexes[i] = endIndex;
        }

        // Carry propogation area
        while (isAnyOne) {
            isAnyOne = false;
            for (int i = 0; i < carries.size(); i++) {
                uint8_t intermediate;
                if (output.size() > i) {
                    intermediate = output.at(i) + carries.at(i);
                    carries[i] = 0;
                    uint8_t localCarry = (intermediate >= 10) ? 1 : 0;
                    if (localCarry == 1) {
                        isAnyOne = true;
                        intermediate -= 10;
                        if (carries.size() > i)
                            carries[i+1] = localCarry;
                        else
                            carries.push_back(localCarry);
                    }
                    output[i] = intermediate;
                }
                else if (carries[i] > 0) {
                    intermediate = carries.at(i);
                    carries[i] = 0;
                    output.push_back(intermediate);
                    break;
                }
            }
        }
        OutputList(output);

        // Resetting the threads to start doing tasks again
        for (int j = 0; j < doneThreads.size(); j++) {
            doneThreads[j] = 0;
        }
        // Starting the threads to do their tasks
        lock.unlock();
        cv.notify_all();
        if (iterationCount > 0) {
            input = output;
        }
        iterationCount++;
    }
    // This top section of thread processing is finished so setting this variable
    finishedIterations = true;
}

int main() {
    vector<uint8_t> inputNumbers;
    int startNumber = 196;
    int quotient = startNumber;
    while (quotient != 0) {
        uint8_t remainder = quotient % 10;
        // Add the remainder to the list
        inputNumbers.push_back(remainder);
        // Get the new quotient
        quotient = quotient / 10;
    }
    // OutputList(inputNumbers);
    auto start = high_resolution_clock::now();
    {
        vector<uint8_t> output(inputNumbers.size(), 0);
        vector<uint8_t> carries(inputNumbers.size() + 1, 0);
        vector<thread> handles;
        int numThreads = 1;
        if (numThreads > inputNumbers.size()) {
            numThreads = inputNumbers.size();
        }
        vector<int> doneThreads(numThreads, 0);
        vector<int> startIndexes(numThreads, 0);
        vector<int> endIndexes(numThreads, 0);
        bool finishedIterations = false;
        int iterationCount = 0;
        bool isAnyOne = false;

        // Set up the threads
        for (int i = 0; i < numThreads; i++) {
            handles.emplace_back(ThreadHandler, ref(doneThreads), i, ref(finishedIterations), ref(inputNumbers), ref(startIndexes), ref(endIndexes), ref(output), ref(carries), ref(iterationCount), ref(isAnyOne));
        }

        // Set up the controller thread, for the iterating through
        thread controller(ControllerThread, ref(iterationCount), ref(finishedIterations), ref(doneThreads), ref(startIndexes), ref(endIndexes), ref(output), ref(carries), ref(isAnyOne), numThreads, ref(inputNumbers));

        // Join all threads once all operations have happened
        for (auto &handle : handles) {
            handle.join();
        }
        controller.join();
    }
    auto stop = high_resolution_clock::now();
    auto duration = duration_cast<milliseconds>(stop - start);
    cout << "Number generated that is: " << inputNumbers.size() << " in length and took : " << duration.count() << " ms" << endl;
    // OutputList(inputNumbers);
    return 0;
}

感谢您的帮助。我不知道为什么会发生这种情况。

我尝试更改为智能指针并添加原子性,我认为这可能是一个问题,但没有成功。任何解决此问题的方法将不胜感激。

c++ multithreading vector memory-management reference
1个回答
0
投票

感谢@edrezen 为我找到问题解决方案的帮助。我不敢相信事情是如此简单。索引错误。我稍微更新了代码以修复此索引错误,进位数组应始终比输入数组大 1。控制器线程的更新代码如下所示:

void ControllerThread(int &iterationCount, bool &finishedIterations, vector<int> &doneThreads, vector<int> &startIndexes, vector<int> &endIndexes, vector<uint8_t> &output, vector<uint8_t> &carries, bool &isAnyOne, int numThreads, vector<uint8_t> &input) {
    iterationCount = 0;
    while(iterationCount < totalIterations) {
        unique_lock<mutex> lock(mtx);
        cv.wait(lock, [&doneThreads] {
            // If all threads are done then start
            return all_of(doneThreads.begin(), doneThreads.end(), [](int done) {return done >= 1;});
        });

        // Carry propogation area
        while (isAnyOne) {
            isAnyOne = false;
            for (int i = 0; i < carries.size(); i++) {
                uint8_t intermediate;
                if (output.size() > i) {
                    intermediate = output.at(i) + carries.at(i);
                    carries[i] = 0;
                    uint8_t localCarry = (intermediate >= 10) ? 1 : 0;
                    if (localCarry == 1) {
                        isAnyOne = true;
                        intermediate -= 10;
                        if (carries.size() > i)
                            carries[i+1] = localCarry;
                        else
                            carries.push_back(localCarry);
                    }
                    output[i] = intermediate;
                }
                else if (carries[i] > 0) {
                    intermediate = carries.at(i);
                    carries[i] = 0;
                    output.push_back(intermediate);
                    break;
                }
            }
        }
        OutputList(output);

        if (iterationCount > 0) {
            input = output;
        }
        while (carries.size() < input.size() + 1) {
            carries.push_back(0);
        }
        // Set conditions for threads
        int itemsPerThread = input.size() / numThreads;
        // If the input size and number of threads aren't divisible
        int lastThreadExcess = input.size() % numThreads;
        // Initializing all the threads
        for (int i = 0; i < numThreads; i++) {
            int startIndex = i * itemsPerThread;
            int endIndex = (i+1) * itemsPerThread;
            if (i == numThreads - 1) {
                endIndex += lastThreadExcess;
            }
            startIndexes[i] = startIndex;
            endIndexes[i] = endIndex;
        }
        // Resetting the threads to start doing tasks again
        for (int j = 0; j < doneThreads.size(); j++) {
            doneThreads[j] = 0;
        }
        // Starting the threads to do their tasks
        lock.unlock();
        cv.notify_all();
        iterationCount++;
    }
    // This top section of thread processing is finished so setting this variable
    finishedIterations = true;
}
© www.soinside.com 2019 - 2024. All rights reserved.