尝试将向量块与 Tokio spawn 函数一起用于多线程时出现生命周期错误

问题描述 投票:0回答:1

所以,我有这段代码在单线程中运行良好

pub async fn insert_foos(
    self,
    foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
    let chunks = foos.chunks(10_000);

    for chunk in chunks {
        let _ = &self.insert_foo_chunk(chunk).await?;
    }

    Ok(())
}

然后我尝试使其成为多线程:

pub async fn insert_foos(
    self,
    foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
    let chunks = foos.chunks(10_000);

    let tasks: Vec<_> = chunks
        .map(move|chunk| {
            let pg_repository = self.clone();
            tokio::spawn(async move {
                pg_repository.insert_foo_chunk(chunk).await?;
                Ok::<(), Error>(())
            })
        })
        .collect();

    for task in tasks {
        let _ = task.await?;
    }

    Ok(())
}

但是后来我得到了这个错误:

error[E0597]: `foos` does not live long enough
  --> src/db/postgres_repository.rs:18:22
   |
16 |         foos: Vec<Foo>,
   |         ---- binding `foos` declared here
17 |     ) -> Result<(), Box<dyn std::error::Error>> {
18 |         let chunks = foos.chunks(10_000);
   |                      ^^^^---------------
   |                      |
   |                      borrowed value does not live long enough
   |                      argument requires that `foos` is borrowed for `'static`
...
40 |     }
   |     - `foos` dropped here while still borrowed

检查

Chunks::new
方法的实现

impl<'a, T: 'a> Chunks<'a, T> {
    #[inline]
    pub(super) fn new(slice: &'a [T], size: usize) -> Self {
        Self { v: slice, chunk_size: size }
    }
}

非常简单,表明

v
实际上是我的具有相同寿命的
foos
。 所以我想我理解,因为
async move
,我的
chunks
被移出了函数,但实际上我的
foos
应该一直存在到函数结束,这显然会产生这个生命周期问题。我说得对吗?

但是从昨天开始我就一直摸不着头脑,不知道如何解决这个问题。我不想克隆

foos
,因为这个向量包含大约 500 MB 的数据,实际上对
insert_foos
的调用已经是多线程的(这使我总共加载大约 6 GB 的数据,没有足够的 RAM克隆所有这些)。

插入块后我根本不关心

foos
,所以实际上我不需要这个变量一直存在到函数结束。我只是不知道如何告诉编译器......

非常感谢您的帮助。

rust lifetime rust-tokio chunks
1个回答
1
投票

tokio::spawn
只接受
'static
Future
的,而不是从 chunks 等局部变量中
借用
的期货。这是因为它非常灵活,并且允许生成的 future 在变量超出范围后持续很长时间。

您可以使用

futures::future::try_join_all
代替:

struct Foo;
type Error = std::io::Error;
#[derive(Clone)]
struct Repo;

impl Repo {
    pub async fn insert_foo_chunk(&self, _chunk: &[Foo]) -> Result<(), Error> {
        Ok(())
    }

    pub async fn insert_foos(
        self,
        foos: Vec<Foo>,
    ) -> Result<(), Error> {
        let chunks = foos.chunks(10_000);
        
        futures::future::try_join_all(
            chunks
                .map(move |chunk| {
                    let pg_repository = self.clone();
                    async move {
                        pg_repository.insert_foo_chunk(chunk).await
                    }
                })
        ).await.map(|_| ())
    }
}

请注意,由于与

spawn
相关的错误是不可能出现的,因此错误类型可以变得更简单。另请注意,上面的代码将返回遇到的第一个错误。如果您希望所有任务无论错误如何都完成,就像您的原始代码一样,请改用
futures::future::join_all

© www.soinside.com 2019 - 2024. All rights reserved.