使用互斥锁来停止启动/停止运行同一函数的多个线程

问题描述 投票:0回答:2

我的目标是这样的:

我想让多个线程运行同一个函数,该函数是一个 while(1) 循环,所以它会继续下去。在某些时候,main 应该使用互斥体来强制所有线程阻塞,直到 main(或其他东西)发出信号让它们继续,此时它们都将继续,直到它们被迫阻塞它们。

线程应该并行运行

我尝试使用 pthread_cond_wait() 让线程等待,并使用 pthread_cond_broadcast() 让它们继续,但它根本不起作用

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

pthread_mutex_t mtx;
pthread_cond_t cv;

void* func(void* arg) {
    while (1) {
        pthread_mutex_lock(&mtx);
        pthread_cond_wait(&cv, &mtx);

        printf("Thread %lu is running.\n", pthread_self());
        pthread_mutex_unlock(&mtx);

        usleep(500000);
    }
}

int main() {
    pthread_t threads[2];

    pthread_mutex_init(&mtx, NULL);
    pthread_cond_init(&cv, NULL);

    for (int i = 0; i < 2; ++i) {
        pthread_create(&threads[i], NULL, func, NULL);
    }
    pthread_mutex_unlock(&mtx);
    pthread_cond_broadcast(&cv);

    sleep(2);

    pthread_mutex_lock(&mtx);
    
    sleep(2);

    pthread_mutex_unlock(&mtx);
    pthread_cond_broadcast(&cv);
    sleep(2);

    return 0;
}

c pthreads mutex
2个回答
2
投票

当您希望只有一个线程能够访问给定的代码部分(或者在信号量的情况下,您希望一些固定数量的线程进入给定的代码部分)时,互斥锁非常有用。

这不是您的用例所要求的:您可以在 while 循环的“关键部分”内拥有无限数量的线程,您只需要一种与所有线程通信它们应该停止的方法。

您的具体用例应该决定您的解决方案,但您可以通过以下几种方法继续:

  • 您可能需要考虑读写锁:这是一种锁,它不仅仅提供
    lock
    unlock
    方法,还提供
    readLock
    readUnlock
    writeLock
    writeUnlock
    方法并在前提是任意数量的读者可以同时持有锁,但读者不能在写者可以持有锁的任何时候持有锁。因此,在您的示例中,每个 while 循环的开始将是对
    lock.readLock()
    的调用,在循环结束时调用
    lock.readUnlock()
    ,当您的主函数想要停止所有线程时,它将调用
     writeLock.lock()
  • 一个更简单的解决方案是每个线程都将在其上旋转的单个原子变量
    should_wait
   // start of your while(1) loop
   while(should_wait.load());

主功能将设置为

true
false
。由于这可能会导致线程之间出现大量争用,因此您也可以为每个线程使用其中一个(对于
N
线程,您可能有一组
N
布尔标志,其中每个线程将自行旋转
bool
多变的)。由于这种繁忙等待通常仅在等待时间很短的情况下才实用,因此您可能需要引入某种退避:

    // start of your while(1) loop
    
    while(should_wait.load())
    {
        backoff();
    }
    reset_backoff();

其中 backoff 是每个线程的状态,每次调用时都会在

MIN_WAIT
MAX_WAIT
之间选择一些随机时间进行睡眠,并且每次调用都会加倍
MAX_WAIT
。当然,其他更有效等待的方法的兔子洞非常深,但这种方法并非不合理。

另请注意,如果您不需要任何线程位于关键部分,则应该在每个线程将对其进行操作的

while
循环中设置某种线程计数器,如下所示:

    // main function
    should_wait.store(true);
    // do work that assumes all threads are waiting

如果线程在设置等待标志之前进入临界区,则不起作用。我会注意到这本质上是一个

readWrite
锁。


0
投票

条件变量的正确使用是支持线程和/或进程在必要时暂停执行,直到满足某些可计算条件,这必然通过不同线程的操作而成为这种情况。 因此,

  • 总有一个要测试的谓词作为等待简历的条件,并且
  • 该谓词必须在等待后进行重新测试,以确定是继续还是等待更多。

此外,至少有一些有助于谓词评估的数据必须在线程之间共享才能使其工作,并且这些数据必须是原子的或(更典型地)通过与谓词关联的相同互斥体进行保护。简历.

结果可能会像这样:

static _Bool can_continue = 1;

// ...

void *func(void* arg) {
    while (1) {
        pthread_mutex_lock(&mtx);
        while (!can_continue) {            // test the predicate
            pthread_cond_wait(&cv, &mtx);
        }
        pthread_mutex_unlock(&mtx);

        // do something ...
    }
}

int main(void) {
    // ...

    // Make threads stop when they next reach the top of the loop
    pthread_mutex_lock(&mtx);
    can_continue = 0;
    pthread_mutex_unlock(&mtx);

    // ... do something ...

    // Allow threads to continue
    pthread_mutex_lock(&mtx);
    can_continue = 1;
    pthread_mutex_unlock(&mtx);
    pthread_cond_broadcast(&cv);

    // ... do something ...
}

请注意,与大多数库函数一样,pthreads 函数可能会失败。为了保持健壮性,程序每次都需要检查此类失败,除非你不关心函数是否成功。 为了清楚起见,我从上面的示例代码中省略了此类测试,但任何实际的程序都应该包含它们。

这个简单的变化允许运行

func()
的线程在停止之前迭代任意次数(每个线程不一定有相同的次数),并且它们仅在到达循环顶部的条件检查时停止。 如果您想要更强的同步,例如每个线程在允许继续之前只执行一次迭代,那么您需要构建一些更复杂的东西,但互斥体 + CV 仍然是实现这一目标的好工具。 另一方面,如果它恰好满足您的特定目的,那么您可能会发现pthreads屏障很方便。

© www.soinside.com 2019 - 2024. All rights reserved.