当请求到来时,我必须以高度并行化的方式处理数据。该处理混合了一些 CPU 密集型工作(1 毫秒突发),并且可能需要阻塞 Web 请求(库不受我的控制)。该函数作为同步函数向我公开,但在阻塞时在幕后使用 block_in_place() 。
假设我有以下示例:
async fn handle_request(data: &Bytes) {
let mut results = vec![];
for i in 0..1000 {
let processed = do_heavy_calc_that_might_block(data, i); <-- this possibly needs to do a blocking web request, uses block_in_place when it does
let result = do_something_with_res(processed).await;
results.push(result);
}
results
}
我很想使用
tokio::task::spawn()
或spawn_blocking()
,但两者在测试中都存在问题。如果我将 spawn
与以下内容一起使用:
async fn handle_request(data: &Bytes) {
let mut tasks = vec![];
for i in 0..1000 {
tasks.push(tokio::task::spawn(async move {
let processed = do_heavy_calc_that_might_block(data, i);
let result = do_something_with_res(processed).await;
result
}));
}
let results = futures_util::future::join_all(tasks).await;
results
}
当 do_heavy_calc 运行时,如果太多的块需要阻塞,它们会
block_in_place()
,并且运行时会变得饥饿,什么也不会完成。
如果我切换到
spawn_blocking
,我无法向其传递异步函数,因此 do_something_with_res
不能 .await
。显然我可以从任务中返回处理后的数据并在循环后执行do_something_with_res
,但任务中的实际代码在块/异步/块/异步之间来回多次。所以正在寻找更通用的东西。
我尝试使用 Rayon 而不是在 tokio 中,但最终没有达到我想要的并行化程度。由于阻塞调用可能需要时间,它们也会堵塞线程池,并且由于它们与异步调用混合在一起,因此最好利用我已经在其中工作的 tokio 运行时......
常规的 Tokio 工作线程池和 Rayon 的线程池都是围绕这样的假设而设计的:它们运行的每个任务要么忙于使用 CPU,要么让步。如果您将网络阻塞任务放在其中任何一个上,您的性能都会很差。您需要使用“其他”而不是已针对 CPU 并行性调整大小的线程池。
spawn_blocking()
是一个很好的方法。为了与阻塞任务中的异步代码交互,您可以调用
Handle::block_on()
。
请注意,
max_blocking_threads
的数量也会影响spawn_blocking
任务的执行方式。默认值为 512,因此除非您有大量工作要做,否则您不太可能遇到它,但如果它确实出现,您可能希望在应用程序逻辑中施加明确的限制,因为任务正在等待运行仍然占用内存,网络服务器可能不喜欢太多的传入连接,而且无论如何你都只有这么多的CPU核心——所以在这一点上你可能会发现通过在应用程序逻辑中有意地对事物进行排队而不是提高整体吞吐量而不是把你所有的东西都扔到东京。