使用多线程分叉的 Linux 系统调用中的死锁

问题描述 投票:0回答:1

最近,我一直在开发一种工具,该工具必须重复启动可执行文件并将大量数据输入其中并检查结果。由于此过程的一方面是计算密集型,因此我还使用 pthreads 将其设为多线程。

一旦我的单线程版本开始工作,我尝试了多个线程,很快发现代码被卡住了,而且似乎没有什么充分的理由什么也不做。我做了一些 GDB 调查并发现了以下线程:

我从中总结出以下要点:

  • 在 fork 和 exec 之间只能调用异步信号安全函数
  • 所持有的互斥体将在分叉期间以锁定状态复制

据我所知,这些似乎都不适用于我的情况。

请参阅下面我的最小示例。不幸的是,即使作为 MCV 示例,它仍然很大!高级描述:

  1. 主线程为屏障/“门”创建一个或多个工作线程和互斥体/条件的池,并指示工作运行的完成。工作线程运行并撞上门。
  2. 对于每次运行:
    1. 主人向大门发出信号。
    2. 工作线程使用 stdin/stdout 管道 fork-exec
      sha256sum
      ,并使用简单的 RNG 填充缓冲区、写入子进程、关闭写入管道、读取结果并调用 wait。
    3. Master 等待所有线程完成,在安全阻塞时对数据和线程执行任何操作并打印结果。
  3. Master 发出运行结束信号并释放门,然后加入线程。
  4. 工作人员看到运行结束并退出其入口功能。
  5. Master 根据需要取消初始化。

可以使用线程计数作为第一个参数来运行。当使用多个线程时,我发现代码很快就会卡住,并且这种情况可能以几种奇怪的方式发生。例如,一个或多个工作线程可能会陷入对

read()
close()
wait()
的调用中(第 200 行);或者所有线程都可能在门口陷入僵局(我确信这个事件不应该发生)。

没有任何事情会被认为是错误的;通过的管道 FD 符合预期。在继续之前,我使用 GDB 使用

start <n>
set scheduler-locking on
检查了这一点。

请注意,Valgrind 的 DRD 或 Helgrind 工具未发现此代码有任何问题/警告。有趣的是,代码在它们下以多线程方式工作(我认为是因为它会减慢速度或包装调用?)。

请问有人可以揭开这里发生的事情的神秘面纱吗?

更新

经过更多调查,我认为存在竞争条件,即分叉确实复制了我不打算复制的对象,但我不确定如何证明它。

请参阅以下来自

strace
的修剪输出。

家长:

pipe2([7, 8], 0)                        = 0
pipe2([9, 10], 0)                       = 0
write(1, "Thread  1 | pipe_in = {7, 8}, pi"..., 49) = 49
futex(0x7f82946ee0, FUTEX_WAIT_PRIVATE, 2, NULL) = 0
clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f81f8f250) = 1925
futex(0x7f82946ee0, FUTEX_WAKE_PRIVATE, 1) = 0
close(7)                                = 0
close(10)                               = 0
write(8, "lY\210<\333\205\210^~\1\26\201\305:s\264`m\360\f?\301\230^\222\335V\306\311_b\216"..., 65536) = 65536
...
write(8, "l\231\336n\333\305i\212~A\6\273\305z&\237`\255\212\243?\1\356\271\222\35+\241\311\237\251|"..., 57600) = 57600
close(8)                                = 0
read(9,  <unfinished ...>)              = ?
+++ killed by SIGINT +++

sha256sum
孩子(PID 1925):

read(0, "l\271\236\363\333e\353\364~a[i\305\329\204`\315<Q?\241\271\244\222=BO\311?\226\252"..., 32768) = 32768
read(0, "lY\t\"\333\205\25l~\1?_\305:\210R`m\1r?\301uF\222\335\217\232\311_G\v"..., 32768) = 32768
read(0, "l\371s\240\333\245?\363~\241\"%\305Z\327\260`\r\306\342?\3411\370\222}\335\265\311\177\370\373"..., 32768) = 32768
read(0, "l\231\336n\333\305i\212~A\6\273\305z&\237`\255\212\243?\1\356\271\222\35+\241\311\237\251|"..., 32768) = 32768
read(0, "l9I\215\333\345\2231~\341\351 \305\232u\35`MO\264?!\252\213\222\275x\\\311\277Z\215"..., 32768) = 24832
read(0, 0x55a3541510, 4096)             = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGINT {si_signo=SIGINT, si_code=SI_KERNEL} ---
+++ killed by SIGINT +++

