我是多线程新手,我正在尝试实现一个简单的线程安全任务队列,其中每个线程都可以从中提取工作,直到没有更多任务为止。任何线程都不会进行任务排队。
出于测试目的,每个任务仅包含一个数字。
static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;
typedef struct Task{
int number;
}Task;
typedef struct Cell{
Task t;
struct Cell* next;
}Cell;
typedef struct TQueue{
struct Cell* head;
struct Cell* tail;
}TQueue;
int empty(TQueue *Queue)
return queue->head == queue->tail;
void startQueue(TQueue *queue){
queue->head = malloc(sizeof(Cell));
queue->tail = queue->head;
}
void enqueue(TQueue *queue, Task C){
queue->tail->next = malloc(sizeof(Cell));
queue->tail = queue->tail->next;
queue->tail->t = C;
queue->tail->next = NULL;
}
Task * dequeue(TQueue* queue){
pthread_mutex_lock( &task_mutex);
Task * t;
if(empty(queue)) t = NULL;
else{
struct Cell* p = queue->head;
queue->head = queue->head->next;
t = &queue->head->t;
free(p);
}
pthread_mutex_unlock( &task_mutex);
return t;
}
void * work( void* arg){
TQueue* queue = (TQueue *)arg;
Task* t = malloc(sizeof(Task));
for(t = dequeue(queue); t != NULL; t = dequeue(queue))
printf("%d ", t->number);
free(t);
pthread_exit(NULL);
return 0;
}
为了一个简单的测试,我在 main 上运行了这个:
int main(){
TQueue* queue = malloc(sizeof(TQueue));
startQueue(queue);
pthread_t threads[3];
Task t[3];
for(int i = 0; i < 3; i++){
t[i].number = i + 1;
enqueue(queue, t[i]);
}
for(int i = 0; i < 3; i++) pthread_create(&threads[i], NULL, work, (void*)queue);
for(int i = 0; i < 3; i++) pthread_join(threads[i], NULL);
return 0;
}
预期的输出是任意顺序的
1 2 3
,但有时它会打印一个包含奇怪数字的序列,例如1823219 2 3
。我无法检测到任何竞争条件或相关问题,所以我感谢任何帮助。
我发现了更多错误。
我已经注释了你的代码。我从你的第一篇文章和第二篇文章中汲取了一些内容。我已经修复了代码,显示了之前和之后的情况[请原谅无偿的风格清理]:
#include <stdio.h>
#include <pthread.h>
#include <malloc.h>
static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;
typedef struct Task {
int number;
} Task;
typedef struct Cell {
// NOTE/BUG: this should be a pointer to the task. otherwise, dequeue gets
// messy
#if 0
Task t;
#else
Task *t;
#endif
struct Cell *next;
} Cell;
typedef struct TQueue {
struct Cell *head;
struct Cell *tail;
} TQueue;
void
startQueue(TQueue *queue)
{
#if 0
queue->head = malloc(sizeof(Cell));
#else
queue->head = NULL;
#endif
queue->tail = NULL;
}
int
empty(TQueue *queue)
{
// NOTE/BUG: dequeue never touches tail, so this test is incorrect
#if 0
return (queue->head == queue->tail);
#else
return (queue->head == NULL);
#endif
}
void
enqueue(TQueue *queue, Task *t)
{
Cell *p;
pthread_mutex_lock(&task_mutex);
p = malloc(sizeof(Cell));
p->next = NULL;
p->t = t;
if (queue->tail == NULL) {
queue->tail = p;
queue->head = p;
}
else {
queue->tail->next = p;
queue->tail = p;
}
pthread_mutex_unlock(&task_mutex);
}
Task *
dequeue(TQueue *queue)
{
Task *t;
pthread_mutex_lock(&task_mutex);
if (empty(queue))
t = NULL;
else {
Cell *p = queue->head;
if (p == queue->tail)
queue->tail = NULL;
queue->head = p->next;
// NOTE/BUG: this is setting t to the second element in the list,
// not the first
// NOTE/BUG: this is also undefined behavior, in original code (with
// original struct definition), because what t points to _does_ get
// freed before return
#if 0
t = &queue->head->t;
#else
t = p->t;
#endif
free(p);
}
pthread_mutex_unlock(&task_mutex);
return t;
}
void *
work(void *arg)
{
TQueue *queue = (TQueue *) arg;
// NOTE/BUG: this gets orphaned on the first call to dequeue
#if 0
Task *t = malloc(sizeof(Task));
#else
Task *t;
#endif
for (t = dequeue(queue); t != NULL; t = dequeue(queue))
printf("%d ", t->number);
// NOTE/BUG: this frees some cell allocated in main -- not what we want
#if 0
free(t);
#endif
pthread_exit(NULL);
return 0;
}
// For a simple test i runned this on main:
int
main()
{
TQueue *queue = malloc(sizeof(TQueue));
startQueue(queue);
pthread_t threads[3];
Task t[3];
for (int i = 0; i < 3; i++) {
t[i].number = i + 1;
#if 0
enqueue(queue, t);
#else
enqueue(queue, &t[i]);
#endif
}
for (int i = 0; i < 3; i++)
pthread_create(&threads[i], NULL, work, (void *) queue);
for (int i = 0; i < 3; i++)
pthread_join(threads[i], NULL);
return 0;
}
更新:
线程是否同时执行任务?我一直在使用 htop 测试 CPU 使用率,但我只能最大限度地提高四个核心中单个核心的使用率。
需要记住的一些事情。
htop
在运行时间如此短的程序上可能不会显示太多。即使有 10,000 个队列条目,该程序也会在 20 毫秒内执行。
最好让程序自己打印信息[见下文]。请注意,
printf
对 stdin
执行线程锁定,因此它可能有助于程序的“串行”性质。它还对程序的执行时间贡献了显着(即printf
比dequeue
慢得多)
此外,一个线程(即第一个线程)可以独占队列并在其他线程有机会运行之前耗尽所有条目。 操作系统可以[自由]在单个内核上调度所有线程。然后它可能会稍后“迁移”它们(例如在一秒钟左右)。
我增强了程序,在输出打印中包含一些计时信息,这可能有助于显示更多您想看到的内容。此外,我还添加了命令行选项来控制线程数和排队项目数。这与我为自己的一些程序所做的类似。将程序输出转移到日志文件并检查它。在多次运行中尝试不同的选项
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <malloc.h>
#include <time.h>
int opt_n; // suppress thread output
int opt_T; // number of threads
int opt_Q; // number of queue items
static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;
double tvzero;
typedef struct Task {
int number;
} Task;
typedef struct Cell {
Task *t;
struct Cell *next;
} Cell;
typedef struct TQueue {
struct Cell *head;
struct Cell *tail;
} TQueue;
typedef struct Thread {
pthread_t tid;
int xid;
TQueue *queue;
} Thread;
double
tvgetf(void)
{
struct timespec ts;
double sec;
clock_gettime(CLOCK_REALTIME,&ts);
sec = ts.tv_nsec;
sec /= 1e9;
sec += ts.tv_sec;
sec -= tvzero;
return sec;
}
void
startQueue(TQueue *queue)
{
queue->head = NULL;
queue->tail = NULL;
}
int
empty(TQueue *queue)
{
return (queue->head == NULL);
}
void
enqueue(TQueue *queue, Task *t)
{
Cell *p;
pthread_mutex_lock(&task_mutex);
p = malloc(sizeof(Cell));
p->next = NULL;
p->t = t;
if (queue->tail == NULL) {
queue->tail = p;
queue->head = p;
}
else {
queue->tail->next = p;
queue->tail = p;
}
pthread_mutex_unlock(&task_mutex);
}
Task *
dequeue(TQueue *queue)
{
Task *t;
pthread_mutex_lock(&task_mutex);
if (empty(queue))
t = NULL;
else {
Cell *p = queue->head;
if (p == queue->tail)
queue->tail = NULL;
queue->head = p->next;
t = p->t;
free(p);
}
pthread_mutex_unlock(&task_mutex);
return t;
}
void *
work(void *arg)
{
Thread *tskcur = arg;
TQueue *queue = tskcur->queue;
Task *t;
double tvbef;
double tvaft;
while (1) {
tvbef = tvgetf();
t = dequeue(queue);
tvaft = tvgetf();
if (t == NULL)
break;
if (! opt_n)
printf("[%.9f/%.9f %5.5d] %d\n",
tvbef,tvaft - tvbef,tskcur->xid,t->number);
}
return (void *) 0;
}
// For a simple test i runned this on main:
int
main(int argc,char **argv)
{
char *cp;
TQueue *queue;
Task *t;
Thread *tsk;
--argc;
++argv;
for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;
switch (cp[1]) {
case 'n': // suppress thread output
opt_n = 1;
break;
case 'Q': // number of queue items
opt_Q = atoi(cp + 2);
break;
case 'T': // number of threads
opt_T = atoi(cp + 2);
break;
default:
break;
}
}
tvzero = tvgetf();
queue = malloc(sizeof(TQueue));
startQueue(queue);
if (opt_T == 0)
opt_T = 16;
Thread threads[opt_T];
if (opt_Q == 0)
opt_Q = 10000;
t = malloc(sizeof(Task) * opt_Q);
for (int i = 0; i < opt_Q; i++) {
t[i].number = i + 1;
enqueue(queue, &t[i]);
}
for (int i = 0; i < opt_T; i++) {
tsk = &threads[i];
tsk->xid = i + 1;
tsk->queue = queue;
pthread_create(&tsk->tid, NULL, work, tsk);
}
for (int i = 0; i < opt_T; i++) {
tsk = &threads[i];
pthread_join(tsk->tid, NULL);
}
printf("TOTAL: %.9f\n",tvgetf());
free(t);
return 0;
}
此外,一个线程(即第一个线程)可以独占队列并在其他线程有机会运行之前耗尽所有条目。”在这种情况下可以做什么?
一些事情。
pthread_create
需要一点时间,允许线程 1 运行,而其他线程仍在创建中。改善这种情况的一种方法是创建所有线程,每个线程设置一个“我正在运行”标志(在其线程控制块中)。主线程等待所有线程设置此标志。然后,主线程设置一个全局易失性“you_may_now_all_run”标志,每个线程在进入其主线程循环之前旋转该标志。根据我的经验,它们都在彼此的微秒内开始运行[或更好]。
我没有在下面更新的代码中实现这一点,所以你可以自己尝试一下[以及
nanosleep
]。
互斥体总体上相当公平(至少在 Linux 下),因为阻塞的线程将排队等待互斥体。正如我在评论中提到的,也可以使用
nanosleep
,但这[在某种程度上]违背了目的,因为线程会减慢速度。
线程饥饿的解药是“公平”。正如我所提到的,有一个复杂的算法可以实现无需等待的公平性。这是Kogan/Petrank算法:http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf
这确实有点复杂/高级,所以买者自负.. . 但是,妥协可能是票证锁:
https://en.wikipedia.org/wiki/Ticket_lock我又重新修改了程序。它具有池分配、票证与互斥锁以及日志条目延迟打印等选项。它还交叉检查线程之间的结果,以确保它们没有重复的条目。
当然,这一切的关键是准确、高精度的记录(也就是说,如果你不能测量它,你就无法调整它)。
例如,人们会认为在
free
中执行
dequeue
会比简单地将 Cell 释放到可重用池(类似于平板分配器)慢,但是,性能提升并没有预期的那么好。这可能是 glibc 的 malloc/free
速度非常快 [这就是他们声称]。 这些不同的版本应该可以让您了解如何构建自己的性能测量套件。
无论如何,这是代码:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <malloc.h>
#include <errno.h>
#include <string.h>
#include <time.h>
int opt_p; // print thread output immediately
int opt_T; // number of threads
int opt_Q; // number of queue items
int opt_L; // use ticket lock
int opt_M; // use fast cell alloc/free
typedef unsigned char byte;
typedef unsigned int u32;
#define sysfault(_fmt...) \
do { \
fprintf(stderr,_fmt); \
exit(1); \
} while (0)
// lock control
typedef struct AnyLock {
pthread_mutex_t mutex; // standard mutex
volatile u32 seqreq; // ticket lock request
volatile u32 seqacq; // ticket lock grant
} AnyLock;
// work value
typedef struct Task {
union {
struct Task *next;
int number;
};
} Task;
// queue item
typedef struct Cell {
struct Cell *next;
Task *t;
} Cell;
// queue control
typedef struct TQueue {
struct Cell *head;
struct Cell *tail;
} TQueue;
// thread log entry
typedef struct Log {
double tvbef;
double tvaft;
int number;
} Log;
#define BTVOFF(_off) \
((_off) >> 3)
#define BTVMSK(_off) \
(1u << ((_off) & 0x07))
#define BTVLEN(_len) \
((_len) + 7) >> 3
// thread control
typedef struct Thread {
pthread_t tid;
int xid;
TQueue *queue;
Log *log;
byte *bitv;
} Thread;
static inline byte
btvset(byte *bitv,long off)
{
u32 msk;
byte oval;
bitv += BTVOFF(off);
msk = BTVMSK(off);
oval = *bitv & msk;
*bitv |= msk;
return oval;
}
AnyLock task_mutex;
AnyLock print_mutex;
double tvzero;
Cell *cellpool; // free pool of cells
long bitvlen;
#define BARRIER \
__asm__ __volatile__("" ::: "memory")
// virtual function pointers
Cell *(*cellnew)(void);
void (*cellfree)(Cell *);
void (*lock_acquire)(AnyLock *lock);
void (*lock_release)(AnyLock *lock);
double
tvgetf(void)
{
struct timespec ts;
double sec;
clock_gettime(CLOCK_REALTIME,&ts);
sec = ts.tv_nsec;
sec /= 1e9;
sec += ts.tv_sec;
sec -= tvzero;
return sec;
}
void *
xalloc(size_t cnt,size_t siz)
{
void *ptr;
ptr = calloc(cnt,siz);
if (ptr == NULL)
sysfault("xalloc: calloc failure -- %s\n",strerror(errno));
return ptr;
}
void
lock_wait_ticket(AnyLock *lock,u32 newval)
{
u32 oldval;
// wait for our ticket to come up
// NOTE: atomic_load is [probably] overkill here
while (1) {
#if 0
oldval = atomic_load(&lock->seqacq);
#else
oldval = lock->seqacq;
#endif
if (oldval == newval)
break;
}
}
void
lock_acquire_ticket(AnyLock *lock)
{
u32 oldval;
u32 newval;
int ok;
// acquire our ticket value
// NOTE: just use a garbage value for oldval -- the exchange will
// update it with the correct/latest value -- this saves a separate
// refetch within the loop
oldval = 0;
while (1) {
#if 0
BARRIER;
oldval = lock->seqreq;
#endif
newval = oldval + 1;
ok = atomic_compare_exchange_strong(&lock->seqreq,&oldval,newval);
if (ok)
break;
}
lock_wait_ticket(lock,newval);
}
void
lock_release_ticket(AnyLock *lock)
{
// NOTE: atomic_fetch_add is [probably] overkill, but leave it for now
#if 1
atomic_fetch_add(&lock->seqacq,1);
#else
lock->seqacq += 1;
#endif
}
void
lock_acquire_mutex(AnyLock *lock)
{
pthread_mutex_lock(&lock->mutex);
}
void
lock_release_mutex(AnyLock *lock)
{
pthread_mutex_unlock(&lock->mutex);
}
void
lock_init(AnyLock *lock)
{
switch (opt_L) {
case 1:
lock->seqreq = 0;
lock->seqacq = 1;
lock_acquire = lock_acquire_ticket;
lock_release = lock_release_ticket;
break;
default:
pthread_mutex_init(&lock->mutex,NULL);
lock_acquire = lock_acquire_mutex;
lock_release = lock_release_mutex;
break;
}
}
void
startQueue(TQueue *queue)
{
queue->head = NULL;
queue->tail = NULL;
}
int
empty(TQueue *queue)
{
return (queue->head == NULL);
}
// cellnew_pool -- allocate a queue entry
Cell *
cellnew_pool(void)
{
int cnt;
Cell *p;
Cell *pool;
while (1) {
// try for quick allocation
p = cellpool;
// bug out if we got it
if (p != NULL) {
cellpool = p->next;
break;
}
// go to the heap to replenish the pool
cnt = 1000;
p = xalloc(cnt,sizeof(Cell));
// link up the entries
pool = NULL;
for (; cnt > 0; --cnt, ++p) {
p->next = pool;
pool = p;
}
// put this "online"
cellpool = pool;
}
return p;
}
// cellfree_pool -- release a queue entry
void
cellfree_pool(Cell *p)
{
p->next = cellpool;
cellpool = p;
}
// cellnew_std -- allocate a queue entry
Cell *
cellnew_std(void)
{
Cell *p;
p = xalloc(1,sizeof(Cell));
return p;
}
// cellfree_std -- release a queue entry
void
cellfree_std(Cell *p)
{
free(p);
}
void
enqueue(TQueue *queue, Task *t)
{
Cell *p;
lock_acquire(&task_mutex);
p = cellnew();
p->next = NULL;
p->t = t;
if (queue->tail == NULL) {
queue->tail = p;
queue->head = p;
}
else {
queue->tail->next = p;
queue->tail = p;
}
lock_release(&task_mutex);
}
Task *
dequeue(TQueue *queue)
{
Task *t;
lock_acquire(&task_mutex);
if (empty(queue))
t = NULL;
else {
Cell *p = queue->head;
if (p == queue->tail)
queue->tail = NULL;
queue->head = p->next;
t = p->t;
cellfree(p);
}
lock_release(&task_mutex);
return t;
}
void *
work(void *arg)
{
Thread *tskcur = arg;
TQueue *queue = tskcur->queue;
Task *t;
Log *log;
long cnt;
int tprev;
byte *bitv;
double tvbeg;
double tvbef;
double tvaft;
log = tskcur->log;
bitv = tskcur->bitv;
tvbeg = tvgetf();
tprev = 0;
while (1) {
tvbef = tvgetf();
t = dequeue(queue);
tvaft = tvgetf();
if (t == NULL)
break;
// abort if we get a double entry
if (btvset(bitv,t->number))
sysfault("work: duplicate\n");
if (opt_p) {
printf("[%.9f/%.9f %5.5d] %d [%d]\n",
tvbef,tvaft - tvbef,tskcur->xid,t->number,t->number - tprev);
tprev = t->number;
continue;
}
log->tvbef = tvbef;
log->tvaft = tvaft;
log->number = t->number;
++log;
}
if (! opt_p) {
tvaft = tvgetf();
cnt = log - tskcur->log;
log = tskcur->log;
lock_acquire(&print_mutex);
printf("\n");
printf("THREAD=%5.5d START=%.9f STOP=%.9f ELAP=%.9f TOTAL=%ld\n",
tskcur->xid,tvbeg,tvaft,tvaft - tvbeg,cnt);
tprev = 0;
for (; cnt > 0; --cnt, ++log) {
printf("[%.9f/%.9f %5.5d] %d [%d]\n",
log->tvbef,log->tvaft - log->tvbef,tskcur->xid,
log->number,log->number - tprev);
tprev = log->number;
}
lock_release(&print_mutex);
}
return (void *) 0;
}
void
btvchk(Thread *tska,Thread *tskb)
{
byte *btva;
byte *btvb;
byte aval;
byte bval;
int idx;
printf("btvchk: %d ??? %d\n",tska->xid,tskb->xid);
btva = tska->bitv;
btvb = tskb->bitv;
// abort if we get overlapping entries between two threads
for (idx = 0; idx < bitvlen; ++idx) {
aval = btva[idx];
bval = btvb[idx];
if (aval & bval)
sysfault("btvchk: duplicate\n");
}
}
// For a simple test i runned this on main:
int
main(int argc,char **argv)
{
char *cp;
TQueue *queue;
Task *t;
Thread *tsk;
--argc;
++argv;
for (; argc > 0; --argc, ++argv) {
cp = *argv;
if (*cp != '-')
break;
switch (cp[1]) {
case 'p': // print immediately
opt_p = 1;
break;
case 'Q': // number of queue items
opt_Q = atoi(cp + 2);
break;
case 'T': // number of threads
opt_T = atoi(cp + 2);
break;
case 'L':
opt_L = 1;
break;
case 'M':
opt_M = 1;
break;
default:
break;
}
}
printf("p=%d -- thread log is %s\n",opt_p,opt_p ? "immediate" : "deferred");
if (opt_T == 0)
opt_T = 16;
printf("T=%d (number of threads)\n",opt_T);
if (opt_Q == 0)
opt_Q = 1000000;
printf("Q=%d (number of items to enqueue)\n",opt_Q);
printf("L=%d -- lock is %s\n",opt_L,opt_L ? "ticket" : "mutex");
printf("M=%d -- queue item allocation is %s\n",
opt_M,opt_M ? "pooled" : "malloc/free");
tvzero = tvgetf();
lock_init(&task_mutex);
lock_init(&print_mutex);
// select queue item allocation strategy
switch (opt_M) {
case 1:
cellnew = cellnew_pool;
cellfree = cellfree_pool;
break;
default:
cellnew = cellnew_std;
cellfree = cellfree_std;
break;
}
queue = xalloc(1,sizeof(TQueue));
startQueue(queue);
Thread threads[opt_T];
// get byte length of bit vectors
bitvlen = BTVLEN(opt_Q + 1);
// allocate per-thread log buffers
for (int i = 0; i < opt_T; i++) {
tsk = &threads[i];
if (! opt_p)
tsk->log = xalloc(opt_Q,sizeof(Log));
tsk->bitv = xalloc(bitvlen,sizeof(byte));
}
// allocate "work to do"
t = xalloc(opt_Q,sizeof(Task));
// add to master queue
for (int i = 0; i < opt_Q; i++) {
t[i].number = i + 1;
enqueue(queue, &t[i]);
}
// fire up the threads
for (int i = 0; i < opt_T; i++) {
tsk = &threads[i];
tsk->xid = i + 1;
tsk->queue = queue;
pthread_create(&tsk->tid, NULL, work, tsk);
}
// wait for threads to complete
for (int i = 0; i < opt_T; i++) {
tsk = &threads[i];
pthread_join(tsk->tid, NULL);
}
// wait for threads to complete
for (int i = 0; i < opt_T; i++) {
for (int j = i + 1; j < opt_T; j++)
btvchk(&threads[i],&threads[j]);
}
printf("TOTAL: %.9f\n",tvgetf());
free(t);
return 0;
}