我已经实现了使用向量引用的线程,并且不会根据我添加的通知系统同时更改相同的值,因此不应该存在任何原子性问题。我认为。可能存在
isAnyOne
布尔值,但它们会设置为相同的值,因此我希望不应该存在竞争条件。
我找不到解决我遇到的运行时问题的方法。这个问题就像使用调试器运行时的断点一样,因为它会导致内存泄漏。
问题是,在运行 coe 结束时,调试器将我指向此处:
该程序的完整代码如下所示:
#include <iostream>
#include <vector>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <algorithm>
using namespace std;
using namespace std::chrono;
const uint8_t BASE = 10;
mutex mtx;
condition_variable cv;
void OutputList(vector<uint8_t> &inputNumbers) {
for (int i = 0; i < inputNumbers.size(); i++) {
if (i == inputNumbers.size() - 1) {
cout << unsigned(inputNumbers[i]) << " ]";
}
else if (i == 0) {
cout << "[ " << unsigned(inputNumbers[i]) << ", ";
}
else {
cout << unsigned(inputNumbers[i]) << ", ";
}
}
cout << endl;
}
// Set the doneThreads item at threadIndex to true when operation done.
void ThreadHandler(vector<int> &doneThreads, int threadIndex, bool &finishedIterations, vector<uint8_t> &input, vector<int> &startIndexes, vector<int> &endIndexes, vector<uint8_t> &output, vector<uint8_t> &carries, int &iterationCount, bool &isAnyOne) {
// Continue processing until all iterations are finished
while (!finishedIterations) {
unique_lock<mutex> lock(mtx); // Acquire a unique lock on the mutex to ensure thread safety
cv.wait(lock, [&doneThreads, threadIndex] {
// If all threads aren't done then start
return doneThreads[threadIndex] == 0;
});
// Do Work Here
cout << "Doing Work on thread!" << endl;
for (int i = startIndexes[threadIndex]; i < endIndexes[threadIndex]; i++) {
uint8_t intermediate = input[i] + input[input.size() - (i + 1)];
uint8_t localCarry = (intermediate >= 10) ? 1 : 0;
output[i] = localCarry ? intermediate - 10 : intermediate;
carries[i+1] = localCarry;
if (localCarry == 1) {
isAnyOne = true;
}
}
// Mark this thread's work as done for the current iteration
doneThreads[threadIndex] += 1;
lock.unlock();
cv.notify_all();
}
}
void ControllerThread(int &iterationCount, bool &finishedIterations, vector<int> &doneThreads, vector<int> &startIndexes, vector<int> &endIndexes, vector<uint8_t> &output, vector<uint8_t> &carries, bool &isAnyOne, int numThreads, vector<uint8_t> &input) {
iterationCount = 0;
while(iterationCount < 10) {
unique_lock<mutex> lock(mtx);
cv.wait(lock, [&doneThreads] {
// If all threads are done then start
return all_of(doneThreads.begin(), doneThreads.end(), [](int done) {return done >= 1;});
});
cout << "Running main code!" << endl;
// Set conditions for threads
int itemsPerThread = input.size() / numThreads;
// If the input size and number of threads aren't divisible
int lastThreadExcess = input.size() % numThreads;
// Initializing all the threads
for (int i = 0; i < numThreads; i++) {
int startIndex = i * itemsPerThread;
int endIndex = (i+1) * itemsPerThread;
if (i == numThreads - 1) {
endIndex += lastThreadExcess;
}
startIndexes[i] = startIndex;
endIndexes[i] = endIndex;
}
// Carry propogation area
while (isAnyOne) {
isAnyOne = false;
for (int i = 0; i < carries.size(); i++) {
uint8_t intermediate;
if (output.size() > i) {
intermediate = output.at(i) + carries.at(i);
carries[i] = 0;
uint8_t localCarry = (intermediate >= 10) ? 1 : 0;
if (localCarry == 1) {
isAnyOne = true;
intermediate -= 10;
if (carries.size() > i)
carries[i+1] = localCarry;
else
carries.push_back(localCarry);
}
output[i] = intermediate;
}
else if (carries[i] > 0) {
intermediate = carries.at(i);
carries[i] = 0;
output.push_back(intermediate);
break;
}
}
}
OutputList(output);
// Resetting the threads to start doing tasks again
for (int j = 0; j < doneThreads.size(); j++) {
doneThreads[j] = 0;
}
// Starting the threads to do their tasks
lock.unlock();
cv.notify_all();
if (iterationCount > 0) {
input = output;
}
iterationCount++;
}
// This top section of thread processing is finished so setting this variable
finishedIterations = true;
}
int main() {
vector<uint8_t> inputNumbers;
int startNumber = 196;
int quotient = startNumber;
while (quotient != 0) {
uint8_t remainder = quotient % 10;
// Add the remainder to the list
inputNumbers.push_back(remainder);
// Get the new quotient
quotient = quotient / 10;
}
// OutputList(inputNumbers);
auto start = high_resolution_clock::now();
{
vector<uint8_t> output(inputNumbers.size(), 0);
vector<uint8_t> carries(inputNumbers.size() + 1, 0);
vector<thread> handles;
int numThreads = 1;
if (numThreads > inputNumbers.size()) {
numThreads = inputNumbers.size();
}
vector<int> doneThreads(numThreads, 0);
vector<int> startIndexes(numThreads, 0);
vector<int> endIndexes(numThreads, 0);
bool finishedIterations = false;
int iterationCount = 0;
bool isAnyOne = false;
// Set up the threads
for (int i = 0; i < numThreads; i++) {
handles.emplace_back(ThreadHandler, ref(doneThreads), i, ref(finishedIterations), ref(inputNumbers), ref(startIndexes), ref(endIndexes), ref(output), ref(carries), ref(iterationCount), ref(isAnyOne));
}
// Set up the controller thread, for the iterating through
thread controller(ControllerThread, ref(iterationCount), ref(finishedIterations), ref(doneThreads), ref(startIndexes), ref(endIndexes), ref(output), ref(carries), ref(isAnyOne), numThreads, ref(inputNumbers));
// Join all threads once all operations have happened
for (auto &handle : handles) {
handle.join();
}
controller.join();
}
auto stop = high_resolution_clock::now();
auto duration = duration_cast<milliseconds>(stop - start);
cout << "Number generated that is: " << inputNumbers.size() << " in length and took : " << duration.count() << " ms" << endl;
// OutputList(inputNumbers);
return 0;
}
感谢您的帮助。我不知道为什么会发生这种情况。
我尝试更改为智能指针并添加原子性,我认为这可能是一个问题,但没有成功。任何解决此问题的方法将不胜感激。
感谢@edrezen 为我找到问题解决方案的帮助。我不敢相信事情是如此简单。索引错误。我稍微更新了代码以修复此索引错误,进位数组应始终比输入数组大 1。控制器线程的更新代码如下所示:
void ControllerThread(int &iterationCount, bool &finishedIterations, vector<int> &doneThreads, vector<int> &startIndexes, vector<int> &endIndexes, vector<uint8_t> &output, vector<uint8_t> &carries, bool &isAnyOne, int numThreads, vector<uint8_t> &input) {
iterationCount = 0;
while(iterationCount < totalIterations) {
unique_lock<mutex> lock(mtx);
cv.wait(lock, [&doneThreads] {
// If all threads are done then start
return all_of(doneThreads.begin(), doneThreads.end(), [](int done) {return done >= 1;});
});
// Carry propogation area
while (isAnyOne) {
isAnyOne = false;
for (int i = 0; i < carries.size(); i++) {
uint8_t intermediate;
if (output.size() > i) {
intermediate = output.at(i) + carries.at(i);
carries[i] = 0;
uint8_t localCarry = (intermediate >= 10) ? 1 : 0;
if (localCarry == 1) {
isAnyOne = true;
intermediate -= 10;
if (carries.size() > i)
carries[i+1] = localCarry;
else
carries.push_back(localCarry);
}
output[i] = intermediate;
}
else if (carries[i] > 0) {
intermediate = carries.at(i);
carries[i] = 0;
output.push_back(intermediate);
break;
}
}
}
OutputList(output);
if (iterationCount > 0) {
input = output;
}
while (carries.size() < input.size() + 1) {
carries.push_back(0);
}
// Set conditions for threads
int itemsPerThread = input.size() / numThreads;
// If the input size and number of threads aren't divisible
int lastThreadExcess = input.size() % numThreads;
// Initializing all the threads
for (int i = 0; i < numThreads; i++) {
int startIndex = i * itemsPerThread;
int endIndex = (i+1) * itemsPerThread;
if (i == numThreads - 1) {
endIndex += lastThreadExcess;
}
startIndexes[i] = startIndex;
endIndexes[i] = endIndex;
}
// Resetting the threads to start doing tasks again
for (int j = 0; j < doneThreads.size(); j++) {
doneThreads[j] = 0;
}
// Starting the threads to do their tasks
lock.unlock();
cv.notify_all();
iterationCount++;
}
// This top section of thread processing is finished so setting this variable
finishedIterations = true;
}