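C++20 std::atomic has wait and notify_* member functions, but no wait_for/wait_until.
The Microsoft STL implementation of std::atomic uses WaitOnAddress (when the operating system is new enough), and that API takes a dwMilliseconds parameter as a timeout. So from a standard library implementer's point of view, I think the missing functionality would be easy to provide (at least on Windows 8 or later). I just wonder why it is not in C++20.
As a writer of portable user code, however, I have to emulate the behavior with a standard semaphore and an atomic counter. So here is the code: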
#include <concepts>
#include <atomic>
#include <compare>
#include <cstddef>
#include <type_traits>
#include <cstring>
#include <semaphore>

namespace detail
{
    template <std::size_t N>
    struct bytes
    {
        unsigned char space[N];
        auto operator<=>(bytes const &) const = default;
    };

    //Compare by value representation, as requested by C++20.
    //The implementation is a bit awkward.
    //Hypothetically `std::atomic<T>::compare(T, T)` would be helpful. :)
    template <std::integral T>
    bool compare(T a, T b) noexcept
    {
        static_assert(std::has_unique_object_representations_v<T>);
        return a == b;
    }

    template <typename T>
    requires(std::has_unique_object_representations_v<T> && !std::integral<T>)
    bool compare(T a, T b) noexcept
    {
        bytes<sizeof(T)> aa, bb;
        std::memcpy(aa.space, &a, sizeof(T));
        std::memcpy(bb.space, &b, sizeof(T));
        return aa == bb;
    }

    template <typename T>
    requires(!std::has_unique_object_representations_v<T>)
    bool compare(T a, T b) noexcept
    {
        std::atomic<T> aa{ a };
        auto equal = aa.compare_exchange_strong(b, b, std::memory_order_relaxed);
        return equal;
    }

    template <typename T>
    class atomic_with_timed_wait
        : public std::atomic<T>
    {
    private:
        using base_atomic = std::atomic<T>;
        std::counting_semaphore<> mutable semaph{ 0 };
        std::atomic<std::ptrdiff_t> mutable notify_demand{ 0 };

    public:
        using base_atomic::base_atomic;

    public:
        void notify_one() /*noexcept*/
        {
            auto nd = notify_demand.load(std::memory_order_relaxed);
            if (nd <= 0)
                return;
            notify_demand.fetch_sub(1, std::memory_order_relaxed);
            semaph.release(1);//may throw
        }

        void notify_all() /*noexcept*/
        {
            auto nd = notify_demand.exchange(0, std::memory_order_relaxed);
            if (nd > 0)
            {
                semaph.release(nd);//may throw
            }
            else if (nd < 0)
            {
                //Overly released. Put it back.
                notify_demand.fetch_add(nd, std::memory_order_relaxed);
            }
        }

        void wait(T old, std::memory_order order = std::memory_order::seq_cst) const /*noexcept*/
        {
            for (;;)
            {
                T const observed = base_atomic::load(order);
                if (false == compare(old, observed))
                    return;
                notify_demand.fetch_add(1, std::memory_order_relaxed);
                semaph.acquire();//may throw
                //Acquired.
            }
        }

        template <typename TPoint>
        bool wait_until(T old, TPoint const & abs_time, std::memory_order order = std::memory_order::seq_cst) const /*noexcept*/
        //Returns: true->diff; false->timeout
        {
            for (;;)
            {
                T const observed = base_atomic::load(order);
                if (false == compare(old, observed))
                    return true;
                notify_demand.fetch_add(1, std::memory_order_relaxed);
                if (semaph.try_acquire_until(abs_time))//may throw
                {
                    //Acquired.
                    continue;
                }
                else
                {
                    //Not acquired and timeout.
                    //This might happen even if semaph has positive release counter.
                    //Just cancel demand and return.
                    //Note that this might give notify_demand a negative value,
                    //  which means the semaph is overly released.
                    //Subsequent acquire on semaph would just succeed spuriously.
                    //So it should be OK.
                    notify_demand.fetch_sub(1, std::memory_order_relaxed);
                    return false;
                }
            }
        }
        //TODO: bool wait_for()...
    };
}
using detail::atomic_with_timed_wait;
I am just not sure whether this is correct. Is there anything wrong with this code?
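Timed waiting APIs for std::atomic (try_wait, wait_for, and wait_until) are proposed in P2643, targeting C++26. libstdc++ already implements the underlying support for these operations in its internal headers, where it backs the timed waits of std::counting_semaphore, which is essentially a more constrained std::atomic. Until the paper is merged, there are at least two portable ways to emulate timed operations: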
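A mutex and condition variable pair: These two can be combined to provide general timed waiting. For example, std::condition_variable_any can be implemented with a pair of std::condition_variable and std::mutex (N2406). A pair of std::atomic and std::counting_semaphore, as in your code, may also be feasible, but I find it a bit awkward because std::counting_semaphore has no notify_all operation, which introduces extra complexity. A simple prototype follows (Godbolt):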
// NOTE: volatile overloads are not supported
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>

// The unqualified names below rely on these using-directives.
using namespace std;
using namespace std::chrono;

template <class T> struct timed_atomic : atomic<T> {
    using atomic<T>::atomic;

    // Returns true if the stored value differs from `old`.
    bool try_wait(T old, memory_order order = seq_cst) const noexcept {
        T value = this->load(order);
        // TODO: Ignore padding bits in comparison
        return memcmp(addressof(value), addressof(old), sizeof(T)) != 0;
    }

    void wait(T old, memory_order order = seq_cst) const {
        unique_lock lock(mtx);
        cond.wait(lock, [=, this]() { return try_wait(old, order); });
    }

    // Returns true if the value changed, false on timeout.
    template <class Rep, class Period>
    bool wait_for(T old, const duration<Rep, Period> &rel_time,
                  memory_order order = seq_cst) const {
        unique_lock lock(mtx);
        return cond.wait_for(lock, rel_time,
                             [=, this]() { return try_wait(old, order); });
    }

    // Returns true if the value changed, false on timeout.
    template <class Clock, class Duration>
    bool wait_until(T old, const time_point<Clock, Duration> &abs_time,
                    memory_order order = seq_cst) const {
        unique_lock lock(mtx);
        return cond.wait_until(lock, abs_time,
                               [=, this]() { return try_wait(old, order); });
    }

    // Locking and unlocking mtx before notifying keeps a waiter from
    // missing the wakeup between its predicate check and blocking.
    void notify_one() const {
        { lock_guard _(mtx); }
        cond.notify_one();
    }

    void notify_all() const {
        { lock_guard _(mtx); }
        cond.notify_all();
    }

private:
    mutable mutex mtx;
    mutable condition_variable cond;
    using enum memory_order;
};
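As you can see above, one drawback of this approach is that volatile overloads of the member functions are not supported, because std::mutex and std::condition_variable themselves do not support volatile. One workaround is to store them in a separate table outside timed_atomic and hash the object's address to look up the corresponding pair. libstdc++ already implements something similar for platforms that lack native atomic wait operations (Thomas).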
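The separate-table workaround could look roughly like the sketch below. It is only an illustration under stated assumptions, not libstdc++'s actual implementation: `wait_slot`, `slot_for`, the table size of 16, and the free functions `wait_for` and `notify_all` are hypothetical names chosen here, and the comparison uses `!=`, which assumes `T` is equality comparable and has no padding bits.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>

namespace sketch {

// Hypothetical helper: one mutex/condition-variable pair per table slot.
struct wait_slot {
    std::mutex mtx;
    std::condition_variable cond;
};

// Hypothetical lookup: hash the atomic's address into a fixed-size table.
// Different atomics may share a slot, which is why only notify_all is offered.
inline wait_slot& slot_for(const volatile void* addr) {
    constexpr std::size_t slot_count = 16;  // assumed table size
    static wait_slot table[slot_count];
    const auto key = reinterpret_cast<std::uintptr_t>(addr);
    return table[(key >> 4) % slot_count];
}

// Returns true if the value differs from `old`, false on timeout.
// Works for volatile atomics because no state lives inside the atomic.
template <class T, class Rep, class Period>
bool wait_for(const volatile std::atomic<T>& a, T old,
              const std::chrono::duration<Rep, Period>& rel_time,
              std::memory_order order = std::memory_order_seq_cst) {
    wait_slot& slot = slot_for(&a);
    std::unique_lock lock(slot.mtx);
    return slot.cond.wait_for(lock, rel_time,
                              [&] { return a.load(order) != old; });
}

// Wakes every waiter that hashes to the same slot as `a`.
template <class T>
void notify_all(const volatile std::atomic<T>& a) {
    wait_slot& slot = slot_for(&a);
    { std::lock_guard guard(slot.mtx); }  // serialize with waiters' checks
    slot.cond.notify_all();
}

}  // namespace sketch
```

A notifier would store the new value into the atomic and then call `sketch::notify_all` on it. Because unrelated atomics can collide in the table, waiters may wake spuriously; the predicate in `wait_for` simply rechecks the value.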
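A subtler issue is that the standard requires wait to compare value representations (that is, excluding padding bits) for equality, rather than object representations ([atomics.types.operations] p30.1). At present this cannot easily be done portably and requires compiler support (for example, __builtin_clear_padding in GCC).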
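To make the padding problem concrete, here is a small sketch. The names `padded`, `same_value`, and `SKETCH_HAS_CLEAR_PADDING` are hypothetical, and the layout comment assumes a typical ABI where `int` is aligned to more than one byte. Where the compiler offers `__builtin_clear_padding`, the helper zeroes padding before comparing; elsewhere it falls back to comparing object representations, which can report a difference when only padding bytes differ.

```cpp
#include <cstring>
#include <type_traits>

// Detect the GCC builtin without breaking compilers that lack __has_builtin.
#if defined(__has_builtin)
#  if __has_builtin(__builtin_clear_padding)
#    define SKETCH_HAS_CLEAR_PADDING 1
#  endif
#endif

namespace sketch {

// On typical ABIs there are padding bytes between `tag` and `value`.
struct padded {
    char tag;
    int value;
};

// Compares two objects bytewise after clearing padding where possible.
// Takes its arguments by value so that the callers' objects are untouched.
template <class T>
bool same_value(T a, T b) noexcept {
    static_assert(std::is_trivially_copyable_v<T>);
#ifdef SKETCH_HAS_CLEAR_PADDING
    __builtin_clear_padding(&a);  // zero the padding bits of the copies
    __builtin_clear_padding(&b);
#endif
    return std::memcmp(&a, &b, sizeof(T)) == 0;
}

}  // namespace sketch
```

Without the builtin, two `padded` objects with equal members may still compare unequal here, which is exactly the gap the TODO comments in the prototypes refer to.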
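Timed polling with backoff: This approach is more lightweight because it needs no additional synchronization facilities. The downside is that polling is generally more expensive than waiting when the notification takes a long time to arrive. One potential advantage of polling is that it respects adjustments to a user-provided Clock. An example implementation follows (Godbolt):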
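#include <atomic>
#include <chrono>
#include <cstring>
#include <memory>
#include <thread>

// The unqualified names below rely on these using-directives.
using namespace std;
using namespace std::chrono;
using namespace std::this_thread;

template <class T> struct timed_atomic : atomic<T> {
    using atomic<T>::atomic;

    // Returns true if the stored value differs from `old`.
    bool try_wait(T old, memory_order order = seq_cst) const noexcept {
        T value = this->load(order);
        // TODO: Ignore padding bits in comparison
        return memcmp(addressof(value), addressof(old), sizeof(T)) != 0;
    }

    // Returns true if the value changed, false on timeout.
    template <class Rep, class Period>
    bool wait_for(T old, const duration<Rep, Period> &rel_time,
                  memory_order order = seq_cst) const {
        return wait_until(old, steady_clock::now() + rel_time, order);
    }

    // Polls every 100ms; the clock is re-read on each iteration.
    template <class Clock, class Duration>
    bool wait_until(T old, const time_point<Clock, Duration> &abs_time,
                    memory_order order = seq_cst) const {
        while (!try_wait(old, order)) {
            if (Clock::now() >= abs_time)
                return false;
            sleep_for(100ms);
        }
        return true;
    }
    // NOTE: volatile overloads are omitted

private:
    using enum memory_order;
};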
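The prototype sleeps for a fixed 100ms, so it can overshoot the deadline by up to that much. A variant with a growing delay that is clamped to the remaining time might look like the sketch below. The free function `poll_until`, the 50 microsecond starting delay, and the doubling policy are assumptions made for illustration; the comparison uses `==`, so it assumes `T` is equality comparable and does not address padding bits.

```cpp
#include <algorithm>
#include <atomic>
#include <chrono>
#include <thread>

namespace sketch {

// Polls `a` until its value differs from `old` or `abs_time` has passed.
// Returns true if a different value was observed, false on timeout.
template <class T, class Clock, class Duration>
bool poll_until(const std::atomic<T>& a, T old,
                const std::chrono::time_point<Clock, Duration>& abs_time,
                std::memory_order order = std::memory_order_seq_cst) {
    using std::chrono::microseconds;
    microseconds delay{50};                     // assumed initial delay
    constexpr microseconds max_delay{100'000};  // cap at 100ms
    while (a.load(order) == old) {
        const auto now = Clock::now();
        if (now >= abs_time)
            return false;
        // Never sleep past the deadline; sleep at least 1 microsecond.
        const auto remaining =
            std::chrono::duration_cast<microseconds>(abs_time - now);
        std::this_thread::sleep_for(
            std::min(delay, std::max(remaining, microseconds{1})));
        delay = std::min(delay * 2, max_delay);  // exponential backoff
    }
    return true;
}

}  // namespace sketch
```

Short initial delays keep latency low when the value changes quickly, while the cap bounds the wakeup rate for long waits. Because `Clock::now()` is consulted on every iteration, adjustments to a user-provided clock are still respected.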