为什么Future :: select首先选择睡眠时间较长的未来?

问题描述 投票:2回答:1

我试图了解Future::select:在这个例子中,首先返回具有更长时间延迟的未来。

当我用它的例子读this article时,我得到了认知失调。作者写道:

select函数运行两个(或更多的select_all)期货,并返回第一个完成。这对于实现超时很有用。

我似乎不明白select的感觉。

extern crate futures;
extern crate tokio_core;

use std::thread;
use std::time::Duration;
use futures::{Async, Future};
use tokio_core::reactor::Core;

struct Timeout {
    time: u32,
}

impl Timeout {
    fn new(period: u32) -> Timeout {
        Timeout { time: period }
    }
}

impl Future for Timeout {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Result<Async<u32>, Self::Error> {
        thread::sleep(Duration::from_secs(self.time as u64));
        println!("Timeout is done with time {}.", self.time);
        Ok(Async::Ready(self.time))
    }
}

fn main() {
    let mut reactor = Core::new().unwrap();

    let time_out1 = Timeout::new(5);
    let time_out2 = Timeout::new(1);

    let task = time_out1.select(time_out2);

    let mut reactor = Core::new().unwrap();
    reactor.run(task);
}

我需要以较小的时间延迟处理早期的未来,然后以更长的延迟来处理未来。我该怎么做?

select rust future
1个回答
8
投票

Use tokio::timer

如果有一件事要从中消除:永远不要在异步操作中执行阻塞或长时间运行。

如果你想要超时,请使用tokio::timer中的内容,例如DelayTimeout

use futures::prelude::*; // 0.1.26
use std::time::{Duration, Instant};
use tokio::timer::Delay; // 0.1.18

fn main() {
    let time_out1 = Delay::new(Instant::now() + Duration::from_secs(5));
    let time_out2 = Delay::new(Instant::now() + Duration::from_secs(1));

    let task = time_out1.select(time_out2);

    tokio::run(task.map(drop).map_err(drop));
}

What's the problem?

要了解为什么会得到你所做的行为,你必须在高层次上理解期货的实施。

当你调用run时,有一个循环在传入的未来调用poll。它循环直到未来返回成功或失败,否则未来尚未完成。

你的poll实现“锁定”这个循环5秒钟,因为没有什么可以打破对sleep的调用。到睡眠完成时,未来就绪,因此选择了未来。

异步超时的实现在概念上通过每次轮询时检查时钟来工作,并说明是否已经过了足够的时间。

最大的区别在于,当未来回归尚未准备好时,可以检查另一个未来。这就是select所做的!

戏剧性的重演:

基于睡眠的计时器

核心:嘿select,你准备好了吗?

选择:嘿future1,你准备好了吗?

future1:坚持一个seconnnnnnn [... 5秒钟传递...] nnnnd。是!

基于异步的计时器

核心:嘿select,你准备好了吗?

选择:嘿future1,你准备好了吗?

future1:检查手表号

选择:嘿future2,你准备好了吗?

future2:检查手表号

核心:嘿select,你准备好了吗?

[... 1秒钟通过...]

核心:嘿select,你准备好了吗?

选择:嘿future1,你准备好了吗?

future1:检查手表号

选择:嘿future2,你准备好了吗?

future2:检查是的!

The generic solution

如果您有一个阻塞或长时间运行的操作,那么适当的做法是将该工作移出异步循环。有关详细信息和示例,请参阅What is the best approach to encapsulate blocking I/O in future-rs?

但是,使用线程池实现超时效率非常低;实际上不使用此代码!

use std::{thread, time::Duration};
use tokio::{prelude::*, runtime::Runtime}; // 0.1.18
use tokio_threadpool; // 0.1.13

fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> {
    future::poll_fn(move || {
        tokio_threadpool::blocking(|| {
            thread::sleep(Duration::from_secs(seconds));
            seconds
        })
    })
}

fn main() {
    let a = delay_for(3);
    let b = delay_for(1);
    let sum = a.join(b).map(|(a, b)| a + b);

    let mut runtime = Runtime::new().expect("Unable to start the runtime");
    let result = runtime.block_on(sum);
    println!("{:?}", result);
}
© www.soinside.com 2019 - 2024. All rights reserved.