我想构建一个程序来收集天气更新并将其表示为流。我想无限循环地调用get_weather()
,在[[finish和start之间有60秒的延迟。
async fn get_weather() -> Weather { /* ... */ }
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
loop {
tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
let weather = get_weather().await;
yield weather; // This is not supported
// Note: waiting for get_weather() stops the timer and avoids overflows.
}
}
有什么方法可以轻松地做到这一点吗?
tokio::timer::Interval
花费60秒以上时,无法使用get_weather()
:
fn get_weather_stream() -> impl futures::Stream<Item = Weather> { tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60)) .then(|| get_weather()) }
如果发生这种情况,下一个功能将立即启动。我想在上一个get_weather()
开始和下一个get_weather()
开始之间保持准确的60秒。
stream::unfold
从“期货世界”转到“流世界”。我们不需要任何额外的状态,因此我们使用空元组:stream::unfold
use std::time::Duration;
use tokio::timer;
struct Weather;
async fn get_weather() -> Weather {
Weather
}
const BETWEEN: Duration = Duration::from_secs(1);
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
futures::stream::unfold((), |_| {
async {
timer::delay_for(BETWEEN).await;
let weather = get_weather().await;
Some((weather, ()))
}
})
}
use futures::StreamExt;
#[tokio::main]
async fn main() {
get_weather_stream()
.take(3)
.for_each(|_v| {
async {
println!("Got the weather");
}
})
.await;
}
% time ./target/debug/example
Got the weather
Got the weather
Got the weather
real 3.013 3013370us
user 0.004 3763us
sys 0.003 2604us
使用rustc 1.39.0-beta.7测试另请参见:
[dependencies]
futures-preview = "0.3.0-alpha.19"
tokio = "0.2.0-alpha.6"