高水平
我想在异步模式下调用一些没有返回值的函数,而不等待它们完成。如果我使用 std::async ,则未来对象在任务结束之前不会销毁,这使得调用在我的情况下不同步。
示例
void sendMail(const std::string& address, const std::string& message)
{
//sending the e-mail which takes some time...
}
myResonseType processRequest(args...)
{
//Do some processing and valuate the address and the message...
//Sending the e-mail async
auto f = std::async(std::launch::async, sendMail, address, message);
//returning the response ASAP to the client
return myResponseType;
} //<-- I'm stuck here until the async call finish to allow f to be destructed.
// gaining no benefit from the async call.
我的问题是
注:
我宁愿不使用线程+分离的选项(由@galop1n建议),因为创建新线程会产生我希望避免的开销。当使用 std::async (至少在 MSVC 上)时,正在使用内部线程池。
谢谢。
您可以将 future 移动到全局对象中,因此当本地 future 的析构函数运行时,不必等待异步线程完成。
std::vector<std::future<void>> pending_futures;
myResonseType processRequest(args...)
{
//Do some processing and valuate the address and the message...
//Sending the e-mail async
auto f = std::async(std::launch::async, sendMail, address, message);
// transfer the future's shared state to a longer-lived future
pending_futures.push_back(std::move(f));
//returning the response ASAP to the client
return myResponseType;
}
注意如果异步线程引用
processRequest
函数中的任何局部变量,这是不安全的。
使用
(至少在 MSVC 上)时正在使用内部线程池。std::async
这实际上是不符合标准的,标准明确规定使用
std::launch::async
运行的任务必须像在新线程中一样运行,因此任何线程局部变量都不能从一个任务持续到另一个任务。不过通常并不重要。
如果您不关心加入,为什么不直接启动一个线程并分离?
std::thread{ sendMail, address, message}.detach();
std::async 绑定到它返回的 std::future 的生命周期,并且它们是无可替代的。
将 std::future 放入由其他线程读取的等待队列中将需要与接收新任务的池相同的安全机制,例如容器周围的互斥体。
那么,最好的选择是使用线程池来消耗直接推送到线程安全队列中的任务。而且它不取决于具体的实现。在采用任何可调用和参数的线程池实现下面,线程在队列上进行轮询,更好的实现应该使用条件变量(
coliru):
#include <iostream>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <functional>
#include <string>
struct ThreadPool {
struct Task {
virtual void Run() const = 0;
virtual ~Task() {};
};
template < typename task_, typename... args_ >
struct RealTask : public Task {
RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
void Run() const override {
fun_();
}
private:
decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
};
template < typename task_, typename... args_ >
void AddTask( task_&& task, args_&&... args ) {
auto lock = std::unique_lock<std::mutex>{mtx_};
using FinalTask = RealTask<task_, args_... >;
q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
}
ThreadPool() {
for( auto & t : pool_ )
t = std::thread( [=] {
while ( true ) {
std::unique_ptr<Task> task;
{
auto lock = std::unique_lock<std::mutex>{mtx_};
if ( q_.empty() && stop_ )
break;
if ( q_.empty() )
continue;
task = std::move(q_.front());
q_.pop();
}
if (task)
task->Run();
}
} );
}
~ThreadPool() {
{
auto lock = std::unique_lock<std::mutex>{mtx_};
stop_ = true;
}
for( auto & t : pool_ )
t.join();
}
private:
std::queue<std::unique_ptr<Task>> q_;
std::thread pool_[8];
std::mutex mtx_;
volatile bool stop_ {};
};
void foo( int a, int b ) {
std::cout << a << "." << b;
}
void bar( std::string const & s) {
std::cout << s;
}
int main() {
ThreadPool pool;
for( int i{}; i!=42; ++i ) {
pool.AddTask( foo, 3, 14 );
pool.AddTask( bar, " - " );
}
}
本地作用域,而不是将 future 移至全局对象(并手动管理未使用的 future 的删除)。
“让异步函数拥有自己的未来”,可以这么说。我想出了这个适合我的模板包装器(在 Windows 上测试):
#include <future>
template<class Function, class... Args>
void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
std::future<void>&& is_valid, std::promise<void>&& is_moved) {
is_valid.wait(); // Wait until the return value of std::async is written to "future"
auto our_future = std::move(future); // Move "future" to a local variable
is_moved.set_value(); // Only now we can leave void_async in the main thread
// This is also used by std::async so that member function pointers work transparently
auto functor = std::bind(f, std::forward<Args>(args)...);
functor();
}
template<class Function, class... Args> // This is what you call instead of std::async
void void_async(Function&& f, Args&&... args) {
std::future<void> future; // This is for std::async return value
// This is for our synchronization of moving "future" between threads
std::promise<void> valid;
std::promise<void> is_moved;
auto valid_future = valid.get_future();
auto moved_future = is_moved.get_future();
// Here we pass "future" as a reference, so that async_wrapper
// can later work with std::async's return value
future = std::async(
async_wrapper<Function, Args...>,
std::forward<Function>(f), std::forward<Args>(args)...,
std::ref(future), std::move(valid_future), std::move(is_moved)
);
valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
moved_future.wait(); // Wait for "future" to actually be moved
}
我对它的工作原理感到有点惊讶,因为我认为移动的 future 的析构函数会阻塞,直到我们离开
async_wrapper。它应该等待 async_wrapper 返回,但它正在该函数内部等待。从逻辑上讲,这应该是一个僵局,但事实并非如此。
我还尝试在async_wrapper 的末尾添加一行来手动清空 future 对象:
our_future = std::future<void>();
这也不会阻塞。
auto processRequest() -> void {
static std::forward_list<std::future<void>> requests;
requests.emplace_front(std::async(std::launch::async, sendMail, "address", "message"));
}
当然,您必须考虑您最终想要对所有这些未来做什么,或者您是否接受多个正在进行的请求。我认为解决方案是不使用std::async
,从长远来看,这是我计划做的。
完整示例:
#include <thread>
#include <future>
#include <chrono>
#include <iostream>
#include <forward_list>
using namespace std::chrono_literals;
auto sendMail(const std::string& address, const std::string& message) -> void {
std::cout << "before send!" << std::endl;
std::this_thread::sleep_for(1000ms);
std::cout << "after send!" << std::endl;
}
auto processRequest() -> void {
static std::forward_list<std::future<void>> requests;
requests.emplace_front(std::async(std::launch::async, sendMail, "address", "message"));
}
auto main() -> int {
std::cout << "Before call" << std::endl;
processRequest();
processRequest();
std::cout << "After call" << std::endl;
}
打印(一种变体):
Before call
After call
before send!
before send!
after send!
after send!
future
设为指针。以下正是您正在寻找的内容:
std::make_unique<std::future<void>>((std::async(std::launch::async, sendMail, address, message)));
// :( http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3451.pdf
template<typename T>
void noget(T&& in)
{
static std::mutex vmut;
static std::vector<T> vec;
static std::thread getter;
static std::mutex single_getter;
if (single_getter.try_lock())
{
getter = std::thread([&]()->void
{
size_t size;
for(;;)
{
do
{
vmut.lock();
size=vec.size();
if(size>0)
{
T target=std::move(vec[size-1]);
vec.pop_back();
vmut.unlock();
// cerr << "getting!" << endl;
target.get();
}
else
{
vmut.unlock();
}
}while(size>0);
// ¯\_(ツ)_/¯
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
getter.detach();
}
vmut.lock();
vec.push_back(std::move(in));
vmut.unlock();
}
它为你扔给它的每种类型的 future 创建一个专用的 getter 线程(例如,如果你给它一个 future 和 future,你将有 2 个线程。如果你给它 100x future,你仍然只有 2 个线程) ,当有一个你不想处理的未来时,只需做
notget(fut);
- 你也可以
noget(std::async([]()->void{...}));
工作得很好,看起来没有阻碍。警告,不要在使用 noget() 后尝试从 future 获取值。那可能是 UB 在自找麻烦。