我对内存排序模型的理解正确吗?

问题描述 投票:0回答:1

我正在学习 C 内存排序模型,我想出了这个小代码,让生产者和消费者与“单元”共享“桶”。

我打算在生产者 (P) 和消费者 (C) 之间创建这种“发生在之前”的关系序列:

  • P:
    write bucket [fill]
  • P:释放存储bucket_full
  • C:获取负载bucket_full
  • C:
    write,read bucket [decrement,print]
  • C:释放存储桶已满
  • P:获取负载bucket_full
  • P:
    write bucket [fill]
    (循环重复)

(我知道有更清洁的方法来实现生产者-消费者二人组,这只是一个练习)

问题

  1. 我是否正确建立了“发生在之前”的关系?
  2. 我是否真的陷入了数据竞争,并且纯粹是运气好,它只是不可见?

代码:

#include <stdio.h>
#include <stdatomic.h>
#include <pthread.h>

int num_buckets = 30, bucket = 0, units_per_bucket = 12;
_Atomic int bucket_full = 0;

void *producer(void *arg){
    for(int i=0; i<num_buckets+1; ++i){
        //acquire load to wait for "empty bucket" signal
        while(atomic_load_explicit(&bucket_full, memory_order_acquire))
            sched_yield();
        if(i == num_buckets){ // no more buckets to fill
            atomic_store_explicit(&bucket_full, -1, memory_order_release);
            break;
        }
        bucket = units_per_bucket; // fill bucket
        printf("\n[%2i]: %d", i+1, bucket);
        // release store to signal "bucket is full"
        atomic_store_explicit(&bucket_full, 1, memory_order_release);
    }
    return arg;
}

void *consumer(void *arg){
    while(1){
        //acquire load to wait for "bucket is full" signal
        while(!atomic_load_explicit(&bucket_full, memory_order_acquire))
            sched_yield();
        if(bucket_full == -1) //no more buckets, exit
            break;
        bucket --;
        printf(" %d", bucket);
        if(bucket < 1) // release store to signal "bucket is empty"
            atomic_store_explicit(&bucket_full, 0, memory_order_release);
    }
    return arg;
}

int main(){
    pthread_t t0, t1;
    pthread_create(&t0, NULL, consumer, (void *)NULL);
    pthread_create(&t1, NULL, producer, (void *)NULL);
    pthread_join(t0, NULL);
    pthread_join(t1, NULL);
    printf("\n");
    return 0;
}

编译为:

gcc -std=c11 -pthread -O2 -Wall -Wextra -Werror -pedantic -pedantic-errors main.c -o main

谢谢。

c pthreads memory-barriers stdatomic
1个回答
0
投票

在我看来,单个生产者和单个消费者没问题,因为他们无法并行执行任何工作;一个旋转在执行释放存储后等待另一个,这将使另一个离开循环。负载是

acquire
所以是的,这会创建一个发生之前。

消费者无需在

bucket_full
向0倒计时时重新检查
bucket
;一旦它看到
bucket_full
不为零(而不是
-1
具有单独的
seq_cst
负载,因为您没有加载到自旋循环中的临时值中),您也可能有一个像
while(--bucket >= 1){ ... } 这样的内部循环
除非您希望能够有第三个线程(例如主线程)将
bucket_full = -1
设置为消费者的
exit_now
标志。

按照您编写的方式,消费者在从 12 开始倒计时时将无法将非原子

bucket
保留在寄存器中,因为每个获取负载都可能意味着您可能已与存储的另一个线程同步那些变量。 (如果不引起潜在的数据竞争,这实际上可能是不可能的,但编译器对此有些保守。此外,调用像
printf
这样的非内联函数也会使编译器假设所有全局变量都可能已被修改。并且
bucket 
是全局的,不是
static
。)

当然,整个算法希望只是一个学习练习。 线程之间没有任何并行性,因为一个线程总是在旋转等待,而另一个线程则工作,并且没有简单的方法来修改它。除了我想让制作人在旋转等待

bucket_full == 0
之前做一些工作,例如准备一个重要的值放入一个桶中。

使用数组的单生产者单消费者 (SPSC) 队列是存在的,如果您想要的话,它可以很好地工作,如果双方运行速度相似,则允许双方将大部分 CPU 时间花在不旋转的工作上。


如果您在 x86 硬件上进行测试,请记住它的内存模型非常强大;每个 asm 加载和存储都分别隐式获取和释放。 因此,通常只有编译时重新排序才能导致 C++ 内存模型允许的情况。 您可以使用

clang -O2 -fsanitize=thread
来检测一些运行时问题。

© www.soinside.com 2019 - 2024. All rights reserved.