如何高效地将数千次 RPC 调用的结果写入 Rust SQLite DB

问题描述 投票:0回答:1

在设置 Rust 服务器时,我必须进行超过 100K 的 RPC 调用。这是服务器初始化时的一次性操作。我必须将这 100K+ 次提取的结果写入数据库。我正在使用 rusqlite 作为我的数据库。并使用 reqwest 进行 RPC 调用。我希望这是一个异步操作,因为服务器应该能够通过从数据库读取数据来处理查询,同时初始化过程仍在进行中,这可能需要几个小时,因为它正在进行 100K RPC 调用。

我尝试生成一个 tokio 异步任务并在其中进行 RPC 调用:

let handle = tokio::task::spawn(async move {
    let conn = Connection::open("rpc_results.db")?;
    for _ in 0..200000 {
        let res = fetch_rpc_endpoint().await?;
        write_to_db(res, conn)?;
    }
});

这里我在每次 RPC 调用后写入数据库,这是非常低效的。我可以通过将 100 个 RPC 结果保存在 Vec 中并批量写入数据库来优化这一点。

是否可以进一步优化?我正在考虑并行化 RPC 调用和并行批量写入数据库,这是否可能,同时牢记数据库的竞争条件。这可能是一个问题,因为我必须打开一个新连接以同时从部分写入的数据库中读取。

或者任何 Rust 库组合来实现这一点,因为这似乎是一个标准问题?

database sqlite rust parallel-processing rust-tokio
1个回答
0
投票

正如评论中所暗示的,这可以通过

futures::{Try,}StreamExt
buffer_unordered
try_chunks
相对整齐地实现:

futures::stream::iter(0..100)
    .map(|_| fetch_rpc_endpoint())
    .buffer_unordered(5)
    .try_chunks(20)
    .map(|data| {
        write_to_db(data.expect("TODO: handle error properly in real code"), &mut conn)?;
        Ok::<_, Infallible>(())
    })
    .try_collect::<()>()
    .await?;

游乐场

如果您关心顺序,还有

buffered
而不是
buffer_unordered

顺便说一句,外循环上的

tokio::spawn
没有任何作用。

© www.soinside.com 2019 - 2024. All rights reserved.