因此,父级关闭了子级标准输入的最终 FD,但是

read()
调用不会像应有的那样返回零,除非我弄错了。每一端的写入和读取大小不同,但这似乎是一个转移注意力的事情。

我认为这只能是由于在分叉上复制 FD 的不确定性发生?

代码:

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <sys/wait.h>
#include <pthread.h>
#include <sched.h>
#include <assert.h>
#include <stdarg.h>

#define MIN(a, b) (((a) > (b)) ? (b) : (a))

#define LOG_PERROR(func, failCond) \
    do { \
        if (failCond) { \
            perror(#func); \
            abort(); \
        } \
    } \
    while (0)

#define LOG_PTHREAD(funcCall) \
    do { \
        const int res = (funcCall); \
        if (res != 0) { \
            fprintf(stderr, #funcCall " failed with ret code %" PRId32 "\n", \
                    res); \
            abort(); \
        } \
    } \
    while (0)

typedef struct
{
    uint32_t thread_count;

    pthread_mutex_t print_mutex;

    pthread_mutex_t master_mutex;
    pthread_cond_t  master_cond;

    /* Guarded by `master_mutex` */
    uint32_t threads_finished_work;

    pthread_mutex_t gate_mutex;
    pthread_cond_t  gate_cond;

    /* Guarded by `gate_mutex` */
    uint32_t gate_waiting;

    int running;
} context_t;

typedef struct
{
    context_t* ctx;
    uint32_t thread_id;
    uint32_t seed;
    char in_buf[1024];
} thread_data_t;

static void printf_guarded(context_t* ctx, const char* fmt, ...)
{
    va_list ap;

    LOG_PTHREAD(pthread_mutex_lock(&ctx->print_mutex));

    va_start(ap, fmt);
    vprintf(fmt, ap);
    va_end(ap);

    LOG_PTHREAD(pthread_mutex_unlock(&ctx->print_mutex));
}

static void do_work(thread_data_t* const td)
{
    const int32_t count = 100000000;
    const uint32_t lcg_a = UINT32_C(1664525);
    const uint32_t lcg_c = UINT32_C(1013904223);

    char out_buf[65536];
    char* args[2] = {"sha256sum", NULL};
    int pipe_in[2];
    int pipe_out[2];
    pid_t child;
    pid_t wait_res;
    ssize_t cumul_bytes = 0;
    ssize_t this_bytes;
    ssize_t res;

    assert((sizeof(count) % 4) == 0);
    assert((sizeof(out_buf) % 4) == 0);

    /*
    printf_guarded(ctx, "Thread %2" PRIu32 " | seed: %10" PRIu32 "\n",
            td->thread_id, td->seed);
    */
    LOG_PERROR(pipe, pipe(pipe_in) == -1);
    LOG_PERROR(pipe, pipe(pipe_out) == -1);

    printf_guarded(td->ctx, "Thread %2" PRIu32
            " | pipe_in = {%d, %d}, pipe_out = {%d, %d}\n",
            td->thread_id, pipe_in[0], pipe_in[1], pipe_out[0], pipe_out[1]);

    child = fork();
    LOG_PERROR(fork, child == -1);

    if (child == 0)
    {
        LOG_PERROR(dup2, dup2(pipe_in[0], STDIN_FILENO) == -1);
        LOG_PERROR(dup2, dup2(pipe_out[1], STDOUT_FILENO) == -1);
        LOG_PERROR(close, close(pipe_in[0]) == -1);
        LOG_PERROR(close, close(pipe_in[1]) == -1);
        LOG_PERROR(close, close(pipe_out[0]) == -1);
        LOG_PERROR(close, close(pipe_out[1]) == -1);
        LOG_PERROR(execvp, execvp(args[0], args) == -1);
        /* Never returns */

        perror("execvp");
        abort();
    }

    LOG_PERROR(close, close(pipe_in[0]) == -1);
    LOG_PERROR(close, close(pipe_out[1]) == -1);

    while (cumul_bytes < count)
    {
        int32_t i;
        const ssize_t to_write_bytes =
            MIN(count - cumul_bytes, (ssize_t) sizeof(out_buf));

        /*
        printf_guarded(ctx, "Thread %2" PRIu32 " | cumul_bytes = %" PRId64 "\n",
                td->thread_id, cumul_bytes);
        */

        assert((to_write_bytes % 4) == 0);

        for (i = 0; i < (to_write_bytes / 4); i++)
        {
            const int32_t strided_idx = i * 4;
            uint32_t* casted = (uint32_t*) &out_buf[strided_idx];

            td->seed = lcg_a * td->seed + lcg_c;
            *casted = td->seed;
        }

        this_bytes = 0;

        while (this_bytes < to_write_bytes)
        {
            /*
            printf_guarded(ctx, "Thread %2" PRIu32 " | write(..., [%"
                    PRId64 "], %" PRId64 ")\n",
                    td->thread_id, this_bytes, to_write_bytes - this_bytes);
            */

            res = write(
                    pipe_in[1],
                    &out_buf[this_bytes],
                    to_write_bytes - this_bytes);
            LOG_PERROR(write, res == -1);
            this_bytes += res;
        }

        cumul_bytes += this_bytes;
    }

    /* Finished writing; close the write side of the in pipe. */
    LOG_PERROR(close, close(pipe_in[1]) == -1);

    this_bytes = 0;

    do
    {
        /*
        printf_guarded(ctx, "Thread %2" PRIu32 " | read(..., [%"
                PRId64 "], %" PRId64 ")\n",
                td->thread_id, this_bytes, sizeof(td->in_buf) - this_bytes);
        */

        res = read(
                pipe_out[0],
                &td->in_buf[this_bytes],
                sizeof(td->in_buf) - this_bytes);
        LOG_PERROR(read, res == -1);

        if (this_bytes >= (ssize_t) sizeof(td->in_buf))
        {
            fprintf(stderr, "Soemthing went wrong!\n");
            abort();
        }

        this_bytes += res;
    }
    while (res > 0);

    LOG_PERROR(close, close(pipe_out[0]) == -1);
    wait_res = waitpid(child, NULL, 0);
    LOG_PERROR(waitpid, wait_res == -1);

    if (wait_res != child)
    {
        fprintf(stderr, "waitpid returned unexpected result (%"
                PRId32 ")\n", wait_res);
        abort();
    }

    td->in_buf[this_bytes] = 0;
}

static void* thread_entry(void* arg)
{
    thread_data_t* const td = (thread_data_t*) arg;
    context_t* const ctx = td->ctx;

    while (1)
    {
        /* Stop all threads at gate */

        LOG_PTHREAD(pthread_mutex_lock(&ctx->gate_mutex));
        ctx->gate_waiting++;
        printf_guarded(ctx, "Thread %2" PRIu32 " | gate, WAITING\n",
                td->thread_id);
        LOG_PTHREAD(pthread_cond_wait(&ctx->gate_cond, &ctx->gate_mutex));
        printf_guarded(ctx, "Thread %2" PRIu32 " | gate, RELEASED\n",
                td->thread_id);
        LOG_PTHREAD(pthread_mutex_unlock(&ctx->gate_mutex));

        /* All threads are now released from the gate */

        if (!ctx->running)
            break;

        do_work(td);

        LOG_PTHREAD(pthread_mutex_lock(&ctx->master_mutex));

        ctx->threads_finished_work++;
        assert(ctx->threads_finished_work <= ctx->thread_count);

        /* Last thread to finish should signal the master */

        if (ctx->threads_finished_work == ctx->thread_count)
        {
            printf_guarded(ctx, "Thread %2" PRIu32
                    " | finished, signalling master\n",
                    td->thread_id);
            LOG_PTHREAD(pthread_cond_signal(&ctx->master_cond));
        }
        else
        {
            printf_guarded(ctx, "Thread %2" PRIu32 " | finished\n", td->thread_id);
        }

        LOG_PTHREAD(pthread_mutex_unlock(&ctx->master_mutex));
    }

    return NULL;
}

int main(int argc, char** argv)
{
    context_t ctx;
    pthread_t* threads;
    thread_data_t* tds;
    uint32_t i, j;

    if (argc > 1)
        ctx.thread_count = strtoul(argv[1], NULL, 10);
    else
        ctx.thread_count = 1;

    LOG_PTHREAD(pthread_mutex_init(&ctx.print_mutex, NULL));

    LOG_PTHREAD(pthread_mutex_init(&ctx.master_mutex, NULL));
    LOG_PTHREAD(pthread_cond_init(&ctx.master_cond, NULL));

    ctx.threads_finished_work = 0;

    LOG_PTHREAD(pthread_mutex_init(&ctx.gate_mutex, NULL));
    LOG_PTHREAD(pthread_cond_init(&ctx.gate_cond, NULL));

    ctx.gate_waiting = 0;
    ctx.running = 1;

    threads = malloc(sizeof(pthread_t) * ctx.thread_count);
    tds = malloc(sizeof(thread_data_t) * ctx.thread_count);

    for (i = 0; i < ctx.thread_count; i++)
    {
        tds[i].ctx = &ctx;
        tds[i].thread_id = i;
        tds[i].seed = i;

        LOG_PTHREAD(pthread_create(&threads[i], NULL, thread_entry, &tds[i]));
    }

    for (i = 0; ctx.running && (i < 10); i++)
    {
        int all_threads_at_gate = 0;

        while (!all_threads_at_gate)
        {
            LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));

            assert(ctx.gate_waiting <= ctx.thread_count);

            if (ctx.gate_waiting == ctx.thread_count)
                all_threads_at_gate = 1;

            LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));

            if (!all_threads_at_gate)
                sched_yield();
        }

        printf_guarded(&ctx, "====================\nStart of run %" PRIu32
                "\nThread master has seen all threads at gate - "
                "signalling them\n", i);

        ctx.gate_waiting = 0;

        /* The threads are now good to go. */

        LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
        LOG_PTHREAD(pthread_cond_broadcast(&ctx.gate_cond));
        LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));

        /* Wait for threads to do the work... */

        LOG_PTHREAD(pthread_mutex_lock(&ctx.master_mutex));

        assert(ctx.threads_finished_work <= ctx.thread_count);

        if (ctx.threads_finished_work < ctx.thread_count)
        {
            /* Master beat the threads to this point (very likely). */

            printf_guarded(&ctx, "Thread master is waiting for signal...\n");

            LOG_PTHREAD(pthread_cond_wait(
                        &ctx.master_cond, &ctx.master_mutex));
        }
        else
        {
            printf_guarded(&ctx,
                    "Thread master didn't need to wait for signal!\n");
        }

        printf_guarded(&ctx, "Thread master is proceeding\n");
        ctx.threads_finished_work = 0;

        LOG_PTHREAD(pthread_mutex_unlock(&ctx.master_mutex));

        for (j = 0; j < ctx.thread_count; j++)
        {
            printf_guarded(&ctx, "Thread %2" PRIu32 " | got output: %s",
                    tds[j].thread_id, tds[j].in_buf);
        }
    }

    ctx.running = 0;

    printf_guarded(&ctx, "Signalling threads...\n");

    LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));

    assert(ctx.gate_waiting == ctx.thread_count);
    ctx.running = 0;
    pthread_cond_broadcast(&ctx.gate_cond);

    LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));

    printf_guarded(&ctx, "Joining threads...\n");

    for (i = 0; i < ctx.thread_count; i++)
    {
        LOG_PTHREAD(pthread_join(threads[i], NULL));
        printf_guarded(&ctx, "Thread %2" PRIu32 " | final seed: %10"
                PRIu32 "\n", i, tds[i].seed);
    }

    printf_guarded(&ctx, "Now freeing...\n");

    free(tds);
    free(threads);

    LOG_PTHREAD(pthread_mutex_destroy(&ctx.gate_mutex));
    LOG_PTHREAD(pthread_cond_destroy(&ctx.gate_cond));

    LOG_PTHREAD(pthread_mutex_destroy(&ctx.master_mutex));
    LOG_PTHREAD(pthread_cond_destroy(&ctx.master_cond));

    LOG_PTHREAD(pthread_mutex_destroy(&ctx.print_mutex));

    return 0;
}
c linux multithreading system-calls deadlock
1个回答
0
投票

