有什么方法可以创建异步流生成器,从而产生重复调用函数的结果?

问题描述 投票:0回答:1

我想构建一个程序来收集天气更新并将其表示为流。我想无限循环地调用get_weather(),在[[finish和start之间有60秒的延迟。

简化版本如下:

async fn get_weather() -> Weather { /* ... */ } fn get_weather_stream() -> impl futures::Stream<Item = Weather> { loop { tokio::timer::delay_for(std::time::Duration::from_secs(60)).await; let weather = get_weather().await; yield weather; // This is not supported // Note: waiting for get_weather() stops the timer and avoids overflows. } }

有什么方法可以轻松地做到这一点吗?

tokio::timer::Interval花费60秒以上时,无法使用get_weather()

fn get_weather_stream() -> impl futures::Stream<Item = Weather> { tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60)) .then(|| get_weather()) }

如果发生这种情况,下一个功能将立即启动。我想在上一个get_weather()开始和下一个get_weather()开始之间保持准确的60秒。
asynchronous rust async-await rust-tokio
1个回答
0
投票
使用stream::unfold从“期货世界”转到“流世界”。我们不需要任何额外的状态,因此我们使用空元组:

stream::unfold

use std::time::Duration;
use tokio::timer;

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((), |_| {
        async {
            timer::delay_for(BETWEEN).await;
            let weather = get_weather().await;
            Some((weather, ()))
        }
    })
}

use futures::StreamExt;

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| {
            async {
                println!("Got the weather");
            }
        })
        .await;
}
% time ./target/debug/example

Got the weather
Got the weather
Got the weather

real    3.013   3013370us
user    0.004   3763us
sys     0.003   2604us
使用rustc 1.39.0-beta.7测试

另请参见:

© www.soinside.com 2019 - 2024. All rights reserved.