我这里有一个关于同步的棘手问题。我有一个“编写者”线程,它在每次迭代时为承诺分配不同的值“p”。我需要“读取器”线程等待该值的shared_futures,然后处理它们,我的问题是如何使用 future/promise 来确保读取器线程在执行处理任务之前等待“p”的新更新每次迭代?非常感谢。
您可以通过将 Promise 分配给空白 Promise 来“重置”它。
myPromise = promise< int >();
更完整的示例:
promise< int > myPromise;
void writer()
{
for( int i = 0; i < 10; ++i )
{
cout << "Setting promise.\n";
myPromise.set_value( i );
myPromise = promise< int >{}; // Reset the promise.
cout << "Waiting to set again...\n";
this_thread::sleep_for( chrono::seconds( 1 ));
}
}
void reader()
{
int result;
do
{
auto myFuture = myPromise.get_future();
cout << "Waiting to receive result...\n";
result = myFuture.get();
cout << "Received " << result << ".\n";
} while( result < 9 );
}
int main()
{
std::thread write( writer );
std::thread read( reader );
write.join();
read.join();
return 0;
}
但是,这种方法的一个问题是,两个线程之间的同步可能会导致编写器在读取器调用
promise::set_value()
之间多次调用 future::get()
,或者在重置 Promise 时调用 future::get()
。这些问题可以通过小心避免(例如,在调用之间适当休眠),但这将我们带入黑客和猜测的领域,而不是逻辑上正确的并发领域。
因此,尽管可以通过将 Promise 分配给新的 Promise 来重置 Promise,但这样做往往会引发更广泛的同步问题。
promise
/future
对被设计为仅携带单个值(或例外)。要执行您所描述的操作,您可能需要采用不同的工具。
如果您希望多个线程(您的读者)都停在一个公共点,您可以考虑
barrier
。
以下代码演示了如何使用
future
和 promise
实现生产者/消费者模式。
有两个
promise
变量,由生产者线程和消费者线程使用。每个线程重置两个 promise
变量之一并等待另一个。
#include <iostream>
#include <future>
#include <thread>
using namespace std;
// produces integers from 0 to 99
void producer(promise<int>& dataready, promise<void>& consumed)
{
for (int i = 0; i < 100; ++i) {
// do some work here ...
consumed = promise<void>{}; // reset
dataready.set_value(i); // make data available
consumed.get_future().wait(); // wait for the data to be consumed
}
dataready.set_value(-1); // no more data
}
// consumes integers
void consumer(promise<int>& dataready, promise<void>& consumed)
{
for (;;) {
int n = dataready.get_future().get(); // wait for data ready
if (n >= 0) {
std::cout << n << ",";
dataready = promise<int>{}; // reset
consumed.set_value(); // mark data as consumed
// do some work here ...
}
else
break;
}
}
int main(int argc, const char*argv[])
{
promise<int> dataready{};
promise<void> consumed{};
thread th1([&] {producer(dataready, consumed); });
thread th2([&] {consumer(dataready, consumed); });
th1.join();
th2.join();
std::cout << "\n";
return 0;
}
此答案支持 Jeff Wofford、Managu 和 Fabio 给出的答案,并进行了一些澄清。
杰夫和法比奥的解决方案有效。但要澄清的是,解决方案中发生的情况是,所谓的承诺重置并不是按所说重置承诺,而只是将新的承诺分配到为承诺所做的空间分配中。在 Fabio 的解决方案中,此分配是在 main 的堆栈上进行的,而在 Jeff 的解决方案中,它是一个全局变量。在每次迭代中,新的 Promise 都是一个完全不同的 Promise,并且具有新的共享状态,并且从前一个 Promise 检索到的任何 Future 都不会引用这个新的共享状态。
所以这些解决方案并不完全是重置,而是回收承诺。每次迭代都会发生新的共享状态分配和解除分配。
正如Managu所提到的,
/promise
对被设计为仅携带单个值(或 例外。)。future
它作为单次异步数据通信机制。