我正在尝试为 C 语言编写一个 ThreadPool 库;主要用于教育目的,但也可能用于以后的实际使用。 我有一个“有效”的实现,因为我可以将所需数量的线程转入等待状态,然后从任务队列中填充它们。
我添加的最后一个功能是,如果添加到队列的任务数量超出其容量,则允许任务队列动态扩展(使用
realloc
)。 你知道,通常的动态数组的东西。
这与以下
main
功能配合得很好:
int main(void) {
srand((unsigned int)time(0));
sdm_threadpool_t *pool = sdm_threadpool_create(NUM_THREADS, DEFAULT_QUEUE_LENGTH);
WrapperArg arg[NUM_TASKS] = {0};
for (int i = 0; i < NUM_TASKS; i++) {
arg[i].input = i;
}
for (int i = 0; i < NUM_TASKS; i++) {
// printf("Submitting job %d...\n", i);
sdm_threadpool_add(pool, function_wrapper, &(arg[i]));
}
sdm_threadpool_join(pool);
for (size_t i=0; i<NUM_TASKS; i++) {
printf("arg[%zu].input = %d :: arg[%zu].output = %d\n",
i, arg[i].input, i, arg[i].output);
}
return 0;
}
注意在
printf
调用之前注释掉的 sdm_threadpool_add
。 如果我删除注释并允许打印,我会得到带有 realloc(): invalid next size
的核心转储。 或者,我应该说,我经常得到核心转储,但并非总是如此。
通过浏览,我怀疑我在某个地方的指针做错了,从而破坏了堆,但我不知道在哪里。 坦白说,我什至不知道如何去调查这个问题以缩小问题范围。
如果您能提供任何能够指出我的错误或帮助我自己调查问题的建议,我们将不胜感激。
谢谢
因为我怀疑它会有用,所以我还在我的小库中包含了我使用的各种函数的代码。
#define _XOPEN_SOURCE 500
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "cthreadpool.h"
void *sdm_threadpool_thread(void *threadpool);
sdm_threadpool_t *sdm_threadpool_create(size_t num_threads, size_t queue_capacity) {
sdm_threadpool_t *pool = NULL;
pool = malloc(sizeof(sdm_threadpool_t));
if (pool == NULL) {
fprintf(stderr, "Memory allocation problem. Quiting\n");
exit(1);
}
memset(pool, 0, sizeof(sdm_threadpool_t));
pool->threads = malloc(sizeof(pthread_t) * num_threads);
if (pool->threads == NULL) {
fprintf(stderr, "Memory allocation problem. Quiting\n");
exit(1);
}
memset(pool->threads, 0, pool->num_threads * sizeof(pool->threads[0]));
pool->task_queue = malloc(sizeof(sdm_threadpool_task_t) * queue_capacity);
if (pool->task_queue == NULL) {
fprintf(stderr, "Memory allocation problem. Quiting\n");
exit(1);
}
memset(pool->task_queue, 0, pool->num_threads * sizeof(pool->task_queue[0]));
pool->num_threads = num_threads;
pool->queue_capacity = queue_capacity;
pthread_mutex_init(&(pool->lock), NULL);
pthread_cond_init(&(pool->notify), NULL);
for (size_t i = 0; i < num_threads; i++) {
pthread_create(&(pool->threads[i]), NULL, &sdm_threadpool_thread, (void*)pool);
}
return pool;
}
int sdm_threadpool_add(sdm_threadpool_t *pool, void (*function)(void *), void *arg) {
pthread_mutex_lock(&(pool->lock));
while (pool->waiting_in_queue >= pool->queue_capacity) {
pool->queue_capacity *= 2;
size_t new_size = sizeof(sdm_threadpool_task_t) * pool->queue_capacity;
printf("Extending queue to %zu bytes\n", new_size);
sdm_threadpool_task_t *new_task_queue = realloc(pool->task_queue, new_size);
if (new_task_queue == NULL) {
fprintf(stderr, "Memory allocation problem. Quiting\n");
exit(1);
}
pool->task_queue = new_task_queue;
}
pool->task_queue[pool->queue_length].function = function;
pool->task_queue[pool->queue_length].arg = arg;
pool->queue_length++;
pool->waiting_in_queue++;
pthread_cond_broadcast(&(pool->notify));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
void *sdm_threadpool_thread(void *threadpool) {
sdm_threadpool_t *pool = (sdm_threadpool_t *)threadpool;
while (1) {
pthread_mutex_lock(&(pool->lock));
while ((pool->waiting_in_queue == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
if (pool->shutdown) {
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
void (*function)(void *) = pool->task_queue[pool->next_in_queue].function;
void *arg = pool->task_queue[pool->next_in_queue].arg;
pool->next_in_queue++;
pool->waiting_in_queue--;
pthread_mutex_unlock(&(pool->lock));
function(arg);
}
return NULL;
}
void sdm_threadpool_join(sdm_threadpool_t *pool) {
while (pool->waiting_in_queue > 0) {
usleep(1000);
}
sdm_threadpool_destroy(pool);
}
void sdm_threadpool_destroy(sdm_threadpool_t *pool) {
pthread_mutex_lock(&(pool->lock));
pool->shutdown = true;
pthread_cond_broadcast(&(pool->notify));
pthread_mutex_unlock(&(pool->lock));
for (size_t i = 0; i < pool->num_threads; i++) {
pthread_join(pool->threads[i], NULL);
}
free(pool->threads);
free(pool->task_queue);
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->notify));
free(pool);
}
您正在使用和修改
pool->waiting_in_queue
,但是您在哪里为其分配初始值?
此外,如果
sdm_threadpool
和其他类似类型是自定义结构,请包含这些结构的定义。