我试图了解Future::select
:在这个例子中,首先返回具有更长时间延迟的未来。
当我用它的例子读this article时,我得到了认知失调。作者写道:
select
函数运行两个(或更多的select_all
)期货,并返回第一个完成。这对于实现超时很有用。
我似乎不明白select
的感觉。
extern crate futures;
extern crate tokio_core;
use std::thread;
use std::time::Duration;
use futures::{Async, Future};
use tokio_core::reactor::Core;
struct Timeout {
time: u32,
}
impl Timeout {
fn new(period: u32) -> Timeout {
Timeout { time: period }
}
}
impl Future for Timeout {
type Item = u32;
type Error = String;
fn poll(&mut self) -> Result<Async<u32>, Self::Error> {
thread::sleep(Duration::from_secs(self.time as u64));
println!("Timeout is done with time {}.", self.time);
Ok(Async::Ready(self.time))
}
}
fn main() {
let mut reactor = Core::new().unwrap();
let time_out1 = Timeout::new(5);
let time_out2 = Timeout::new(1);
let task = time_out1.select(time_out2);
let mut reactor = Core::new().unwrap();
reactor.run(task);
}
我需要以较小的时间延迟处理早期的未来,然后以更长的延迟来处理未来。我该怎么做?
tokio::timer
如果有一件事要从中消除:永远不要在异步操作中执行阻塞或长时间运行。
如果你想要超时,请使用tokio::timer
中的内容,例如Delay
或Timeout
:
use futures::prelude::*; // 0.1.26
use std::time::{Duration, Instant};
use tokio::timer::Delay; // 0.1.18
fn main() {
let time_out1 = Delay::new(Instant::now() + Duration::from_secs(5));
let time_out2 = Delay::new(Instant::now() + Duration::from_secs(1));
let task = time_out1.select(time_out2);
tokio::run(task.map(drop).map_err(drop));
}
要了解为什么会得到你所做的行为,你必须在高层次上理解期货的实施。
当你调用run
时,有一个循环在传入的未来调用poll
。它循环直到未来返回成功或失败,否则未来尚未完成。
你的poll
实现“锁定”这个循环5秒钟,因为没有什么可以打破对sleep
的调用。到睡眠完成时,未来就绪,因此选择了未来。
异步超时的实现在概念上通过每次轮询时检查时钟来工作,并说明是否已经过了足够的时间。
最大的区别在于,当未来回归尚未准备好时,可以检查另一个未来。这就是select
所做的!
戏剧性的重演:
基于睡眠的计时器
核心:嘿
select
,你准备好了吗?选择:嘿
future1
,你准备好了吗?future1:坚持一个seconnnnnnn [... 5秒钟传递...] nnnnd。是!
基于异步的计时器
核心:嘿
select
,你准备好了吗?选择:嘿
future1
,你准备好了吗?future1:检查手表号
选择:嘿
future2
,你准备好了吗?future2:检查手表号
核心:嘿
select
,你准备好了吗?[... 1秒钟通过...]
核心:嘿
select
,你准备好了吗?选择:嘿
future1
,你准备好了吗?future1:检查手表号
选择:嘿
future2
,你准备好了吗?future2:检查是的!
如果您有一个阻塞或长时间运行的操作,那么适当的做法是将该工作移出异步循环。有关详细信息和示例,请参阅What is the best approach to encapsulate blocking I/O in future-rs?。
但是,使用线程池实现超时效率非常低;实际上不使用此代码!
use std::{thread, time::Duration};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13
fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> {
future::poll_fn(move || {
tokio_threadpool::blocking(|| {
thread::sleep(Duration::from_secs(seconds));
seconds
})
})
}
fn main() {
let a = delay_for(3);
let b = delay_for(1);
let sum = a.join(b).map(|(a, b)| a + b);
let mut runtime = Runtime::new().expect("Unable to start the runtime");
let result = runtime.block_on(sum);
println!("{:?}", result);
}