如何删除或处理流以外的情况下忽略错误?

问题描述 投票:1回答:1

我有我想用Stream::buffer_unordered / Stream::buffered运行哪个期货的一个长长的清单。我结合这个流成一个单一的未来for_each,然后与东京执行这一切。这是很常见的是,期货的人会返回一个错误。根据该文件,当返回一个错误for_each将停止。

我怎么能忽略或只是打印的消息时返回这些错误,并继续执行后续期货?

这是类似我的情况一般的代码:

use futures::stream;
use futures::stream::Stream;
use futures::future::err;
use futures::future::ok;
use tokio;

fn main() {
    let queries: Vec<u32> = (0..10).collect();
    let futures = queries.into_iter().map(move |num| {
        println!("Started {}", num);
        // Maybe throw error
        let future = match num % 3 {
            0 => ok::<u32, u32>(num),
            _ => err::<u32, u32>(num)
        };
        future
    });

    let stream = stream::iter_ok(futures);
    let num_workers = 8;
    let future = stream
        .buffer_unordered(num_workers)
        .map_err(|err| {
            println!("Error on {:?}", err);
        })
        .for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        });

    tokio::runtime::run(future);
}

Rust Playground

如果你尝试这个例子,期货的队列将停止当Err抛出年初执行。

rust future
1个回答
2
投票
  • Stream::map_err - 提供了错误的价值观,它可以转换的类型,但它留下它作为一个错误。
  • Stream::or_else - 提供了错误的价值观,它可以将错误转化为成功,让成功的值不变。
  • Stream::then - 提供了成功和错误值,并且可以做任何你想要的。

Stream::map不给你错误转化为成功的能力,所以它是没有用的。 Stream::or_else确实给的能力,但它的使用的时候可以将错误类型转化为成功的类型。只有Stream::then为您提供了两种类型的一次转换的能力。

Stream::flatten可用于流的数据流转换为单个流。

与事实Result可以作为一个迭代器来处理结合这一点,你可以创建这样的:

stream
    .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
    .flatten()

不管流的产品OkErr,我们把它转换成一个迭代器,并从它创建一个流。然后,我们扁平化流的流。

如果你想打印出的错误,我会使用Stream::inspect_err

stream.inspect_err(|err| println!("Error on {:?}", err))

完整的代码:

use futures::{
    future,
    stream::{self, Stream},
}; // 0.1.25;
use tokio; // 0.1.14

fn main() {
    let stream = stream::iter_ok({
        (0..10).map(|num| {
            println!("Started {}", num);
            match num % 3 {
                0 => future::ok(num),
                _ => future::err(num),
            }
        })
    })
    .buffer_unordered(2);

    let stream = stream
        .inspect_err(|err| println!("Error on {:?}", err))
        .then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
        .flatten();

    tokio::run({
        stream.for_each(|n| {
            println!("Success on {:?}", n);
            Ok(())
        })
    });
}
Started 0
Started 1
Success on 0
Started 2
Error on 1
Started 3
Error on 2
Started 4
Success on 3
Started 5
Error on 4
Started 6
Error on 5
Started 7
Success on 6
Started 8
Error on 7
Started 9
Error on 8
Success on 9
© www.soinside.com 2019 - 2024. All rights reserved.