如何将 google_gmail1::Gmail<HttpsConnector<HttpConnector>> 传递给线程生成?

问题描述 投票:0回答:1

Rust 非常新......我正在寻找使用指定查询从 Gmail 下载电子邮件附件,我有一个粗略的同步代码,我正在寻找使用多线程使用?Tokio::spawn? (因为项目的其余部分已经在使用 tokio)当前的多线程?代码如下:

struct EmailInterface {
    pub gmail: Gmail<HttpsConnector<HttpConnector>>,
}

impl EmailInterface {
    async fn new(path_to_key: String) -> Result<EmailInterface, Box<dyn error::Error>> {
        let authenticator =
            ServiceAccountAuthenticator::builder(read_service_account_key(path_to_key).await?)
                .build()
                .await?;

        let gmail = Gmail::new(
            hyper::Client::builder().build(
                hyper_rustls::HttpsConnectorBuilder::new()
                    .with_native_roots()
                    .https_or_http()
                    .enable_http1()
                    .enable_http2()
                    .build(),
            ),
            authenticator,
        );

        Ok(EmailInterface { gmail })
    }

    pub async fn get_attachments(
        self,
        query: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        if let Some(messages_list) = self
            .gmail
            .users()
            .messages_list("me")
            .q(query)
            .include_spam_trash(false)
            .doit()
            .await?
            .1
            .messages
        {
            let messages_id_list: Vec<String> = messages_list
                .into_iter()
                .flat_map(|message| message.id)
                .collect();

            for id in messages_id_list {
                tokio::spawn(async move {

                    let message = self.gmail
                        .users()
                        .messages_get("me", id.as_str())
                        .doit()
                        .await
                        .unwrap_or_default()
                        .1;

                    let message = self
                        .gmail
                        .users()
                        .messages_get("me", id.as_str())
                        .doit()
                        .await
                        .unwrap_or_default()
                        .1;
                    for part in message.payload {
                        if let Some(part_body) = part.body {
                            if let Some(attachment_id) = part_body.attachment_id {
                                let attachment = self
                                    .gmail
                                    .users()
                                    .messages_attachments_get(
                                        "me",
                                        id.as_str(),
                                        attachment_id.as_str(),
                                    )
                                    .doit()
                                    .await
                                    .unwrap_or_default();
                                let data = general_purpose::STANDARD
                                    .decode(&attachment.1.data.unwrap_or_default())
                                    .unwrap();
                                std::fs::write(part.filename.expect("Should have a name"), &data)
                                    .unwrap();
                            }
                        }
                    }
                });
            }
        }
        Ok(())
    }
}

我目前得到的错误

use of moved value: `self.gmail`
move occurs because `self.gmail` has type `Gmail<HttpsConnector<HttpConnector>>`, which does not implement the `Copy` trait

如果有更精简的方法来做到这一点,我洗耳恭听(眼睛?)

我也尝试过使用 ThreadPool crate 和 ChatGPT 寻求建议,但后者总是以错误结束。

multithreading rust threadpool gmail-api
1个回答
0
投票

咕噜咕噜MRE咕噜

首先,我给几乎所有 Rust 初学者一个建议:不要依赖你的 IDE 来获得好的错误消息。阅读

cargo check
的输出,它通常提供更多信息和帮助:

error[E0382]: use of moved value: `self.gmail`
  --> src/main.rs:52:41
   |
52 |                    tokio::spawn(async move {
   |   _________________________________________^
53 |  |                     let message = self
   |  |___________________________________-
54 | ||                         .gmail
   | ||______________________________- use occurs due to use in generator
55 |  |                         .users()
...   |
91 |  |                     }
92 |  |                 });
   |  |_________________^ value moved here, in previous iteration of loop
   |
   = note: move occurs because `self.gmail` has type `Gmail<HttpsConnector<HttpConnector>>`, which does not implement the `Copy` trait

在这种情况下,它告诉你它需要多个

self.gmail
,因为有一个for循环,但它只有一个。为了避免这种情况,您需要知道的是,如果某些东西没有实现
Copy
,它通常仍然会实现
Clone
。不同之处在于,编译器可以自由地自行使用
Copy
来避免移动,但您需要手动使用
Clone
。在这种情况下,您可以使用
Clone
为每个衍生的未来创建
gmail
客户端的专用实例。

for id in messages_id_list {
    let gmail = self.gmail.clone();
    tokio::spawn(async move {
        let message = gmail
            .users()
            .…
        // replace all the other instances of self.gmail as well

这将使你的代码编译,但它不会工作:

tokio::spawn
启动任务,但它不能确保它们在你的程序退出之前完成:

不能保证派生的任务会执行到完成。当运行时关闭时,所有未完成的任务都会被删除,无论该任务的生命周期如何。

这里的“运行时”是你在注释

#[tokio::main]
时创建的,它会在你的
main
函数退出时关闭。

另外,如果你有很多邮件,我怀疑 gmail 会因为同时收到数千个请求而生你的气。

一次解决这两个问题:

futures::stream::iter(messages_id_list.iter())
    .map(|id| {
        let gmail = self.gmail.clone();
        async move {
            let message = gmail
                .users()
            // …
            Ok::<_, String>(()) // You don't have error handling yet, so I'll use String as a dummy type.
        }
    })
    .buffer_unordered(8)
    .try_collect::<Vec<_>>()
    .await?;

buffer_unordered
部分确保最多运行固定数量的消息获取器,而
await
确保它在
await
上的
get_attachments
完成之前全部完成。

最后:我认为

async
是 Rust 中不太好用的部分之一。你选择了一些难以开始的东西。

© www.soinside.com 2019 - 2024. All rights reserved.