所以我遇到了一个问题:当我尝试创建最后一个线程时,它总是说核心被转储了。如果我写创建5或2个线程无关紧要。这是我的代码:UPD:现在我不能做超过3个线程和线程不执行我希望他们做的功能(消耗和生产)
UPD_2:现在我有一个这样的消息:终止调用后终止调用'终止调用递归终止调用递归中止(核心转储)
#include<cstdlib>
#include <iostream>
#include <string>
#include <mutex>
#include <pthread.h>
#include <condition_variable>
#define NUM_THREADS 4
using namespace std;
struct thread_data
{
int thread_id;
int repeat;
};
class our_monitor{
private:
int buffer[100];
mutex m;
int n = 0, lo = 0, hi = 0;
condition_variable in,out;
unique_lock<mutex> lk;
public:
our_monitor():lk(m)
{
}
void insert(int val, int repeat)
{
in.wait(lk, [&]{return n <= 100-repeat;});
for(int i=0; i<repeat; i++)
{
buffer[hi] = val;
hi = (hi + 1) % 100; //ring buffer
n = n +1; //one more item in buffer
}
lk.unlock();
out.notify_one();
}
int remove(int repeat)
{
out.wait(lk, [&]{return n >= repeat;});
int val;
for(int i=0; i<repeat; i++)
{
val = buffer[lo];
lo = (lo + 1) % 100;
n -= 1;
}
lk.unlock();
in.notify_one();
return val;
}
};
our_monitor mon;
void* produce(void *threadarg)
{
struct thread_data *my_data;
my_data = (struct thread_data *) threadarg;
cout<<"IN produce after paramiters"<< my_data->repeat<<endl;
int item;
item = rand()%100 + 1;
mon.insert(item, my_data->repeat);
cout<< "Item: "<< item << " Was prodused by thread:"<< my_data->thread_id << endl;
}
void* consume(void *threadarg)
{
struct thread_data *my_data;
my_data = (struct thread_data *) threadarg;
cout<<"IN consume after paramiters"<< my_data->repeat<<endl;
int item;
item = mon.remove(my_data->repeat);
if(item) cout<< "Item: "<< item << " Was consumed by thread:"<< my_data->thread_id << endl;
}
int main()
{
our_monitor *mon = new our_monitor();
pthread_t threads[NUM_THREADS];
thread_data td[NUM_THREADS];
int rc;
int i;
for( i = 0; i < NUM_THREADS; i++ )
{
td[i].thread_id = i;
td[i].repeat = rand()%5 + 1;
if(i % 2 == 0)
{
cout << "main() : creating produce thread, " << i << endl;
rc = pthread_create(&threads[i], NULL, produce, (void*) &td[i]);
if (rc)
{
cout << "Error:unable to create thread," << rc << endl;
exit(-1);
}
} else
{
cout << "main() : creating consume thread, " << i << endl;
rc = pthread_create(&threads[i], NULL, consume, (void *)&td[i]);
if (rc)
{
cout << "Error:unable to create thread," << rc << endl;
exit(-1);
}
}
}
pthread_join(threads[0], NULL);
pthread_join(threads[1], NULL);
pthread_join(threads[2], NULL);
//pthread_exit(NULL);
}
UPD:现在我不能做超过3个线程和线程不执行我希望他们做的功能(消费和生产)
UPD_2:现在我有一个这样的消息:终止调用后终止调用'终止调用递归终止调用递归中止(核心转储)
从cppref关于std :: condition_variable.wait(...)
“如果当前线程未锁定lock.mutex(),则调用此函数是未定义的行为。”
http://en.cppreference.com/w/cpp/thread/condition_variable/wait
不幸的是,该程序不会在第47行崩溃,而是在第55行崩溃,在那里您解锁未锁定的锁。
输入功能时锁定锁定。我已经快速检查了你剩下的逻辑,我觉得85%肯定没问题。
虽然我有你在这里,但这不是绝对必要的,但这是一个很好的做法。 std :: lock_guard和std :: unique_lock在进入范围时自动锁定互斥锁,并在它离开范围时解锁它。这有助于简化异常处理和奇怪的函数返回。我建议您将lk作为成员变量删除,并将其用作范围局部变量。
void insert(int val, int repeat)
{
{ // Scoped. Somewhat pedantic in this case, but it's always best to signal after the mutex is unlocked
std::unique_lock<std::mutex> lk(m);
in.wait(lk, [&]{return n <= 100-repeat;});
for(int i=0; i<repeat; i++)
{
buffer[hi] = val;
hi = (hi + 1) % 100; //ring buffer
n = n +1; //one more item in buffer
}
}
out.notify_one();
}
好的,现在是最后一期。生产者/消费者的好处是我们可以同时生产和消费。但是,我们只是锁定了我们的功能,因此不再可能。您现在可以做的是在for循环中移动条件锁定/等待/解锁/工作/信号
在伪代码中:
// produce:
while (true)
{
{
unique_lock lk(m)
wait(m, predicate)
}
produce 1
signal
}
这相当于使用信号量(C ++ '11 stl没有,但你可以很容易地创建自己的信号量,如上所示。)
// produce:
semaphore in(100);
semaphore out(0);
while (true)
{
in.down(1) // Subtracts 1 from in.count. Blocks when in.count == 0 (meaning the buffer is full)
produce 1
out.up(1) // Adds 1 to out.count
}
当main
结束时,td
超出范围而不再存在。但是你将指针传递给了线程。只要任何线程可能正在使用它,您需要确保td
继续存在。