使用 tokio::sync::Semaphore 限制块中的异步请求

问题描述 投票:0回答:1

我正在使用一个 API,该 API 限制我每秒 40 个请求,每 120 秒 200 个请求。

我目前正在使用

reqwest
tokio
在 Rust 中设计异步模式。我想纳入速率限制约束。我在 Python 中使用信号量完成了类似的事情,并研究了 Rust 中的信号量,但不太确定如何构建我的代码。

理想情况下,我想:

  1. 批量发送 40 个请求(每秒不得超过 40 个)
  2. 一旦我达到 200 个请求,而计时器还没有达到 120 秒。停下来等待 120 秒。达到 429 将导致 120 秒的等待,因此目标是填满存储桶直到达到该限制,然后等到我可以再次开始发送请求。
  3. 所有请求完成后,将响应收集到
    Vec

关于如何最好地处理这个问题的其他想法和想法。我已经阅读了有关此类情况的其他几个问题,但尚未找到有效的方法。我对 Rust 中的 async-await 也是完全陌生的,所以任何重构建议都会有帮助。

当前的异步模式如下:

use tokio::time::{ sleep, Duration };
use reqwest::header::HeaderMap;

async fn _make_requests(
    headers: &HeaderMap, 
    requests: &Vec<String>
) -> Result<Vec<String>, Box<dyn std::error::Error>>
{
    let client = reqwest::Client::new();
    
    // Each req is a string URL which will pull back the response text from the API
    for req in requests
    {
        let client = client.clone();
        tokio::spawn(
            match async move {
                      let resp = client.get(req)
                                       .headers(headers.to_owned())
                                       .send()
                                       .await?
                                       .text()
                                       .await?;
                      Ok(resp)
            }
            .await
            // Handle resp status in match
            {

             Ok(resp) => println!("{:?}", resp),
             Err(e) => eprintln!("{}", e),
            }
       );
    }
}

fn main()
{
  // Create sample headers
  let mut headers = HeaderMap::new();
  headers.insert("Accept", "application/json".parse().unwrap());

  let rt = tokio::runtime::Builder::new_current_thread()
      .enable_all()
      .build()
      .unwrap();
  
  // Run event loop until complete
  rt.block_on(_make_requests(&headers, &requests));

  Ok(())  
}

rest rust async-await rust-tokio reqwest
1个回答
0
投票

你就快到了:

use tokio::time::{ sleep, Duration };
use reqwest::header::HeaderMap;

async fn _make_requests(
    headers: &HeaderMap, 
    requests: &Vec<String>
) -> Result<Vec<String>, Box<dyn std::error::Error>>
{
    let client = reqwest::Client::new();
    // Create a semaphore (important: including Arc)
    let semaphore = Arc::new(Semaphore::new(40));
    // Each req is a string URL which will pull back the response text from the API
    for req in requests
    {
        let client = client.clone();
        // Important is the acquire_owned() here
        let semaphore = Arc::clone(&semaphore);
        let permit = semaphore.acquire_owned().await.unwrap();

        tokio::spawn(
            match async move {
                      let resp = client.get(req)
                                       .headers(headers.to_owned())
                                       .send()
                                       .await?
                                       .text()
                                       .await?;
                      Ok(resp)
            }
            .await
            // Handle resp status in match
            {

             Ok(resp) => println!("{:?}", resp),
             Err(e) => eprintln!("{}", e),
            }
       );
    }
}

我还建议将从

tokio::spawn
返回的句柄添加到 Vec 并在 for 循环之后“等待”它们。

这应该一次为您提供 40 个并行生成的 tokio 任务。

© www.soinside.com 2019 - 2024. All rights reserved.