I'm building a multithreaded file server. I created a thread pool to handle requests from clients. Here is my thread pool code:
void *worker_call(void *thread_id){
    printf("Initialized thread #%ld \n", (long)thread_id);
    //pull in initialized global configs
    extern pthread_mutex_t queue_m;
    extern pthread_cond_t worker_c;
    extern steque_t queue;
    while(1){
        if (pthread_mutex_lock(&queue_m) != 0){
            fprintf(stderr, "An error occured while locking mutex in #%ld \n", (long)thread_id);
        }
        while(steque_isempty(&queue) == 1){
            printf("thread #%ld - Going to sleep...\n", (long) thread_id);
            pthread_cond_wait(&worker_c, &queue_m);
            printf("thread #%ld - I'm waking up...\n", (long) thread_id);
        }
        int *work = steque_pop(&queue);
        if (pthread_mutex_unlock(&queue_m) != 0){
            fprintf(stderr, "An error occured while unlocking mutex in #%ld \n", (long)thread_id);
        }
        pthread_cond_broadcast(&worker_c);
        sleep(1); //added to make sure that the other threads have a chance to wake up
        printf("thread #%ld - what is the value of work: %d\n", (long) thread_id, *work);
        // process_request(&(work->ctx), work->path, work->arg, (long)thread_id);
        free(work);
    }
    return NULL;
}
To test that my thread pool starts correctly and that all of the threads do work, I created the following test:
int main(){
    int nthreads = 6;
    pthread_t threads[nthreads];
    long thread_ids[nthreads];
    pthread_attr_t thread_attr;
    pthread_attr_init(&thread_attr);
    pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
    pthread_attr_setscope(&thread_attr, PTHREAD_SCOPE_SYSTEM);
    for(int t=0;t<nthreads;t++){
        thread_ids[t] = t;
        if( pthread_create(&threads[t],NULL, worker_call, (void *)thread_ids[t]) != 0 ) {
            printf("An error occured while creating thread: %d\n", t);
        }
    }
    pthread_attr_destroy(&thread_attr);
    //init multi-threading configs
    extern pthread_mutex_t queue_m;
    pthread_mutexattr_t m_attr;
    pthread_mutexattr_init(&m_attr);
    pthread_mutexattr_settype(&m_attr, PTHREAD_MUTEX_ERRORCHECK);
    pthread_mutex_init(&queue_m, NULL);
    extern pthread_cond_t worker_c;
    pthread_cond_init(&worker_c, NULL);
    extern steque_t queue;
    steque_init(&queue);
    //create a simple queue with each item being an int.
    //the goal is to simply remove the items from the queue.
    for(int i = 0; i < 5; i++){
        int *work = malloc(sizeof(int));
        *work = i;
        steque_enqueue(&queue, work);
    }
    printf("queue size after creating it: %d\n", steque_size(&queue));
    pthread_cond_broadcast(&worker_c);
    for(int t=0;t<nthreads;t++){
        pthread_join(threads[t], NULL);
    }
    printf("All the threads finished processing\n");
    return 0;
}
This code runs without errors, but only one thread wakes up, and it does all of the work, as the output below shows:
Initialized thread #0
thread #0 - Going to sleep...
Initialized thread #1
thread #1 - Going to sleep...
Initialized thread #2
thread #2 - Going to sleep...
Initialized thread #3
thread #3 - Going to sleep...
Initialized thread #4
thread #4 - Going to sleep...
Initialized thread #5
thread #5 - Going to sleep...
queue size after creating it: 5
thread #5 - I'm waking up...
thread #5 - what is the value of work: 0
thread #5 - what is the value of work: 1
thread #5 - what is the value of work: 2
thread #5 - what is the value of work: 3
thread #5 - what is the value of work: 4
thread #5 - Going to sleep...
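My question is: why don't the other threads wake up and take items from the queue? I tried adding a one-second delay after calling pthread_cond_broadcast to give the other threads enough time to lock the mutex, but that did not help. Does anyone see what I'm doing wrong?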
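When you say it "runs successfully," what is your definition of success? There are several worrying race conditions here, and I'm surprised this works at all. For example, worker_call() accesses the queue, the mutex, and the condition variable before they have actually been initialized: main() creates the threads first and only afterwards calls pthread_mutex_init(), pthread_cond_init(), and steque_init().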
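One way to rule out the initialization race is to make the synchronization objects valid before any thread can exist. The following is only a minimal sketch, assuming static initializers are acceptable for your server; `queue_m` and `worker_c` are the names from the question, while `go`, `waiter`, and `run_waiters` are hypothetical names introduced for illustration.

```c
#include <pthread.h>

/* Static initializers make both objects valid before any thread starts. */
static pthread_mutex_t queue_m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t worker_c = PTHREAD_COND_INITIALIZER;
static int go = 0; /* hypothetical start flag, protected by queue_m */

/* Each waiter blocks until the starting thread sets `go`. */
static void *waiter(void *arg) {
    (void)arg;
    pthread_mutex_lock(&queue_m);
    while (!go)
        pthread_cond_wait(&worker_c, &queue_m);
    pthread_mutex_unlock(&queue_m);
    return NULL;
}

/* Starts up to 16 waiters, releases them all, and returns how many
 * threads were joined. */
int run_waiters(int n) {
    pthread_t t[16];
    int started = 0, joined = 0;
    if (n > 16)
        n = 16;

    pthread_mutex_lock(&queue_m);
    go = 0;
    pthread_mutex_unlock(&queue_m);

    /* Threads are created only after the mutex and condition are valid. */
    for (int i = 0; i < n; i++)
        if (pthread_create(&t[started], NULL, waiter, NULL) == 0)
            started++;

    /* Change the predicate under the lock, then wake every waiter. */
    pthread_mutex_lock(&queue_m);
    go = 1;
    pthread_cond_broadcast(&worker_c);
    pthread_mutex_unlock(&queue_m);

    for (int i = 0; i < started; i++)
        if (pthread_join(t[i], NULL) == 0)
            joined++;
    return joined;
}
```

If you prefer pthread_mutex_init() and pthread_cond_init(), the same ordering applies: call them before the pthread_create() loop, not after it. With GCC or Clang this would typically be built with -pthread.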
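Also, when you initially push the items onto the queue in main(), you should wrap that code in a mutex lock/unlock pair, because your threads are accessing the same queue at that point.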
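As a rough illustration of pushing under the lock, here is a sketch that does not use steque at all. `int_queue_t` is a hypothetical stand-in for `steque_t`, and `enqueue_locked`, `queue_size_locked`, and `queue_clear_locked` are made-up helper names; only `queue_m` and `worker_c` come from the question.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical stand-in for steque_t: a singly linked FIFO of ints. */
typedef struct node { int value; struct node *next; } node_t;
typedef struct { node_t *head, *tail; int size; } int_queue_t;

static int_queue_t queue = { NULL, NULL, 0 };
static pthread_mutex_t queue_m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t worker_c = PTHREAD_COND_INITIALIZER;

/* Pushes one item while holding queue_m, then wakes one waiting worker.
 * Returns 0 on success, -1 if the allocation fails. */
int enqueue_locked(int value) {
    node_t *n = malloc(sizeof *n);
    if (n == NULL)
        return -1;
    n->value = value;
    n->next = NULL;

    pthread_mutex_lock(&queue_m);
    if (queue.tail != NULL)
        queue.tail->next = n;
    else
        queue.head = n;
    queue.tail = n;
    queue.size++;
    pthread_cond_signal(&worker_c);
    pthread_mutex_unlock(&queue_m);
    return 0;
}

/* Reads the size under the same lock that protects every mutation. */
int queue_size_locked(void) {
    pthread_mutex_lock(&queue_m);
    int size = queue.size;
    pthread_mutex_unlock(&queue_m);
    return size;
}

/* Frees every queued node under the lock; returns how many were freed. */
int queue_clear_locked(void) {
    int freed = 0;
    pthread_mutex_lock(&queue_m);
    while (queue.head != NULL) {
        node_t *next = queue.head->next;
        free(queue.head);
        queue.head = next;
        freed++;
    }
    queue.tail = NULL;
    queue.size = 0;
    pthread_mutex_unlock(&queue_m);
    return freed;
}
```

The point of the sketch is that every read or write of the queue, including the size query that main() prints, goes through the same mutex the workers use.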
My suggestion is to fix these race conditions and then update your results.
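Also, in worker_call, the worker checks whether the queue is empty before calling cond_wait, but does not check whether it is empty after waking up. Since there may be fewer items than workers, the function needs to check again whether the queue is empty.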
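The re-check after waking can be sketched roughly as follows. This is not the asker's server code: the array-backed queue (`items`, `head`, `tail`), the `done` shutdown flag, the `processed` counter, and `run_pool` are all assumptions added for illustration, and the `done` flag in particular is an extra so that idle workers can exit and pthread_join() can return.

```c
#include <pthread.h>

#define MAX_ITEMS 64
#define MAX_WORKERS 16

static pthread_mutex_t queue_m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t worker_c = PTHREAD_COND_INITIALIZER;
static int items[MAX_ITEMS]; /* hypothetical array-backed queue */
static int head = 0, tail = 0;
static int done = 0;         /* set once no more work will arrive */
static int processed = 0;    /* items popped so far, guarded by queue_m */

static void *worker_call(void *arg) {
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&queue_m);
        /* Re-test the predicate after every wakeup: another worker may
         * have taken the item, or the wakeup may be spurious. */
        while (head == tail && !done)
            pthread_cond_wait(&worker_c, &queue_m);
        if (head == tail) { /* empty and done: nothing left to do */
            pthread_mutex_unlock(&queue_m);
            return NULL;
        }
        int work = items[head++];
        processed++;
        pthread_mutex_unlock(&queue_m);
        (void)work; /* a real server would handle the request here,
                       outside the lock */
    }
}

/* Runs nthreads workers over nitems items; returns the number of items
 * processed. Both arguments are clamped to the fixed limits above. */
int run_pool(int nthreads, int nitems) {
    pthread_t t[MAX_WORKERS];
    int started = 0;
    if (nthreads > MAX_WORKERS) nthreads = MAX_WORKERS;
    if (nitems > MAX_ITEMS) nitems = MAX_ITEMS;

    pthread_mutex_lock(&queue_m);
    head = tail = done = processed = 0;
    pthread_mutex_unlock(&queue_m);

    for (int i = 0; i < nthreads; i++)
        if (pthread_create(&t[started], NULL, worker_call, NULL) == 0)
            started++;

    /* Enqueue under the lock, mark the end of work, wake everyone. */
    pthread_mutex_lock(&queue_m);
    for (int i = 0; i < nitems; i++)
        items[tail++] = i;
    done = 1;
    pthread_cond_broadcast(&worker_c);
    pthread_mutex_unlock(&queue_m);

    for (int i = 0; i < started; i++)
        pthread_join(t[i], NULL);
    return processed; /* safe to read: all workers have been joined */
}
```

With six workers and five items, at least one worker wakes up to an empty queue; the `while` loop is what keeps it from popping a nonexistent item, and the `done` flag is what lets it leave instead of sleeping forever.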