代码在单线程中运行良好,但与 tokio 一起使用时出现终身错误

问题描述 投票:0回答:1

所以,我有这段代码在单线程中运行良好

pub async fn insert_foos(
    self,
    foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
    let chunks = foos.chunks(10_000);

    for chunk in chunks {
        let _ = &self.insert_foo_chunk(chunk).await?;
    }

    Ok(())
}

然后我尝试使其成为多线程:

pub async fn insert_foos(
    self,
    foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
    let chunks = foos.chunks(10_000);

    let tasks: Vec<_> = chunks
        .map(move|chunk| {
            let pg_repository = self.clone();
            tokio::spawn(async move {
                pg_repository.insert_foo_chunk(chunk).await?;
                Ok::<(), Error>(())
            })
        })
        .collect();

    for task in tasks {
        let _ = task.await?;
    }

    Ok(())
}

但是后来我得到了这个错误:

error[E0597]: `foos` does not live long enough
  --> src/db/postgres_repository.rs:18:22
   |
16 |         foos: Vec<Foo>,
   |         ---- binding `foos` declared here
17 |     ) -> Result<(), Box<dyn std::error::Error>> {
18 |         let chunks = foos.chunks(10_000);
   |                      ^^^^---------------
   |                      |
   |                      borrowed value does not live long enough
   |                      argument requires that `foos` is borrowed for `'static`
...
40 |     }
   |     - `foos` dropped here while still borrowed

检查

Chunks::new
方法的实现

impl<'a, T: 'a> Chunks<'a, T> {
    #[inline]
    pub(super) fn new(slice: &'a [T], size: usize) -> Self {
        Self { v: slice, chunk_size: size }
    }
}

非常简单,表明

v
实际上是我的具有相同寿命的
foos
。 所以我想我理解,因为
async move
,我的
chunks
被移出了函数,但实际上我的
foos
应该一直存在到函数结束,这显然会造成这个生命周期问题。我说得对吗?

但是从昨天开始我就一直摸不着头脑,不知道如何解决这个问题。我不想克隆

foos
,因为这个向量包含大约 500 MB 的数据,实际上对
insert_foos
的调用已经是多线程的(这使我总共加载大约 6 GB 的数据,没有足够的 RAM克隆所有这些)。

插入块后我根本不关心

foos
,所以我实际上不需要这个变量一直存在到函数结束。我只是不知道如何告诉编译器......

非常感谢您的帮助。

rust lifetime rust-tokio chunks
1个回答
0
投票

tokio::spawn
只接受
'static
Future
的期货,不接受从局部变量(如 chunks
借用
的期货。

您可以使用

futres_util::future::join_all
代替:

struct Foo;
type Error = std::io::Error;
#[derive(Clone)]
struct Repo;

impl Repo {
    pub async fn insert_foo_chunk(&self, _chunk: &[Foo]) -> Result<(), Error> {
        Ok(())
    }

    pub async fn insert_foos(
        self,
        foos: Vec<Foo>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let chunks = foos.chunks(10_000);
        
        futures_util::future::join_all(
            chunks
                .map(move |chunk| {
                    let pg_repository = self.clone();
                    async move {
                        pg_repository.insert_foo_chunk(chunk).await?;
                        Ok::<(), Error>(())
                    }
                })
        ).await;
    
        Ok(())
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.