出于某种原因,我必须在我的项目中使用 tokio/future 0.1,我想看很多 consul 服务。当一个未来被长轮询请求阻塞时,我希望其他相同的未来可以轮询。我该怎么办?
Cargo.toml
[dependencies]
tokio = "0.1"
futures= "0.1"
我的测试用例将每 5 秒输出一次 this is a test xxx
this is a test name1
this is a test name2
this is a test name1
this is a test name2
use std::thread;
use std::thread::sleep;
use futures::{Async, Future, Stream, task};
use futures::future::join_all;
use tokio::runtime::current_thread;
pub struct JnsWatcher {
name: String,
}
impl JnsWatcher {
pub fn new(name: String) -> Self {
JnsWatcher {
name
}
}
}
impl Future for JnsWatcher {
type Item = ();
type Error = ();
fn poll(&mut self, ) -> Result<Async<Self::Item>, Self::Error> {
loop {
sleep(std::time::Duration::from_secs(5)); // real code will be a long poll request, just like : self.client.service(self.ws.service_name.as_str(), self.ws.tag.as_deref(), self.ws.passing_only.unwrap_or(true), Some(opt))
println!("this is a test : {}", self.name);
task::current().notify();
return Ok(Async::NotReady);
}
}
}
#[test]
fn test_pool() {
let w = JnsWatcher::new("name1".to_string());
let w2 = JnsWatcher::new("name2".to_string());
let t = thread::Builder::new()
.name("watchtest".to_string())
.spawn(move || {
current_thread::block_on_all(join_all(vec![w, w2])).unwrap();
})
.expect("fail to spawn worker thread");
t.join().unwrap();
println!("test end");
}
我建议使用
Stream
创建 futures::stream::iter_ok
实例并使用 futures::stream::select_all
函数同时轮询流中的所有期货。
所以代码可能看起来像这样。
use std::thread;
use std::thread::sleep;
use futures::{Async, Future, Stream, task};
use futures::future::join_all;
use futures::stream::{iter_ok, select_all};
use tokio::runtime::current_thread;
pub struct JnsWatcher {
name: String,
}
impl JnsWatcher {
pub fn new(name: String) -> Self {
JnsWatcher {
name
}
}
}
impl Future for JnsWatcher {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
sleep(std::time::Duration::from_secs(5)); // simulate a long poll request
println!("this is a test : {}", self.name);
task::current().notify();
Ok(Async::NotReady)
}
}
#[test]
fn test_pool() {
let watchers = vec![
JnsWatcher::new("name1".to_string()),
JnsWatcher::new("name2".to_string()),
JnsWatcher::new("name3".to_string()),
];
let stream = select_all(watchers.into_iter().map(|w| w.into_future()));
let t = thread::Builder::new()
.name("watchtest".to_string())
.spawn(move || {
current_thread::block_on_all(stream).unwrap();
})
.expect("fail to spawn worker thread");
t.join().unwrap();
println!("test end");
}