在设置 Rust 服务器时,我必须进行超过 100K 的 RPC 调用。这是服务器初始化时的一次性操作。我必须将这 100K+ 次提取的结果写入数据库。我正在使用 rusqlite 作为我的数据库。并使用 reqwest 进行 RPC 调用。我希望这是一个异步操作,因为服务器应该能够通过从数据库读取数据来处理查询,同时初始化过程仍在进行中,这可能需要几个小时,因为它正在进行 100K RPC 调用。
我尝试生成一个 tokio 异步任务并在其中进行 RPC 调用:
let handle = tokio::task::spawn(async move {
let conn = Connection::open("rpc_results.db")?;
for _ in 0..200000 {
let res = fetch_rpc_endpoint().await?;
write_to_db(res, conn)?;
}
});
这里我在每次 RPC 调用后写入数据库,这是非常低效的。我可以通过将 100 个 RPC 结果保存在 Vec 中并批量写入数据库来优化这一点。
是否可以进一步优化?我正在考虑并行化 RPC 调用和并行批量写入数据库,这是否可能,同时牢记数据库的竞争条件。这可能是一个问题,因为我必须打开一个新连接以同时从部分写入的数据库中读取。
或者任何 Rust 库组合来实现这一点,因为这似乎是一个标准问题?
正如评论中所暗示的,这可以通过
futures::{Try,}StreamExt
的 buffer_unordered
和 try_chunks
相对整齐地实现:
futures::stream::iter(0..100)
.map(|_| fetch_rpc_endpoint())
.buffer_unordered(5)
.try_chunks(20)
.map(|data| {
write_to_db(data.expect("TODO: handle error properly in real code"), &mut conn)?;
Ok::<_, Infallible>(())
})
.try_collect::<()>()
.await?;
如果您关心顺序,还有
buffered
而不是 buffer_unordered
。
顺便说一句,外循环上的
tokio::spawn
没有任何作用。