我有我想用Stream::buffer_unordered
/ Stream::buffered
运行哪个期货的一个长长的清单。我结合这个流成一个单一的未来for_each
,然后与东京执行这一切。这是很常见的是,期货的人会返回一个错误。根据该文件,当返回一个错误for_each
将停止。
我怎么能忽略或只是打印的消息时返回这些错误,并继续执行后续期货?
这是类似我的情况一般的代码:
use futures::stream;
use futures::stream::Stream;
use futures::future::err;
use futures::future::ok;
use tokio;
fn main() {
let queries: Vec<u32> = (0..10).collect();
let futures = queries.into_iter().map(move |num| {
println!("Started {}", num);
// Maybe throw error
let future = match num % 3 {
0 => ok::<u32, u32>(num),
_ => err::<u32, u32>(num)
};
future
});
let stream = stream::iter_ok(futures);
let num_workers = 8;
let future = stream
.buffer_unordered(num_workers)
.map_err(|err| {
println!("Error on {:?}", err);
})
.for_each(|n| {
println!("Success on {:?}", n);
Ok(())
});
tokio::runtime::run(future);
}
如果你尝试这个例子,期货的队列将停止当Err
抛出年初执行。
Stream::map_err
- 提供了错误的价值观,它可以转换的类型,但它留下它作为一个错误。Stream::or_else
- 提供了错误的价值观,它可以将错误转化为成功,让成功的值不变。Stream::then
- 提供了成功和错误值,并且可以做任何你想要的。Stream::map
不给你错误转化为成功的能力,所以它是没有用的。 Stream::or_else
确实给的能力,但它的使用的时候可以将错误类型转化为成功的类型。只有Stream::then
为您提供了两种类型的一次转换的能力。
Stream::flatten
可用于流的数据流转换为单个流。
与事实Result
可以作为一个迭代器来处理结合这一点,你可以创建这样的:
stream
.then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
.flatten()
不管流的产品Ok
或Err
,我们把它转换成一个迭代器,并从它创建一个流。然后,我们扁平化流的流。
如果你想打印出的错误,我会使用Stream::inspect_err
:
stream.inspect_err(|err| println!("Error on {:?}", err))
完整的代码:
use futures::{
future,
stream::{self, Stream},
}; // 0.1.25;
use tokio; // 0.1.14
fn main() {
let stream = stream::iter_ok({
(0..10).map(|num| {
println!("Started {}", num);
match num % 3 {
0 => future::ok(num),
_ => future::err(num),
}
})
})
.buffer_unordered(2);
let stream = stream
.inspect_err(|err| println!("Error on {:?}", err))
.then(|r| future::ok(stream::iter_ok::<_, ()>(r)))
.flatten();
tokio::run({
stream.for_each(|n| {
println!("Success on {:?}", n);
Ok(())
})
});
}
Started 0
Started 1
Success on 0
Started 2
Error on 1
Started 3
Error on 2
Started 4
Success on 3
Started 5
Error on 4
Started 6
Error on 5
Started 7
Success on 6
Started 8
Error on 7
Started 9
Error on 8
Success on 9