对于这个复杂的问题,我深表歉意。我一直在思考想法并找到了答案(或者至少是大部分答案)。

问题:管道显然在进程中的所有线程(及其子线程)之间共享

考虑到这个不稳定的假设,即该软件是为创建子进程而编写的,这些子进程会摄取任意数据直到文件末尾,然后打印其输出,不难看出杂散管道文件描述符如何意味着关闭写入端

sha256sum
stdin 管道的 不指示 EOF。

证明这一点的代码如下。它打印出任何不属于它只能访问的四个管道端之一的管道 FD。当使用一个线程运行它时,没有任何差异输出,但使用多个线程时......:

Thread  0 | gate, WAITING
Thread  1 | gate, WAITING
====================
Start of run 0
Thread master has seen all threads at gate - signalling them
Thread master is waiting for signal...
Thread  1 | gate, RELEASED
Thread  0 | gate, RELEASED
Thread  1 | pipe_in = {3, 4}, pipe_out = {5, 6}
Thread  0 | pipe_in = {7, 8}, pipe_out = {9, 10}
Thread  1 | my PID: 1678
Thread  1 | PARENT | unexpected pipe, fd = 7, inode = 8846
Thread  1 | PARENT | unexpected pipe, fd = 8, inode = 8846
Thread  1 | PARENT | unexpected pipe, fd = 9, inode = 8847
Thread  1 | PARENT | unexpected pipe, fd = 10, inode = 8847
Thread  1 | CHILD  | unexpected pipe, fd = 7, inode = 8846
Thread  1 | CHILD  | unexpected pipe, fd = 8, inode = 8846
Thread  1 | CHILD  | unexpected pipe, fd = 9, inode = 8847
Thread  0 | my PID: 1678
Thread  1 | CHILD  | unexpected pipe, fd = 10, inode = 8847
Thread  0 | PARENT | unexpected pipe, fd = 3, inode = 9492
Thread  0 | PARENT | unexpected pipe, fd = 4, inode = 9492
Thread  0 | PARENT | unexpected pipe, fd = 5, inode = 9493
Thread  0 | PARENT | unexpected pipe, fd = 6, inode = 9493
Thread  0 | CHILD  | unexpected pipe, fd = 3, inode = 9492
Thread  0 | CHILD  | unexpected pipe, fd = 4, inode = 9492
Thread  0 | CHILD  | unexpected pipe, fd = 5, inode = 9493
Thread  0 | CHILD  | unexpected pipe, fd = 6, inode = 9493
^C

