Rust 非常新......我正在寻找使用指定查询从 Gmail 下载电子邮件附件,我有一个粗略的同步代码,我正在寻找使用多线程使用?Tokio::spawn? (因为项目的其余部分已经在使用 tokio)当前的多线程?代码如下:
struct EmailInterface {
pub gmail: Gmail<HttpsConnector<HttpConnector>>,
}
impl EmailInterface {
async fn new(path_to_key: String) -> Result<EmailInterface, Box<dyn error::Error>> {
let authenticator =
ServiceAccountAuthenticator::builder(read_service_account_key(path_to_key).await?)
.build()
.await?;
let gmail = Gmail::new(
hyper::Client::builder().build(
hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build(),
),
authenticator,
);
Ok(EmailInterface { gmail })
}
pub async fn get_attachments(
self,
query: &str,
) -> Result<(), Box<dyn std::error::Error>> {
if let Some(messages_list) = self
.gmail
.users()
.messages_list("me")
.q(query)
.include_spam_trash(false)
.doit()
.await?
.1
.messages
{
let messages_id_list: Vec<String> = messages_list
.into_iter()
.flat_map(|message| message.id)
.collect();
for id in messages_id_list {
tokio::spawn(async move {
let message = self.gmail
.users()
.messages_get("me", id.as_str())
.doit()
.await
.unwrap_or_default()
.1;
let message = self
.gmail
.users()
.messages_get("me", id.as_str())
.doit()
.await
.unwrap_or_default()
.1;
for part in message.payload {
if let Some(part_body) = part.body {
if let Some(attachment_id) = part_body.attachment_id {
let attachment = self
.gmail
.users()
.messages_attachments_get(
"me",
id.as_str(),
attachment_id.as_str(),
)
.doit()
.await
.unwrap_or_default();
let data = general_purpose::STANDARD
.decode(&attachment.1.data.unwrap_or_default())
.unwrap();
std::fs::write(part.filename.expect("Should have a name"), &data)
.unwrap();
}
}
}
});
}
}
Ok(())
}
}
我目前得到的错误
use of moved value: `self.gmail`
move occurs because `self.gmail` has type `Gmail<HttpsConnector<HttpConnector>>`, which does not implement the `Copy` trait
如果有更精简的方法来做到这一点,我洗耳恭听(眼睛?)
我也尝试过使用 ThreadPool crate 和 ChatGPT 寻求建议,但后者总是以错误结束。
咕噜咕噜MRE咕噜
首先,我给几乎所有 Rust 初学者一个建议:不要依赖你的 IDE 来获得好的错误消息。阅读
cargo check
的输出,它通常提供更多信息和帮助:
error[E0382]: use of moved value: `self.gmail`
--> src/main.rs:52:41
|
52 | tokio::spawn(async move {
| _________________________________________^
53 | | let message = self
| |___________________________________-
54 | || .gmail
| ||______________________________- use occurs due to use in generator
55 | | .users()
... |
91 | | }
92 | | });
| |_________________^ value moved here, in previous iteration of loop
|
= note: move occurs because `self.gmail` has type `Gmail<HttpsConnector<HttpConnector>>`, which does not implement the `Copy` trait
在这种情况下,它告诉你它需要多个
self.gmail
,因为有一个for循环,但它只有一个。为了避免这种情况,您需要知道的是,如果某些东西没有实现Copy
,它通常仍然会实现Clone
。不同之处在于,编译器可以自由地自行使用 Copy
来避免移动,但您需要手动使用 Clone
。在这种情况下,您可以使用 Clone
为每个衍生的未来创建 gmail
客户端的专用实例。
for id in messages_id_list {
let gmail = self.gmail.clone();
tokio::spawn(async move {
let message = gmail
.users()
.…
// replace all the other instances of self.gmail as well
tokio::spawn
启动任务,但它不能确保它们在你的程序退出之前完成:
不能保证派生的任务会执行到完成。当运行时关闭时,所有未完成的任务都会被删除,无论该任务的生命周期如何。
这里的“运行时”是你在注释
#[tokio::main]
时创建的,它会在你的main
函数退出时关闭。
另外,如果你有很多邮件,我怀疑 gmail 会因为同时收到数千个请求而生你的气。
一次解决这两个问题:
futures::stream::iter(messages_id_list.iter())
.map(|id| {
let gmail = self.gmail.clone();
async move {
let message = gmail
.users()
// …
Ok::<_, String>(()) // You don't have error handling yet, so I'll use String as a dummy type.
}
})
.buffer_unordered(8)
.try_collect::<Vec<_>>()
.await?;
buffer_unordered
部分确保最多运行固定数量的消息获取器,而 await
确保它在 await
上的 get_attachments
完成之前全部完成。
最后:我认为
async
是 Rust 中不太好用的部分之一。你选择了一些难以开始的东西。