我可以使用 std::async 而不等待未来的限制吗?

问题描述 投票:0回答:6

高水平
我想在异步模式下调用一些没有返回值的函数,而不等待它们完成。如果我使用 std::async ,则未来对象在任务结束之前不会销毁,这使得调用在我的情况下不同步。

示例

void sendMail(const std::string& address, const std::string& message)
{
    //sending the e-mail which takes some time...
}

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    //returning the response ASAP to the client
    return myResponseType;

} //<-- I'm stuck here until the async call finish to allow f to be destructed.
  // gaining no benefit from the async call.

我的问题是

  1. 有办法克服这个限制吗?
  2. 如果(1)为否,我是否应该实现一个线程来接受这些“僵尸”期货并等待它们?
  3. (1)和(2)都不是,除了构建我自己的线程池之外还有其他选择吗?

注:
我宁愿不使用线程+分离的选项(由@galop1n建议),因为创建新线程会产生我希望避免的开销。当使用 std::async (至少在 MSVC 上)时,正在使用内部线程池。

谢谢。

c++ multithreading c++11 asynchronous stdasync
6个回答
25
投票

您可以将 future 移动到全局对象中,因此当本地 future 的析构函数运行时,不必等待异步线程完成。

std::vector<std::future<void>> pending_futures;

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    // transfer the future's shared state to a longer-lived future
    pending_futures.push_back(std::move(f));

    //returning the response ASAP to the client
    return myResponseType;

}

注意如果异步线程引用

processRequest
函数中的任何局部变量,这是不安全的。

使用

std::async
(至少在 MSVC 上)时正在使用内部线程池。

这实际上是不符合标准的,标准明确规定使用

std::launch::async
运行的任务必须像在新线程中一样运行,因此任何线程局部变量都不能从一个任务持续到另一个任务。不过通常并不重要。


20
投票

如果您不关心加入,为什么不直接启动一个线程并分离?

std::thread{ sendMail, address, message}.detach();   

std::async 绑定到它返回的 std::future 的生命周期,并且它们是无可替代的。

将 std::future 放入由其他线程读取的等待队列中将需要与接收新任务的池相同的安全机制,例如容器周围的互斥体。

那么,最好的选择是使用线程池来消耗直接推送到线程安全队列中的任务。而且它不取决于具体的实现。

在采用任何可调用和参数的线程池实现下面,线程在队列上进行轮询,更好的实现应该使用条件变量(

coliru):

#include <iostream> #include <queue> #include <memory> #include <thread> #include <mutex> #include <functional> #include <string> struct ThreadPool { struct Task { virtual void Run() const = 0; virtual ~Task() {}; }; template < typename task_, typename... args_ > struct RealTask : public Task { RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {} void Run() const override { fun_(); } private: decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_; }; template < typename task_, typename... args_ > void AddTask( task_&& task, args_&&... args ) { auto lock = std::unique_lock<std::mutex>{mtx_}; using FinalTask = RealTask<task_, args_... >; q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) ); } ThreadPool() { for( auto & t : pool_ ) t = std::thread( [=] { while ( true ) { std::unique_ptr<Task> task; { auto lock = std::unique_lock<std::mutex>{mtx_}; if ( q_.empty() && stop_ ) break; if ( q_.empty() ) continue; task = std::move(q_.front()); q_.pop(); } if (task) task->Run(); } } ); } ~ThreadPool() { { auto lock = std::unique_lock<std::mutex>{mtx_}; stop_ = true; } for( auto & t : pool_ ) t.join(); } private: std::queue<std::unique_ptr<Task>> q_; std::thread pool_[8]; std::mutex mtx_; volatile bool stop_ {}; }; void foo( int a, int b ) { std::cout << a << "." << b; } void bar( std::string const & s) { std::cout << s; } int main() { ThreadPool pool; for( int i{}; i!=42; ++i ) { pool.AddTask( foo, 3, 14 ); pool.AddTask( bar, " - " ); } }
    

10
投票
您实际上可以将 future 移至异步调用函数的

本地作用域,而不是将 future 移至全局对象(并手动管理未使用的 future 的删除)。

“让异步函数拥有自己的未来”,可以这么说。

我想出了这个适合我的模板包装器(在 Windows 上测试):

