所以,我有这段代码在单线程中运行良好
pub async fn insert_foos(
self,
foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
let chunks = foos.chunks(10_000);
for chunk in chunks {
let _ = &self.insert_foo_chunk(chunk).await?;
}
Ok(())
}
然后我尝试使其成为多线程:
pub async fn insert_foos(
self,
foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
let chunks = foos.chunks(10_000);
let tasks: Vec<_> = chunks
.map(move|chunk| {
let pg_repository = self.clone();
tokio::spawn(async move {
pg_repository.insert_foo_chunk(chunk).await?;
Ok::<(), Error>(())
})
})
.collect();
for task in tasks {
let _ = task.await?;
}
Ok(())
}
但是后来我得到了这个错误:
error[E0597]: `foos` does not live long enough
--> src/db/postgres_repository.rs:18:22
|
16 | foos: Vec<Foo>,
| ---- binding `foos` declared here
17 | ) -> Result<(), Box<dyn std::error::Error>> {
18 | let chunks = foos.chunks(10_000);
| ^^^^---------------
| |
| borrowed value does not live long enough
| argument requires that `foos` is borrowed for `'static`
...
40 | }
| - `foos` dropped here while still borrowed
检查
Chunks::new
方法的实现
impl<'a, T: 'a> Chunks<'a, T> {
#[inline]
pub(super) fn new(slice: &'a [T], size: usize) -> Self {
Self { v: slice, chunk_size: size }
}
}
非常简单,表明
v
实际上是我的具有相同寿命的foos
。
所以我想我理解,因为 async move
,我的 chunks
被移出了函数,但实际上我的 foos
应该一直存在到函数结束,这显然会造成这个生命周期问题。我说得对吗?
但是从昨天开始我就一直摸不着头脑,不知道如何解决这个问题。我不想克隆
foos
,因为这个向量包含大约 500 MB 的数据,实际上对 insert_foos
的调用已经是多线程的(这使我总共加载大约 6 GB 的数据,没有足够的 RAM克隆所有这些)。
插入块后我根本不关心
foos
,所以我实际上不需要这个变量一直存在到函数结束。我只是不知道如何告诉编译器......
非常感谢您的帮助。
tokio::spawn
只接受 'static
Future
的期货,不接受从局部变量(如 chunks
)借用的期货。
您可以使用
futres_util::future::join_all
代替:
struct Foo;
type Error = std::io::Error;
#[derive(Clone)]
struct Repo;
impl Repo {
pub async fn insert_foo_chunk(&self, _chunk: &[Foo]) -> Result<(), Error> {
Ok(())
}
pub async fn insert_foos(
self,
foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
let chunks = foos.chunks(10_000);
futures_util::future::join_all(
chunks
.map(move |chunk| {
let pg_repository = self.clone();
async move {
pg_repository.insert_foo_chunk(chunk).await?;
Ok::<(), Error>(())
}
})
).await;
Ok(())
}
}