这可以通过确保孩子们在调用

execvp()
之前关闭所有无关的管道 FD 来解决。

我关心的不仅仅是管道,比如继承的 futexes。我不明白 futexes 是如何工作的,但是

fork()
上也可能存在“继承”问题。原因是,在极少数情况下,我目睹所有线程成功执行运行但卡在门上,这应该是不可能的,除非门互斥锁在锁定时被继承。

如有任何反馈,我们将不胜感激。

完整代码:

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdarg.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <pthread.h>
#include <sched.h>
#include <assert.h>

#define MIN(a, b) (((a) > (b)) ? (b) : (a))

#define LOG_PERROR(func, failCond) \
    do { \
        if (failCond) { \
            perror(#func); \
            abort(); \
        } \
    } \
    while (0)

#define LOG_PTHREAD(funcCall) \
    do { \
        const int res = (funcCall); \
        if (res != 0) { \
            fprintf(stderr, #funcCall " failed with ret code %" PRId32 "\n", \
                    res); \
            abort(); \
        } \
    } \
    while (0)

typedef struct
{
    uint32_t thread_count;

    pthread_mutex_t print_mutex;

    pthread_mutex_t master_mutex;
    pthread_cond_t  master_cond;

    /* Guarded by `master_mutex` */
    uint32_t threads_finished_work;

    pthread_mutex_t gate_mutex;
    pthread_cond_t  gate_cond;

    /* Guarded by `gate_mutex` */
    uint32_t gate_waiting;

    int running;
} context_t;

typedef struct
{
    context_t* ctx;
    uint32_t thread_id;
    uint32_t seed;
    char in_buf[1024];
} thread_data_t;

static void printf_guarded(context_t* ctx, const char* fmt, ...)
{
    va_list ap;

    LOG_PTHREAD(pthread_mutex_lock(&ctx->print_mutex));

    va_start(ap, fmt);
    vprintf(fmt, ap);
    va_end(ap);

    LOG_PTHREAD(pthread_mutex_unlock(&ctx->print_mutex));
}

static void do_work(thread_data_t* const td)
{
    const int32_t count = 100000000;
    const uint32_t lcg_a = UINT32_C(1664525);
    const uint32_t lcg_c = UINT32_C(1013904223);

    char out_buf[65536];
    char* args[2] = {"sha256sum", NULL};
    int pipe_in[2];
    int pipe_out[2];
    pid_t child;
    pid_t wait_res;
    ssize_t cumul_bytes = 0;
    ssize_t this_bytes;
    ssize_t res;

    assert((sizeof(count) % 4) == 0);
    assert((sizeof(out_buf) % 4) == 0);

    /*
    printf_guarded(ctx, "Thread %2" PRIu32 " | seed: %10" PRIu32 "\n",
            td->thread_id, td->seed);
    */
    LOG_PERROR(pipe, pipe(pipe_in) == -1);
    LOG_PERROR(pipe, pipe(pipe_out) == -1);

    printf_guarded(td->ctx, "Thread %2" PRIu32
            " | pipe_in = {%d, %d}, pipe_out = {%d, %d}\n",
            td->thread_id, pipe_in[0], pipe_in[1], pipe_out[0], pipe_out[1]);

    child = fork();
    LOG_PERROR(fork, child == -1);

    if (child > 0)
    {
        int i;
        struct stat statbuf;

        printf("Thread %2" PRIu32 " | my PID: %d\n", td->thread_id, getpid());

        for (i = 0; i < getdtablesize(); i++)
        {
            if ((i != pipe_in[0]) &&
                (i != pipe_in[1]) &&
                (i != pipe_out[0]) &&
                (i != pipe_out[1]))
            {
                if (fcntl(i, F_GETFD) != -1)
                {
                    if (fstat(i, &statbuf) != -1)
                    {
                        if (S_ISFIFO(statbuf.st_mode))
                        {
                            printf("Thread %2" PRIu32 " | PARENT | "
                                    "unexpected pipe, fd = %d, inode = %" PRIu64 "\n",
                                    td->thread_id, i, statbuf.st_ino);
                        }
                    }
                }
            }
        }
    }
    else if (child == 0)
    {
        int i;
        struct stat statbuf;

        for (i = 0; i < getdtablesize(); i++)
        {
            if ((i != pipe_in[0]) &&
                (i != pipe_in[1]) &&
                (i != pipe_out[0]) &&
                (i != pipe_out[1]))
            {
                if (fcntl(i, F_GETFD) != -1)
                {
                    if (fstat(i, &statbuf) != -1)
                    {
                        if (S_ISFIFO(statbuf.st_mode))
                        {
                            printf("Thread %2" PRIu32 " | CHILD  | "
                                    "unexpected pipe, fd = %d, inode = %" PRIu64 "\n",
                                    td->thread_id, i, statbuf.st_ino);
                        }
                    }
                }
            }
        }

        LOG_PERROR(dup2, dup2(pipe_in[0], STDIN_FILENO) == -1);
        LOG_PERROR(dup2, dup2(pipe_out[1], STDOUT_FILENO) == -1);
        LOG_PERROR(close, close(pipe_in[0]) == -1);
        LOG_PERROR(close, close(pipe_in[1]) == -1);
        LOG_PERROR(close, close(pipe_out[0]) == -1);
        LOG_PERROR(close, close(pipe_out[1]) == -1);

        /* Never returns */
        LOG_PERROR(execvp, execvp(args[0], args) == -1);

        perror("execvp");
        abort();
    }

    LOG_PERROR(close, close(pipe_in[0]) == -1);
    LOG_PERROR(close, close(pipe_out[1]) == -1);

    while (cumul_bytes < count)
    {
        int32_t i;
        const ssize_t to_write_bytes =
            MIN(count - cumul_bytes, (ssize_t) sizeof(out_buf));

        assert((to_write_bytes % 4) == 0);

        for (i = 0; i < (to_write_bytes / 4); i++)
        {
            const int32_t strided_idx = i * 4;
            uint32_t* casted = (uint32_t*) &out_buf[strided_idx];

            td->seed = lcg_a * td->seed + lcg_c;
            *casted = td->seed;
        }

        this_bytes = 0;

        while (this_bytes < to_write_bytes)
        {
            res = write(
                    pipe_in[1],
                    &out_buf[this_bytes],
                    to_write_bytes - this_bytes);
            LOG_PERROR(write, res == -1);
            this_bytes += res;
        }

        cumul_bytes += this_bytes;
    }

    /* Finished writing; close the write side of the in pipe. */
    LOG_PERROR(close, close(pipe_in[1]) == -1);

    this_bytes = 0;

    do
    {
        res = read(
                pipe_out[0],
                &td->in_buf[this_bytes],
                sizeof(td->in_buf) - this_bytes);
        LOG_PERROR(read, res == -1);

        if (this_bytes >= (ssize_t) sizeof(td->in_buf))
        {
            fprintf(stderr, "Soemthing went wrong!\n");
            abort();
        }

        this_bytes += res;
    }
    while (res > 0);

    LOG_PERROR(close, close(pipe_out[0]) == -1);
    wait_res = waitpid(child, NULL, 0);
    LOG_PERROR(waitpid, wait_res == -1);

    if (wait_res != child)
    {
        fprintf(stderr, "waitpid returned unexpected result (%"
                PRId32 ")\n", wait_res);
        abort();
    }

    td->in_buf[this_bytes] = 0;
}

static void* thread_entry(void* arg)
{
    thread_data_t* const td = (thread_data_t*) arg;
    context_t* const ctx = td->ctx;

    while (1)
    {
        /* Stop all threads at gate */

        LOG_PTHREAD(pthread_mutex_lock(&ctx->gate_mutex));
        ctx->gate_waiting++;
        printf_guarded(ctx, "Thread %2" PRIu32 " | gate, WAITING\n",
                td->thread_id);
        LOG_PTHREAD(pthread_cond_wait(&ctx->gate_cond, &ctx->gate_mutex));
        printf_guarded(ctx, "Thread %2" PRIu32 " | gate, RELEASED\n",
                td->thread_id);
        LOG_PTHREAD(pthread_mutex_unlock(&ctx->gate_mutex));

        /* All threads are now released from the gate */

        if (!ctx->running)
            break;

        do_work(td);

        LOG_PTHREAD(pthread_mutex_lock(&ctx->master_mutex));

        ctx->threads_finished_work++;
        assert(ctx->threads_finished_work <= ctx->thread_count);

        /* Last thread to finish should signal the master */

        if (ctx->threads_finished_work == ctx->thread_count)
        {
            printf_guarded(ctx, "Thread %2" PRIu32
                    " | finished, signalling master\n",
                    td->thread_id);
            LOG_PTHREAD(pthread_cond_signal(&ctx->master_cond));
        }
        else
        {
            printf_guarded(ctx, "Thread %2" PRIu32 " | finished\n", td->thread_id);
        }

        LOG_PTHREAD(pthread_mutex_unlock(&ctx->master_mutex));
    }

    return NULL;
}

int main(int argc, char** argv)
{
    context_t ctx;
    pthread_t* threads;
    thread_data_t* tds;
    uint32_t i, j;

    if (argc > 1)
        ctx.thread_count = strtoul(argv[1], NULL, 10);
    else
        ctx.thread_count = 1;

    LOG_PTHREAD(pthread_mutex_init(&ctx.print_mutex, NULL));

    LOG_PTHREAD(pthread_mutex_init(&ctx.master_mutex, NULL));
    LOG_PTHREAD(pthread_cond_init(&ctx.master_cond, NULL));

    ctx.threads_finished_work = 0;

    LOG_PTHREAD(pthread_mutex_init(&ctx.gate_mutex, NULL));
    LOG_PTHREAD(pthread_cond_init(&ctx.gate_cond, NULL));

    ctx.gate_waiting = 0;
    ctx.running = 1;

    threads = malloc(sizeof(pthread_t) * ctx.thread_count);
    tds = malloc(sizeof(thread_data_t) * ctx.thread_count);

    for (i = 0; i < ctx.thread_count; i++)
    {
        tds[i].ctx = &ctx;
        tds[i].thread_id = i;
        tds[i].seed = i;

        LOG_PTHREAD(pthread_create(&threads[i], NULL, thread_entry, &tds[i]));
    }

    for (i = 0; ctx.running && (i < 10); i++)
    {
        int all_threads_at_gate = 0;

        while (!all_threads_at_gate)
        {
            LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));

            assert(ctx.gate_waiting <= ctx.thread_count);

            if (ctx.gate_waiting == ctx.thread_count)
                all_threads_at_gate = 1;

            LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));

            if (!all_threads_at_gate)
                sched_yield();
        }

        printf_guarded(&ctx, "====================\nStart of run %" PRIu32
                "\nThread master has seen all threads at gate - "
                "signalling them\n", i);

        ctx.gate_waiting = 0;

        /* The threads are now good to go. */

        LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));
        LOG_PTHREAD(pthread_cond_broadcast(&ctx.gate_cond));
        LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));

        /* Wait for threads to do the work... */

        LOG_PTHREAD(pthread_mutex_lock(&ctx.master_mutex));

        assert(ctx.threads_finished_work <= ctx.thread_count);

        if (ctx.threads_finished_work < ctx.thread_count)
        {
            /* Master beat the threads to this point (very likely). */

            printf_guarded(&ctx, "Thread master is waiting for signal...\n");

            LOG_PTHREAD(pthread_cond_wait(
                        &ctx.master_cond, &ctx.master_mutex));
        }
        else
        {
            printf_guarded(&ctx,
                    "Thread master didn't need to wait for signal!\n");
        }

        printf_guarded(&ctx, "Thread master is proceeding\n");
        ctx.threads_finished_work = 0;

        LOG_PTHREAD(pthread_mutex_unlock(&ctx.master_mutex));

        for (j = 0; j < ctx.thread_count; j++)
        {
            printf_guarded(&ctx, "Thread %2" PRIu32 " | got output: %s",
                    tds[j].thread_id, tds[j].in_buf);
        }
    }

    ctx.running = 0;

    printf_guarded(&ctx, "Signalling threads...\n");

    LOG_PTHREAD(pthread_mutex_lock(&ctx.gate_mutex));

    assert(ctx.gate_waiting == ctx.thread_count);
    ctx.running = 0;
    pthread_cond_broadcast(&ctx.gate_cond);

    LOG_PTHREAD(pthread_mutex_unlock(&ctx.gate_mutex));

    printf_guarded(&ctx, "Joining threads...\n");

    for (i = 0; i < ctx.thread_count; i++)
    {
        LOG_PTHREAD(pthread_join(threads[i], NULL));
        printf_guarded(&ctx, "Thread %2" PRIu32 " | final seed: %10"
                PRIu32 "\n", i, tds[i].seed);
    }

    printf_guarded(&ctx, "Now freeing...\n");

    free(tds);
    free(threads);

    LOG_PTHREAD(pthread_mutex_destroy(&ctx.gate_mutex));
    LOG_PTHREAD(pthread_cond_destroy(&ctx.gate_cond));

    LOG_PTHREAD(pthread_mutex_destroy(&ctx.master_mutex));
    LOG_PTHREAD(pthread_cond_destroy(&ctx.master_cond));

    LOG_PTHREAD(pthread_mutex_destroy(&ctx.print_mutex));

    return 0;
}
© www.soinside.com 2019 - 2024. All rights reserved.