#include <future> template<class Function, class... Args> void async_wrapper(Function&& f, Args&&... args, std::future<void>& future, std::future<void>&& is_valid, std::promise<void>&& is_moved) { is_valid.wait(); // Wait until the return value of std::async is written to "future" auto our_future = std::move(future); // Move "future" to a local variable is_moved.set_value(); // Only now we can leave void_async in the main thread // This is also used by std::async so that member function pointers work transparently auto functor = std::bind(f, std::forward<Args>(args)...); functor(); } template<class Function, class... Args> // This is what you call instead of std::async void void_async(Function&& f, Args&&... args) { std::future<void> future; // This is for std::async return value // This is for our synchronization of moving "future" between threads std::promise<void> valid; std::promise<void> is_moved; auto valid_future = valid.get_future(); auto moved_future = is_moved.get_future(); // Here we pass "future" as a reference, so that async_wrapper // can later work with std::async's return value future = std::async( async_wrapper<Function, Args...>, std::forward<Function>(f), std::forward<Args>(args)..., std::ref(future), std::move(valid_future), std::move(is_moved) ); valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid moved_future.wait(); // Wait for "future" to actually be moved }

我对它的工作原理感到有点惊讶,因为我认为移动的 future 的析构函数会阻塞,直到我们离开

async_wrapper。它应该等待 async_wrapper 返回,但它正在该函数内部等待。从逻辑上讲,这应该是一个僵局,但事实并非如此。

我还尝试在

async_wrapper 的末尾添加一行来手动清空 future 对象:

our_future = std::future<void>();

这也不会阻塞。


0
投票
我陷入了同样的情况,并应用了一个肮脏的修复,我只是将所有期货推到全局列表上:

auto processRequest() -> void { static std::forward_list<std::future<void>> requests; requests.emplace_front(std::async(std::launch::async, sendMail, "address", "message")); }
当然,您必须考虑您最终想要对所有这些未来做什么,或者您是否接受多个正在进行的请求。我认为解决方案是不使用

std::async

,从长远来看,这是我计划做的。

完整示例

#include <thread> #include <future> #include <chrono> #include <iostream> #include <forward_list> using namespace std::chrono_literals; auto sendMail(const std::string& address, const std::string& message) -> void { std::cout << "before send!" << std::endl; std::this_thread::sleep_for(1000ms); std::cout << "after send!" << std::endl; } auto processRequest() -> void { static std::forward_list<std::future<void>> requests; requests.emplace_front(std::async(std::launch::async, sendMail, "address", "message")); } auto main() -> int { std::cout << "Before call" << std::endl; processRequest(); processRequest(); std::cout << "After call" << std::endl; }
打印(一种变体):

Before call After call before send! before send! after send! after send!
    

0
投票
您需要将

future

 设为指针。以下正是您正在寻找的内容:

std::make_unique<std::future<void>>((std::async(std::launch::async, sendMail, address, message)));
    

-1
投票
我不知道我在做什么,但这似乎有效:

// :( http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3451.pdf template<typename T> void noget(T&& in) { static std::mutex vmut; static std::vector<T> vec; static std::thread getter; static std::mutex single_getter; if (single_getter.try_lock()) { getter = std::thread([&]()->void { size_t size; for(;;) { do { vmut.lock(); size=vec.size(); if(size>0) { T target=std::move(vec[size-1]); vec.pop_back(); vmut.unlock(); // cerr << "getting!" << endl; target.get(); } else { vmut.unlock(); } }while(size>0); // ¯\_(ツ)_/¯ std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }); getter.detach(); } vmut.lock(); vec.push_back(std::move(in)); vmut.unlock(); }

它为你扔给它的每种类型的 future 创建一个专用的 getter 线程(例如,如果你给它一个 future 和 future,你将有 2 个线程。如果你给它 100x future,你仍然只有 2 个线程) ,当有一个你不想处理的未来时,只需做

notget(fut);

 - 你也可以 
noget(std::async([]()->void{...}));
 工作得很好,看起来没有阻碍。警告,不要在使用 noget() 后尝试从 future 获取值。那可能是 UB 在自找麻烦。

© www.soinside.com 2019 - 2024. All rights reserved.