最近,我一直在开发一种工具,该工具必须重复启动可执行文件并将大量数据输入其中并检查结果。由于此过程的一方面是计算密集型,因此我还使用 pthreads 将其设为多线程。
一旦我的单线程版本开始工作,我尝试了多个线程,很快发现代码被卡住了,而且似乎没有什么充分的理由什么也不做。我做了一些 GDB 调查并发现了以下线程:
我从中总结出以下要点:
据我所知,这些似乎都不适用于我的情况。
请参阅下面我的最小示例。不幸的是,即使作为 MCV 示例,它仍然很大!高级描述:
sha256sum
,并使用简单的 RNG 填充缓冲区、写入子进程、关闭写入管道、读取结果并调用 wait。可以使用线程计数作为第一个参数来运行。当使用多个线程时,我发现代码很快就会卡住,并且这种情况可能以几种奇怪的方式发生。例如,一个或多个工作线程可能会陷入对
read()
、close()
或 wait()
的调用中(第 200 行);或者所有线程都可能在门口陷入僵局(我确信这个事件不应该发生)。
没有任何事情会被认为是错误的;通过的管道 FD 符合预期。在继续之前,我使用 GDB 使用
start <n>
和 set scheduler-locking on
检查了这一点。
请注意,Valgrind 的 DRD 或 Helgrind 工具未发现此代码有任何问题/警告。有趣的是,代码在它们下以多线程方式工作(我认为是因为它会减慢速度或包装调用?)。
请问有人可以揭开这里发生的事情的神秘面纱吗?
更新
经过更多调查,我认为存在竞争条件,即分叉确实复制了我不打算复制的对象,但我不确定如何证明它。
请参阅以下来自
strace
的修剪输出。
家长:
pipe2([7, 8], 0) = 0
pipe2([9, 10], 0) = 0
write(1, "Thread 1 | pipe_in = {7, 8}, pi"..., 49) = 49
futex(0x7f82946ee0, FUTEX_WAIT_PRIVATE, 2, NULL) = 0
clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f81f8f250) = 1925
futex(0x7f82946ee0, FUTEX_WAKE_PRIVATE, 1) = 0
close(7) = 0
close(10) = 0
write(8, "lY\210<\333\205\210^~\1\26\201\305:s\264`m\360\f?\301\230^\222\335V\306\311_b\216"..., 65536) = 65536
...
write(8, "l\231\336n\333\305i\212~A\6\273\305z&\237`\255\212\243?\1\356\271\222\35+\241\311\237\251|"..., 57600) = 57600
close(8) = 0
read(9, <unfinished ...>) = ?
+++ killed by SIGINT +++
sha256sum
孩子(PID 1925):
read(0, "l\271\236\363\333e\353\364~a[i\305\329\204`\315<Q?\241\271\244\222=BO\311?\226\252"..., 32768) = 32768
read(0, "lY\t\"\333\205\25l~\1?_\305:\210R`m\1r?\301uF\222\335\217\232\311_G\v"..., 32768) = 32768
read(0, "l\371s\240\333\245?\363~\241\"%\305Z\327\260`\r\306\342?\3411\370\222}\335\265\311\177\370\373"..., 32768) = 32768
read(0, "l\231\336n\333\305i\212~A\6\273\305z&\237`\255\212\243?\1\356\271\222\35+\241\311\237\251|"..., 32768) = 32768
read(0, "l9I\215\333\345\2231~\341\351 \305\232u\35`MO\264?!\252\213\222\275x\\\311\277Z\215"..., 32768) = 24832
read(0, 0x55a3541510, 4096) = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGINT {si_signo=SIGINT, si_code=SI_KERNEL} ---
+++ killed by SIGINT +++
因此,父级关闭了子级标准输入的最终 FD,但是
read()
调用不会像应有的那样返回零,除非我弄错了。每一端的写入和读取大小不同,但这似乎是一个转移注意力的事情。
我认为这只能是由于在分叉上复制 FD 的不确定性发生?
代码:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <sys/wait.h>
#include <pthread.h>
#include <sched.h>
#include <assert.h>
#include <stdarg.h>
#define MIN(a, b) (((a) > (b)) ? (b) : (a))
#define LOG_PERROR(func, failCond) \
do { \
if (failCond) { \
perror(#func); \
abort(); \
} \
} \
while (0)
#define LOG_PTHREAD(funcCall) \
do { \
const int res = (funcCall); \
if (res != 0) { \
fprintf(stderr, #funcCall " failed with ret code %" PRId32 "\n", \
res); \
abort(); \
} \
} \
while (0)
typedef struct
{
uint32_t thread_count;
pthread_mutex_t print_mutex;
pthread_mutex_t master_mutex;
pthread_cond_t master_cond;
/* Guarded by `master_mutex` */
uint32_t threads_finished_work;
pthread_mutex_t gate_mutex;
pthread_cond_t gate_cond;
/* Guarded by `gate_mutex` */
uint32_t gate_waiting;
int running;
} context_t;
typedef struct
{
context_t* ctx;
uint32_t thread_id;
uint32_t seed;
char in_buf[1024];
} thread_data_t;
static void printf_guarded(context_t* ctx, const char* fmt, ...)
{
va_list ap;
LOG_PTHREAD(pthread_mutex_lock(&ctx->print_mutex));
va_start(ap, fmt);
vprintf(fmt, ap);
va_end(ap);
LOG_PTHREAD(pthread_mutex_unlock(&ctx->print_mutex));
}
static void do_work(thread_data_t* const td)
{
const int32_t count = 100000000;
const uint32_t lcg_a = UINT32_C(1664525);
const uint32_t lcg_c = UINT32_C(1013904223);
char out_buf[65536];
char* args[2] = {"sha256sum", NULL};
int pipe_in[2];
int pipe_out[2];
pid_t child;
pid_t wait_res;
ssize_t cumul_bytes = 0;
ssize_t this_bytes;
ssize_t res;
assert((sizeof(count) % 4) == 0);
assert((sizeof(out_buf) % 4) == 0);
/*
printf_guarded(ctx, "Thread %2" PRIu32 " | seed: %10" PRIu32 "\n",
td->thread_id, td->seed);
*/
LOG_PERROR(pipe, pipe(pipe_in) == -1);
LOG_PERROR(pipe, pipe(pipe_out) == -1);
printf_guarded(td->ctx, "Thread %2" PRIu32
" | pipe_in = {%d, %d}, pipe_out = {%d, %d}\n",
td->thread_id, pipe_in[0], pipe_in[1], pipe_out[0], pipe_out[1]);
child = fork();
LOG_PERROR(fork, child == -1);
if (child == 0)
{
LOG_PERROR(dup2, dup2(pipe_in[0], STDIN_FILENO) == -1);
LOG_PERROR(dup2, dup2(pipe_out[1], STDOUT_FILENO) == -1);
LOG_PERROR(close, close(pipe_in[0]) == -1);
LOG_PERROR(close, close(pipe_in[1]) == -1);
LOG_PERROR(close, close(pipe_out[0]) == -1);
LOG_PERROR(close, close(pipe_out[1]) == -1);
LOG_PERROR(execvp, execvp(args[0], args) == -1);
/* Never returns */
perror("execvp");
abort();
}
LOG_PERROR(close, close(pipe_in[0]) == -1);
LOG_PERROR(close, close(pipe_out[1]) == -1);
while (cumul_bytes < count)
{
int32_t i;
const ssize_t to_write_bytes =
MIN(count - cumul_bytes, (ssize_t) sizeof(out_buf));
/*
printf_guarded(ctx, "Thread %2" PRIu32 " | cumul_bytes = %" PRId64 "\n",
td->thread_id, cumul_bytes);
*/
assert((to_write_bytes % 4) == 0);
for (i = 0; i < (to_write_bytes / 4); i++)
{
const int32_t strided_idx = i * 4;
uint32_t* casted = (uint32_t*) &out_buf[strided_idx];
td->seed = lcg_a * td->seed + lcg_c;
*casted = td->seed;
}
this_bytes = 0;
while (this_bytes < to_write_bytes)
{
/*
printf_guarded(ctx, "Thread %2" PRIu32 " | write(..., [%"
PRId64 "], %" PRId64 ")\n",
td->thread_id, this_bytes, to_write_bytes - this_bytes);
*/
res = write(
pipe_in[1],
&out_buf[this_bytes],
to_write_bytes - this_bytes);
LOG_PERROR(write, res == -1);
this_bytes += res;
}
cumul_bytes += this_bytes;
}
/* Finished writing; close the write side of the in pipe. */
LOG_PERROR(close, close(pipe_in[1]) == -1);
this_bytes = 0;
do
{
/*
printf_guarded(ctx, "Thread %2" PRIu32 " | read(..., [%"
PRId64 "], %" PRId64 ")\n",
td->thread_id, this_bytes, sizeof(td->in_buf) - this_bytes);
*/
res = read(
pipe_out[0],
&td->in_buf[this_bytes],
sizeof(td->in_buf) - this_bytes);
LOG_PERROR(read, res == -1);
if (this_bytes >= (ssize_t) sizeof(td->in_buf))
{
fprintf(stderr, "Soemthing went wrong!\n");
abort();
}
this_bytes += res;
}
while (res > 0);
LOG_PERROR(close, close(pipe_out[0]) == -1);
wait_res = waitpid(child, NULL, 0);
LOG_PERROR(waitpid, wait_res == -1);
if (wait_res != child)
{
fprintf(stderr, "waitpid returned unexpected result (%"
PRId32 ")\n", wait_res);
abort();
}
td->in_buf[this_bytes] = 0;
}
static void* thread_entry(void* arg)
{
thread_data_t* const td = (thread_data_t*) arg;
context_t* const ctx = td->ctx;
while (1)
{
/* Stop all threads at gate */
LOG_PTHREAD(pthread_mutex_lock(&ctx->gate_mutex));
ctx->gate_waiting++;
printf_guarded(ctx, "Thread %2" PRIu32 " | gate, WAITING\n",
td->thread_id);
LOG_PTHREAD(pthread_cond_wait(&ctx->gate_cond, &ctx->gate_mutex));
printf_guarded(ctx, "Thread %2" PRIu32 " | gate, RELEASED\n",
td->thread_id);
LOG_PTHREAD(pthread_mutex_unlock(&ctx->gate_mutex));
/* All threads are now released from the gate */
if (!ctx->running)
break;
do_work(td);
LOG_PTHREAD(pthread_mutex_lock(&ctx->master_mutex));
ctx->threads_finished_work++;
assert(ctx->threads_finished_work <= ctx->thread_count);
/* Last thread to finish should signal the master */
if (ctx->threads_finished_work == ctx->thread_count)
{
printf_guarded(ctx, "Thread %2" PRIu32
" | finished, signalling master\n",
td->thread_id);
LOG_PTHREAD(pthread_cond_signal(&ctx->master_cond));
}
else
{
printf_guarded(ctx, "Thread %2" PRIu32 " | finished\n", td->thread_id);
}
LOG_PTHREAD(pthread_mutex_unlock(&ctx->master_mutex));
}
return NULL;
}
int main(int argc, char** argv)
{
context_t ctx;
pthread_t* threads;
thread_data_t* tds;
uint32_t i, j;
if (argc > 1)
ctx.thread_count = strtoul(argv[1], NULL, 10);
else
ctx.thread_count = 1;
LOG_PTHREAD(pthread_mutex_init(&ctx.print_mutex, NULL));
LOG_PTHREAD(pthread_mutex_init(&ctx.master_mutex, NULL));
LOG_PTHREAD(pthread_cond_init(&ctx.master_cond, NULL));
ctx.threads_finished_work = 0;
LOG_PTHREAD(pthread_mutex_init(&ctx.gate_mutex, NULL));
LOG_PTHREAD(pthread_cond_init(&ctx.gate_cond, NULL));
ctx.gate_waiting = 0;
ctx.running = 1;
threads = malloc(sizeof(pthread_t) * ctx.thread_count);
tds = malloc(sizeof(thread_data_t) * ctx.thread_count);
for (i = 0; i < ctx.thread_count; i++)
{
tds[i].ctx = &ctx;
tds[i].thread_id = i;
tds[i].seed = i;
LOG_PTHREAD(pthread_create(&threads[i], NULL, thread_entry, &tds[i]));
}
for (i = 0; ctx.running && (i < 10); i++)
{
int all_threads_at_gate = 0;
while (!all_threads_at_gate)
{
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
assert(ctx.gate_waiting <= ctx.thread_count);
if (ctx.gate_waiting == ctx.thread_count)
all_threads_at_gate = 1;
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
if (!all_threads_at_gate)
sched_yield();
}
printf_guarded(&ctx, "====================\nStart of run %" PRIu32
"\nThread master has seen all threads at gate - "
"signalling them\n", i);
ctx.gate_waiting = 0;
/* The threads are now good to go. */
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
LOG_PTHREAD(pthread_cond_broadcast(&ctx.gate_cond));
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
/* Wait for threads to do the work... */
LOG_PTHREAD(pthread_mutex_lock(&ctx.master_mutex));
assert(ctx.threads_finished_work <= ctx.thread_count);
if (ctx.threads_finished_work < ctx.thread_count)
{
/* Master beat the threads to this point (very likely). */
printf_guarded(&ctx, "Thread master is waiting for signal...\n");
LOG_PTHREAD(pthread_cond_wait(
&ctx.master_cond, &ctx.master_mutex));
}
else
{
printf_guarded(&ctx,
"Thread master didn't need to wait for signal!\n");
}
printf_guarded(&ctx, "Thread master is proceeding\n");
ctx.threads_finished_work = 0;
LOG_PTHREAD(pthread_mutex_unlock(&ctx.master_mutex));
for (j = 0; j < ctx.thread_count; j++)
{
printf_guarded(&ctx, "Thread %2" PRIu32 " | got output: %s",
tds[j].thread_id, tds[j].in_buf);
}
}
ctx.running = 0;
printf_guarded(&ctx, "Signalling threads...\n");
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
assert(ctx.gate_waiting == ctx.thread_count);
ctx.running = 0;
pthread_cond_broadcast(&ctx.gate_cond);
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
printf_guarded(&ctx, "Joining threads...\n");
for (i = 0; i < ctx.thread_count; i++)
{
LOG_PTHREAD(pthread_join(threads[i], NULL));
printf_guarded(&ctx, "Thread %2" PRIu32 " | final seed: %10"
PRIu32 "\n", i, tds[i].seed);
}
printf_guarded(&ctx, "Now freeing...\n");
free(tds);
free(threads);
LOG_PTHREAD(pthread_mutex_destroy(&ctx.gate_mutex));
LOG_PTHREAD(pthread_cond_destroy(&ctx.gate_cond));
LOG_PTHREAD(pthread_mutex_destroy(&ctx.master_mutex));
LOG_PTHREAD(pthread_cond_destroy(&ctx.master_cond));
LOG_PTHREAD(pthread_mutex_destroy(&ctx.print_mutex));
return 0;
}
对于这个复杂的问题,我深表歉意。我一直在思考想法并找到了答案(或者至少是大部分答案)。
问题:管道显然在进程中的所有线程(及其子线程)之间共享。
考虑到这个不稳定的假设,即该软件是为创建子进程而编写的,这些子进程会摄取任意数据直到文件末尾,然后打印其输出,不难看出杂散管道文件描述符如何意味着关闭写入端
sha256sum
stdin 管道的 不指示 EOF。
证明这一点的代码如下。它打印出任何不属于它只能访问的四个管道端之一的管道 FD。当使用一个线程运行它时,没有任何差异输出,但使用多个线程时......:
Thread 0 | gate, WAITING
Thread 1 | gate, WAITING
====================
Start of run 0
Thread master has seen all threads at gate - signalling them
Thread master is waiting for signal...
Thread 1 | gate, RELEASED
Thread 0 | gate, RELEASED
Thread 1 | pipe_in = {3, 4}, pipe_out = {5, 6}
Thread 0 | pipe_in = {7, 8}, pipe_out = {9, 10}
Thread 1 | my PID: 1678
Thread 1 | PARENT | unexpected pipe, fd = 7, inode = 8846
Thread 1 | PARENT | unexpected pipe, fd = 8, inode = 8846
Thread 1 | PARENT | unexpected pipe, fd = 9, inode = 8847
Thread 1 | PARENT | unexpected pipe, fd = 10, inode = 8847
Thread 1 | CHILD | unexpected pipe, fd = 7, inode = 8846
Thread 1 | CHILD | unexpected pipe, fd = 8, inode = 8846
Thread 1 | CHILD | unexpected pipe, fd = 9, inode = 8847
Thread 0 | my PID: 1678
Thread 1 | CHILD | unexpected pipe, fd = 10, inode = 8847
Thread 0 | PARENT | unexpected pipe, fd = 3, inode = 9492
Thread 0 | PARENT | unexpected pipe, fd = 4, inode = 9492
Thread 0 | PARENT | unexpected pipe, fd = 5, inode = 9493
Thread 0 | PARENT | unexpected pipe, fd = 6, inode = 9493
Thread 0 | CHILD | unexpected pipe, fd = 3, inode = 9492
Thread 0 | CHILD | unexpected pipe, fd = 4, inode = 9492
Thread 0 | CHILD | unexpected pipe, fd = 5, inode = 9493
Thread 0 | CHILD | unexpected pipe, fd = 6, inode = 9493
^C
这可以通过确保孩子们在调用
execvp()
之前关闭所有无关的管道 FD 来解决。
我关心的不仅仅是管道,比如继承的 futexes。我不明白 futexes 是如何工作的,但是
fork()
上也可能存在“继承”问题。原因是,在极少数情况下,我目睹所有线程成功执行运行但卡在门上,这应该是不可能的,除非门互斥锁在锁定时被继承。
如有任何反馈,我们将不胜感激。
完整代码:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdarg.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <pthread.h>
#include <sched.h>
#include <assert.h>
#define MIN(a, b) (((a) > (b)) ? (b) : (a))
#define LOG_PERROR(func, failCond) \
do { \
if (failCond) { \
perror(#func); \
abort(); \
} \
} \
while (0)
#define LOG_PTHREAD(funcCall) \
do { \
const int res = (funcCall); \
if (res != 0) { \
fprintf(stderr, #funcCall " failed with ret code %" PRId32 "\n", \
res); \
abort(); \
} \
} \
while (0)
typedef struct
{
uint32_t thread_count;
pthread_mutex_t print_mutex;
pthread_mutex_t master_mutex;
pthread_cond_t master_cond;
/* Guarded by `master_mutex` */
uint32_t threads_finished_work;
pthread_mutex_t gate_mutex;
pthread_cond_t gate_cond;
/* Guarded by `gate_mutex` */
uint32_t gate_waiting;
int running;
} context_t;
typedef struct
{
context_t* ctx;
uint32_t thread_id;
uint32_t seed;
char in_buf[1024];
} thread_data_t;
static void printf_guarded(context_t* ctx, const char* fmt, ...)
{
va_list ap;
LOG_PTHREAD(pthread_mutex_lock(&ctx->print_mutex));
va_start(ap, fmt);
vprintf(fmt, ap);
va_end(ap);
LOG_PTHREAD(pthread_mutex_unlock(&ctx->print_mutex));
}
static void do_work(thread_data_t* const td)
{
const int32_t count = 100000000;
const uint32_t lcg_a = UINT32_C(1664525);
const uint32_t lcg_c = UINT32_C(1013904223);
char out_buf[65536];
char* args[2] = {"sha256sum", NULL};
int pipe_in[2];
int pipe_out[2];
pid_t child;
pid_t wait_res;
ssize_t cumul_bytes = 0;
ssize_t this_bytes;
ssize_t res;
assert((sizeof(count) % 4) == 0);
assert((sizeof(out_buf) % 4) == 0);
/*
printf_guarded(ctx, "Thread %2" PRIu32 " | seed: %10" PRIu32 "\n",
td->thread_id, td->seed);
*/
LOG_PERROR(pipe, pipe(pipe_in) == -1);
LOG_PERROR(pipe, pipe(pipe_out) == -1);
printf_guarded(td->ctx, "Thread %2" PRIu32
" | pipe_in = {%d, %d}, pipe_out = {%d, %d}\n",
td->thread_id, pipe_in[0], pipe_in[1], pipe_out[0], pipe_out[1]);
child = fork();
LOG_PERROR(fork, child == -1);
if (child > 0)
{
int i;
struct stat statbuf;
printf("Thread %2" PRIu32 " | my PID: %d\n", td->thread_id, getpid());
for (i = 0; i < getdtablesize(); i++)
{
if ((i != pipe_in[0]) &&
(i != pipe_in[1]) &&
(i != pipe_out[0]) &&
(i != pipe_out[1]))
{
if (fcntl(i, F_GETFD) != -1)
{
if (fstat(i, &statbuf) != -1)
{
if (S_ISFIFO(statbuf.st_mode))
{
printf("Thread %2" PRIu32 " | PARENT | "
"unexpected pipe, fd = %d, inode = %" PRIu64 "\n",
td->thread_id, i, statbuf.st_ino);
}
}
}
}
}
}
else if (child == 0)
{
int i;
struct stat statbuf;
for (i = 0; i < getdtablesize(); i++)
{
if ((i != pipe_in[0]) &&
(i != pipe_in[1]) &&
(i != pipe_out[0]) &&
(i != pipe_out[1]))
{
if (fcntl(i, F_GETFD) != -1)
{
if (fstat(i, &statbuf) != -1)
{
if (S_ISFIFO(statbuf.st_mode))
{
printf("Thread %2" PRIu32 " | CHILD | "
"unexpected pipe, fd = %d, inode = %" PRIu64 "\n",
td->thread_id, i, statbuf.st_ino);
}
}
}
}
}
LOG_PERROR(dup2, dup2(pipe_in[0], STDIN_FILENO) == -1);
LOG_PERROR(dup2, dup2(pipe_out[1], STDOUT_FILENO) == -1);
LOG_PERROR(close, close(pipe_in[0]) == -1);
LOG_PERROR(close, close(pipe_in[1]) == -1);
LOG_PERROR(close, close(pipe_out[0]) == -1);
LOG_PERROR(close, close(pipe_out[1]) == -1);
/* Never returns */
LOG_PERROR(execvp, execvp(args[0], args) == -1);
perror("execvp");
abort();
}
LOG_PERROR(close, close(pipe_in[0]) == -1);
LOG_PERROR(close, close(pipe_out[1]) == -1);
while (cumul_bytes < count)
{
int32_t i;
const ssize_t to_write_bytes =
MIN(count - cumul_bytes, (ssize_t) sizeof(out_buf));
assert((to_write_bytes % 4) == 0);
for (i = 0; i < (to_write_bytes / 4); i++)
{
const int32_t strided_idx = i * 4;
uint32_t* casted = (uint32_t*) &out_buf[strided_idx];
td->seed = lcg_a * td->seed + lcg_c;
*casted = td->seed;
}
this_bytes = 0;
while (this_bytes < to_write_bytes)
{
res = write(
pipe_in[1],
&out_buf[this_bytes],
to_write_bytes - this_bytes);
LOG_PERROR(write, res == -1);
this_bytes += res;
}
cumul_bytes += this_bytes;
}
/* Finished writing; close the write side of the in pipe. */
LOG_PERROR(close, close(pipe_in[1]) == -1);
this_bytes = 0;
do
{
res = read(
pipe_out[0],
&td->in_buf[this_bytes],
sizeof(td->in_buf) - this_bytes);
LOG_PERROR(read, res == -1);
if (this_bytes >= (ssize_t) sizeof(td->in_buf))
{
fprintf(stderr, "Soemthing went wrong!\n");
abort();
}
this_bytes += res;
}
while (res > 0);
LOG_PERROR(close, close(pipe_out[0]) == -1);
wait_res = waitpid(child, NULL, 0);
LOG_PERROR(waitpid, wait_res == -1);
if (wait_res != child)
{
fprintf(stderr, "waitpid returned unexpected result (%"
PRId32 ")\n", wait_res);
abort();
}
td->in_buf[this_bytes] = 0;
}
static void* thread_entry(void* arg)
{
thread_data_t* const td = (thread_data_t*) arg;
context_t* const ctx = td->ctx;
while (1)
{
/* Stop all threads at gate */
LOG_PTHREAD(pthread_mutex_lock(&ctx->gate_mutex));
ctx->gate_waiting++;
printf_guarded(ctx, "Thread %2" PRIu32 " | gate, WAITING\n",
td->thread_id);
LOG_PTHREAD(pthread_cond_wait(&ctx->gate_cond, &ctx->gate_mutex));
printf_guarded(ctx, "Thread %2" PRIu32 " | gate, RELEASED\n",
td->thread_id);
LOG_PTHREAD(pthread_mutex_unlock(&ctx->gate_mutex));
/* All threads are now released from the gate */
if (!ctx->running)
break;
do_work(td);
LOG_PTHREAD(pthread_mutex_lock(&ctx->master_mutex));
ctx->threads_finished_work++;
assert(ctx->threads_finished_work <= ctx->thread_count);
/* Last thread to finish should signal the master */
if (ctx->threads_finished_work == ctx->thread_count)
{
printf_guarded(ctx, "Thread %2" PRIu32
" | finished, signalling master\n",
td->thread_id);
LOG_PTHREAD(pthread_cond_signal(&ctx->master_cond));
}
else
{
printf_guarded(ctx, "Thread %2" PRIu32 " | finished\n", td->thread_id);
}
LOG_PTHREAD(pthread_mutex_unlock(&ctx->master_mutex));
}
return NULL;
}
int main(int argc, char** argv)
{
context_t ctx;
pthread_t* threads;
thread_data_t* tds;
uint32_t i, j;
if (argc > 1)
ctx.thread_count = strtoul(argv[1], NULL, 10);
else
ctx.thread_count = 1;
LOG_PTHREAD(pthread_mutex_init(&ctx.print_mutex, NULL));
LOG_PTHREAD(pthread_mutex_init(&ctx.master_mutex, NULL));
LOG_PTHREAD(pthread_cond_init(&ctx.master_cond, NULL));
ctx.threads_finished_work = 0;
LOG_PTHREAD(pthread_mutex_init(&ctx.gate_mutex, NULL));
LOG_PTHREAD(pthread_cond_init(&ctx.gate_cond, NULL));
ctx.gate_waiting = 0;
ctx.running = 1;
threads = malloc(sizeof(pthread_t) * ctx.thread_count);
tds = malloc(sizeof(thread_data_t) * ctx.thread_count);
for (i = 0; i < ctx.thread_count; i++)
{
tds[i].ctx = &ctx;
tds[i].thread_id = i;
tds[i].seed = i;
LOG_PTHREAD(pthread_create(&threads[i], NULL, thread_entry, &tds[i]));
}
for (i = 0; ctx.running && (i < 10); i++)
{
int all_threads_at_gate = 0;
while (!all_threads_at_gate)
{
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
assert(ctx.gate_waiting <= ctx.thread_count);
if (ctx.gate_waiting == ctx.thread_count)
all_threads_at_gate = 1;
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
if (!all_threads_at_gate)
sched_yield();
}
printf_guarded(&ctx, "====================\nStart of run %" PRIu32
"\nThread master has seen all threads at gate - "
"signalling them\n", i);
ctx.gate_waiting = 0;
/* The threads are now good to go. */
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
LOG_PTHREAD(pthread_cond_broadcast(&ctx.gate_cond));
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
/* Wait for threads to do the work... */
LOG_PTHREAD(pthread_mutex_lock(&ctx.master_mutex));
assert(ctx.threads_finished_work <= ctx.thread_count);
if (ctx.threads_finished_work < ctx.thread_count)
{
/* Master beat the threads to this point (very likely). */
printf_guarded(&ctx, "Thread master is waiting for signal...\n");
LOG_PTHREAD(pthread_cond_wait(
&ctx.master_cond, &ctx.master_mutex));
}
else
{
printf_guarded(&ctx,
"Thread master didn't need to wait for signal!\n");
}
printf_guarded(&ctx, "Thread master is proceeding\n");
ctx.threads_finished_work = 0;
LOG_PTHREAD(pthread_mutex_unlock(&ctx.master_mutex));
for (j = 0; j < ctx.thread_count; j++)
{
printf_guarded(&ctx, "Thread %2" PRIu32 " | got output: %s",
tds[j].thread_id, tds[j].in_buf);
}
}
ctx.running = 0;
printf_guarded(&ctx, "Signalling threads...\n");
LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
assert(ctx.gate_waiting == ctx.thread_count);
ctx.running = 0;
pthread_cond_broadcast(&ctx.gate_cond);
LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));
printf_guarded(&ctx, "Joining threads...\n");
for (i = 0; i < ctx.thread_count; i++)
{
LOG_PTHREAD(pthread_join(threads[i], NULL));
printf_guarded(&ctx, "Thread %2" PRIu32 " | final seed: %10"
PRIu32 "\n", i, tds[i].seed);
}
printf_guarded(&ctx, "Now freeing...\n");
free(tds);
free(threads);
LOG_PTHREAD(pthread_mutex_destroy(&ctx.gate_mutex));
LOG_PTHREAD(pthread_cond_destroy(&ctx.gate_cond));
LOG_PTHREAD(pthread_mutex_destroy(&ctx.master_mutex));
LOG_PTHREAD(pthread_cond_destroy(&ctx.master_cond));
LOG_PTHREAD(pthread_mutex_destroy(&ctx.print_mutex));
return 0;
}